diff --git a/ejabberd-metrics.yml.default b/ejabberd-metrics.yml.default
index 41487e0..e89d0ef 100644
--- a/ejabberd-metrics.yml.default
+++ b/ejabberd-metrics.yml.default
@@ -22,5 +22,6 @@ influxdb_port: 8086
 influxdb_db: "example"
 
 # prometheus configuration
+prometheus_address: "127.0.0.1"
 prometheus_port: 8080
-prometheus_refresh: 10
+prometheus_cache_ttl: 10
diff --git a/prometheus.py b/prometheus.py
old mode 100644
new mode 100755
index 4441c87..440971e
--- a/prometheus.py
+++ b/prometheus.py
@@ -1,93 +1,118 @@
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
-import socket
-import threading
-import time
+import subprocess
+import logging
+from time import time
+from collections import defaultdict
 from http.server import BaseHTTPRequestHandler, HTTPServer
-from socketserver import ThreadingMixIn
+from urllib.parse import parse_qs, urlparse
+from prometheus_client import (
+    CollectorRegistry, Gauge, generate_latest, CONTENT_TYPE_LATEST
+)
+
+
 
 from config import Config
 from metrics import EjabberdMetrics
 
 
-class _ThreadingSimpleServer(ThreadingMixIn, HTTPServer):
-    """Thread per request HTTP server."""
-    # Make worker threads "fire and forget". Beginning with Python 3.7 this
-    # prevents a memory leak because ``ThreadingMixIn`` starts to gather all
-    # non-daemon threads in a list in order to join on them at server close.
-    # Enabling daemon threads virtually makes ``_ThreadingSimpleServer`` the
-    # same as Python 3.7's ``ThreadingHTTPServer``.
-    daemon_threads = True
-    address_family = socket.AF_INET6
+class DynamicMetricsHandler(BaseHTTPRequestHandler):
+    """HTTP handler that serves metrics from a registry built per request."""
+    def do_GET(self):
+        """Serve the registry returned by ``self.generator`` for this request."""
+        params = parse_qs(urlparse(self.path).query)
+        registry = self.generator(params)
+        if 'name[]' in params:
+            registry = registry.restricted_registry(params['name[]'])
+        try:
+            output = generate_latest(registry)
+        except Exception:
+            self.send_error(500, 'error generating metric output')
+            raise
+        self.send_response(200)
+        self.send_header('Content-Type', CONTENT_TYPE_LATEST)
+        self.end_headers()
+        self.wfile.write(output)
 
+    @staticmethod
+    def factory(registry_generator):
+        """Return a handler subclass whose ``generator`` is ``registry_generator``."""
+        DynMetricsHandler = type('MetricsHandler',
+                                 (DynamicMetricsHandler, object),
+                                 {"generator": registry_generator})
+        return DynMetricsHandler
 
 class Prometheus():
     def __init__(self, metrics):
+        self.ttl = 10
+        self._last_update = 0
         self._metrics = metrics
 
-    def _parse_metric(self, name, value, tags=None):
-        output = name
-        if isinstance(tags, dict):
-            output += "{"
-            first = True
-            for k, v in tags.items():
-                if not first:
-                    output += ', '
-                else:
-                    first = False
-                output += k+'="'+v+'"'
-            output += '}'
-        return output + ' {}\n'.format(value)
+    def handler(self, metrics_handler):
+        """Refresh metrics when the TTL has expired and build a new registry."""
+        now = time()
+        if now >= self._last_update + self.ttl:
+            self._metrics.update()
+            self._last_update = now
 
-    def _get_metrics(self):
-        output = ""
+        registry = CollectorRegistry(auto_describe=True)
+
+        Gauge('ejabberd_node_s2s_in', 'count of incoming server-to-server connection', registry=registry).set(self._metrics.get_s2s_in())
+        Gauge('ejabberd_node_s2s_out', 'count of outgoing server-to-server connection', registry=registry).set(self._metrics.get_s2s_out())
+
+        labelnames_vhost = ["vhost"]
+
+        registered_vhosts = Gauge('ejabberd_registered_vhosts', 'count of user per vhost', labelnames_vhost, registry=registry)
+        muc = Gauge('ejabberd_muc', 'count of muc\'s per vhost', labelnames_vhost, registry=registry)
+
+        online_vhost_node = Gauge('ejabberd_online_vhost_node', 'count of client connections', ["vhost","node"], registry=registry)
+
+        online_status = Gauge('ejabberd_online_status', 'count of client connections', ["vhost","node","status"], registry=registry)
+        online_connection = Gauge('ejabberd_online_connection', 'count of client connections', ["vhost","node","connection"], registry=registry)
+        online_client = Gauge('ejabberd_online_client', 'count of client software', ["vhost","node","client"], registry=registry)
+        online_ipversion = Gauge('ejabberd_online_ipversion', 'count of client software', ["vhost","node","ipversion"], registry=registry)
+        online_client_ipversion = Gauge('ejabberd_online_client_ipversion', 'count of client software', ["vhost","node","client","ipversion"], registry=registry)
 
-        output += self._parse_metric("ejabberd_node_s2s_in", self._metrics.get_s2s_in())
-        output += self._parse_metric("ejabberd_node_s2s_out", self._metrics.get_s2s_out())
 
         for host in self._metrics.get_vhosts():
-            output += self._parse_metric("ejabberd_registered_vhosts", self._metrics.get_registered(host), {"vhost": host})
-            muc = self._metrics.get_muc(host)
-            if muc is not None:
-                output += self._parse_metric("ejabberd_muc", muc, {"vhost": host})
+            labels_vhost = (host)
+
+            registered_vhosts.labels(labels_vhost).set(self._metrics.get_registered(host))
+            # the previous exporter skipped a None muc count; keep doing so
+            muc_count = self._metrics.get_muc(host)
+            if muc_count is not None:
+                muc.labels(labels_vhost).set(muc_count)
 
             for k, v in self._metrics.get_online_by_node(vhost=host).items():
-                output += self._parse_metric("ejabberd_online_vhost_node", v, {"vhost": host, "node": k})
+                online_vhost_node.labels(host,k).set(v)
 
             for node in self._metrics.get_nodes():
                 for k, v in self._metrics.get_online_by_status(node=node, vhost=host).items():
-                    output += self._parse_metric("ejabberd_online_status", v, {"vhost": host, "node": node, "status": k})
+                    online_status.labels(host,node,k).set(v)
                 for k, v in self._metrics.get_online_by_connection(node=node, vhost=host).items():
-                    output += self._parse_metric("ejabberd_online_connection", v, {"vhost": host, "node": node, "connection": k})
+                    online_connection.labels(host,node,k).set(v)
                 for k, v in self._metrics.get_online_by_client(node=node, vhost=host).items():
-                    output += self._parse_metric("ejabberd_online_client", v, {"vhost": host, "node": node, "client": k})
+                    online_client.labels(host,node,k).set(v)
                 for k, v in self._metrics.get_online_by_ipversion(node=node, vhost=host).items():
-                    output += self._parse_metric("ejabberd_online_ipversion", v, {"vhost": host, "node": node, "ipversion": str(k)})
+                    online_ipversion.labels(host,node,k).set(v)
                 for client, data in self._metrics.get_online_client_by_ipversion(node=node,vhost=host).items():
                     for k, v in data.items():
-                        output += self._parse_metric("ejabberd_online_client_ipversion", v, {"vhost": host, "node": node, "ipversion": str(k), "client": client})
+                        # positional values must follow ["vhost","node","client","ipversion"]
+                        online_client_ipversion.labels(host,node,client,str(k)).set(v)
 
-        return output
+        return registry
 
-    def listen(self, port, addr='::'):
-        """Starts an HTTP server for prometheus metrics as a daemon thread"""
-        class myHandler(BaseHTTPRequestHandler):
-            def do_GET(r):
-                r.send_response(200)
-                r.send_header('Content-type', 'text/html')
-                r.end_headers()
-                result = self._get_metrics()
-                r.wfile.write(result.encode('utf-8'))
-
-        httpd = _ThreadingSimpleServer((addr, port), myHandler)
-        t = threading.Thread(target=httpd.serve_forever)
-        t.daemon = True
-        t.start()
+    def listen(self, addr=("127.0.0.1", 8080)):
+        """Serve metrics over HTTP on ``addr`` (host, port); does not return."""
+        server = HTTPServer(addr, DynamicMetricsHandler.factory(self.handler))
+        server.serve_forever()
 
 
 if __name__ == "__main__":
     # load config
     config = Config()
+    if config.get('debug', default=False):
+        logging.getLogger().setLevel(logging.DEBUG)
 
     # credentials and parameters
     url = config.get('url', default='http://[::1]:5280/api')
@@ -95,12 +120,10 @@ if __name__ == "__main__":
     api = config.get('api', default='rest')
 
     # config prometheus
+    prom_addr = config.get('prometheus_address', default='127.0.0.1')
     prom_port = config.get('prometheus_port', default=8080)
-    prom_refresh = config.get('prometheus_refresh', default=10)
 
     metrics = EjabberdMetrics(url, login, api)
     prom = Prometheus(metrics)
-    prom.listen(prom_port)
-    while True:
-        metrics.update()
-        time.sleep(prom_refresh)
+    prom.ttl = config.get('prometheus_cache_ttl', default=10)
+    prom.listen((prom_addr, prom_port))
diff --git a/requirements.txt b/requirements.txt
index 3ed92b9..0c5120d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,6 @@
+prometheus_client>=0.8.0
 influxdb>=5.2.0
 requests>=2.21.0
 packaging>=20.1
 ruamel.yaml==0.16.10
-ruamel.yaml.clib==0.2.0
\ No newline at end of file
+ruamel.yaml.clib==0.2.0