refactory prometheus exporter - use python lib

This commit is contained in:
genofire 2020-06-09 20:17:00 +02:00
parent 692d1df2c1
commit 3bcc8ab25a
3 changed files with 79 additions and 62 deletions

View File

@ -22,5 +22,6 @@ influxdb_port: 8086
influxdb_db: "example"
# prometheus configuration
prometheus_address: "127.0.0.1"
prometheus_port: 8080
prometheus_refresh: 10
prometheus_cache_ttl: 10

135
prometheus.py Normal file → Executable file
View File

@ -1,93 +1,110 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket
import threading
import time
import subprocess
import logging
from time import time
from collections import defaultdict
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from urllib.parse import parse_qs, urlparse
from prometheus_client import (
CollectorRegistry, Gauge, generate_latest, CONTENT_TYPE_LATEST
)
from config import Config
from metrics import EjabberdMetrics
class _ThreadingSimpleServer(ThreadingMixIn, HTTPServer):
"""Thread per request HTTP server."""
# Make worker threads "fire and forget". Beginning with Python 3.7 this
# prevents a memory leak because ``ThreadingMixIn`` starts to gather all
# non-daemon threads in a list in order to join on them at server close.
# Enabling daemon threads virtually makes ``_ThreadingSimpleServer`` the
# same as Python 3.7's ``ThreadingHTTPServer``.
daemon_threads = True
address_family = socket.AF_INET6
class DynamicMetricsHandler(BaseHTTPRequestHandler):
"""HTTP handler that gives metrics from ``core.REGISTRY``."""
def do_GET(self):
params = parse_qs(urlparse(self.path).query)
registry = self.generator(params)
if 'name[]' in params:
registry = registry.restricted_registry(params['name[]'])
try:
output = generate_latest(registry)
except:
self.send_error(500, 'error generating metric output')
raise
self.send_response(200)
self.send_header('Content-Type', CONTENT_TYPE_LATEST)
self.end_headers()
self.wfile.write(output)
@staticmethod
def factory(registry_generator):
DynMetricsHandler = type('MetricsHandler',
(DynamicMetricsHandler, object),
{"generator": registry_generator})
return DynMetricsHandler
class Prometheus():
def __init__(self, metrics):
self.ttl = 10
self._last_update = 0
self._metrics = metrics
def _parse_metric(self, name, value, tags=None):
output = name
if isinstance(tags, dict):
output += "{"
first = True
for k, v in tags.items():
if not first:
output += ', '
else:
first = False
output += k+'="'+v+'"'
output += '}'
return output + ' {}\n'.format(value)
def handler(self, metrics_handler):
now = time()
if now >= i(self._last_update + self.ttl):
self._metrics.update()
self._last_update = now
def _get_metrics(self):
output = ""
registry = CollectorRegistry(auto_describe=True)
Gauge('ejabberd_node_s2s_in', 'count of incoming server-to-server connection', registry=registry).set(self._metrics.get_s2s_in())
Gauge('ejabberd_node_s2s_out', 'count of outgoing server-to-server connection', registry=registry).set(self._metrics.get_s2s_out())
labelnames_vhost = ["vhost"]
registered_vhosts = Gauge('ejabberd_registered_vhosts', 'count of user per vhost', labelnames_vhost, registry=registry)
muc = Gauge('ejabberd_muc', 'count of muc\'s per vhost', labelnames_vhost, registry=registry)
online_vhost_node = Gauge('ejabberd_online_vhost_node', 'count of client connections', ["vhost","node"], registry=registry)
online_status = Gauge('ejabberd_online_status', 'count of client connections', ["vhost","node","status"], registry=registry)
online_connection = Gauge('ejabberd_online_connection', 'count of client connections', ["vhost","node","connection"], registry=registry)
online_client = Gauge('ejabberd_online_client', 'count of client software', ["vhost","node","client"], registry=registry)
online_ipversion = Gauge('ejabberd_online_ipversion', 'count of client software', ["vhost","node","ipversion"], registry=registry)
online_client_ipversion = Gauge('ejabberd_online_client_ipversion', 'count of client software', ["vhost","node","client","ipversion"], registry=registry)
output += self._parse_metric("ejabberd_node_s2s_in", self._metrics.get_s2s_in())
output += self._parse_metric("ejabberd_node_s2s_out", self._metrics.get_s2s_out())
for host in self._metrics.get_vhosts():
output += self._parse_metric("ejabberd_registered_vhosts", self._metrics.get_registered(host), {"vhost": host})
muc = self._metrics.get_muc(host)
if muc is not None:
output += self._parse_metric("ejabberd_muc", muc, {"vhost": host})
labels_vhost = (host)
registered_vhosts.labels(labels_vhost).set(self._metrics.get_registered(host))
muc.labels(labels_vhost).set(self._metrics.get_muc(host))
for k, v in self._metrics.get_online_by_node(vhost=host).items():
output += self._parse_metric("ejabberd_online_vhost_node", v, {"vhost": host, "node": k})
online_vhost_node.labels(host,k).set(v)
for node in self._metrics.get_nodes():
for k, v in self._metrics.get_online_by_status(node=node, vhost=host).items():
output += self._parse_metric("ejabberd_online_status", v, {"vhost": host, "node": node, "status": k})
online_status.labels(host,node,k).set(v)
for k, v in self._metrics.get_online_by_connection(node=node, vhost=host).items():
output += self._parse_metric("ejabberd_online_connection", v, {"vhost": host, "node": node, "connection": k})
online_connection.labels(host,node,k).set(v)
for k, v in self._metrics.get_online_by_client(node=node, vhost=host).items():
output += self._parse_metric("ejabberd_online_client", v, {"vhost": host, "node": node, "client": k})
online_client.labels(host,node,k).set(v)
for k, v in self._metrics.get_online_by_ipversion(node=node, vhost=host).items():
output += self._parse_metric("ejabberd_online_ipversion", v, {"vhost": host, "node": node, "ipversion": str(k)})
online_ipversion.labels(host,node,k).set(v)
for client, data in self._metrics.get_online_client_by_ipversion(node=node,vhost=host).items():
for k, v in data.items():
output += self._parse_metric("ejabberd_online_client_ipversion", v, {"vhost": host, "node": node, "ipversion": str(k), "client": client})
online_client_ipversion.labels(host,node,str(k),client).set(v)
return output
return registry
def listen(self, port, addr='::'):
"""Starts an HTTP server for prometheus metrics as a daemon thread"""
class myHandler(BaseHTTPRequestHandler):
def do_GET(r):
r.send_response(200)
r.send_header('Content-type', 'text/html')
r.end_headers()
result = self._get_metrics()
r.wfile.write(result.encode('utf-8'))
httpd = _ThreadingSimpleServer((addr, port), myHandler)
t = threading.Thread(target=httpd.serve_forever)
t.daemon = True
t.start()
def listen(self, addr=("127.0.0.1", 8080)):
server = HTTPServer(addr, DynamicMetricsHandler.factory(self.handler))
server.serve_forever()
if __name__ == "__main__":
# load config
config = Config()
if config.get('debug', default=False):
logging.getLogger().setLevel(logging.DEBUG)
# credentials and parameters
url = config.get('url', default='http://[::1]:5280/api')
@ -95,12 +112,10 @@ if __name__ == "__main__":
api = config.get('api', default='rest')
# config prometheus
prom_addr = config.get('prometheus_address', default='127.0.0.1')
prom_port = config.get('prometheus_port', default=8080)
prom_refresh = config.get('prometheus_refresh', default=10)
metrics = EjabberdMetrics(url, login, api)
prom = Prometheus(metrics)
prom.listen(prom_port)
while True:
metrics.update()
time.sleep(prom_refresh)
prom.ttl = config.get('prometheus_cache_ttl', default=10)
prom.listen((prom_addr, prom_port))

View File

@ -1,3 +1,4 @@
prometheus_client>=0.8.0
influxdb>=5.2.0
requests>=2.21.0
packaging>=20.1