ejabberd-tools/influx.py

107 lines
4.0 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
from influxdb import InfluxDBClient
from config import Config
from ejabberdrpc import EjabberdMetrics
class Influx:
def __init__(self, data, cld):
self._metrics = data
self.client = cld
@staticmethod
def _timestamp():
return int(time.time() * 1000)
@staticmethod
def _rmspace(key: str = None, value: (str, int) = None):
try:
key = key.replace(' ', '\ ')
value = value.replace(' ', '\ ')
except (TypeError, AttributeError):
pass
return key, value
def _parse(self, name, key, value, ts, tags=None):
output = name
# check if tags is a dict
if isinstance(tags, dict):
# create tag_key=tag_value pairs for all elements and append them to name
for k, v in tags.items():
output += ',{}={}'.format(*self._rmspace(k, v))
# append key=value to name
output += ' {}={}i {}'.format(*self._rmspace(key, value), ts)
return output
def write_metrics(self):
data = list()
# global values
cur_ts = self._timestamp()
data.append(f'ejabberd s2s_in={self._metrics.get_s2s_in()}i {cur_ts}')
data.append(f'ejabberd s2s_out={self._metrics.get_s2s_out()}i {cur_ts}')
# vhost values
for vhost in self._metrics.get_vhosts():
cur_ts = self._timestamp()
data.append(f'ejabberd,vhost={vhost} registered={self._metrics.get_registered(vhost)}i {cur_ts}')
data.append(f'ejabberd,vhost={vhost} muc={self._metrics.get_muc(vhost)}i {cur_ts}')
# vhost statistics on their respective node
for node in self._metrics.get_nodes():
cur_ts = self._timestamp()
for k, v in self._metrics.get_online_by_status(node=node, vhost=vhost).items():
data.append(self._parse('ejabberd_online_status', k, v, cur_ts, {'node': node, 'vhost': vhost}))
for k, v in self._metrics.get_online_by_client(node=node, vhost=vhost).items():
data.append(self._parse('ejabberd_online_client', k, v, cur_ts, {'node': node, 'vhost': vhost}))
for k, v in self._metrics.get_online_by_ipversion(node=node, vhost=vhost).items():
data.append(self._parse('ejabberd_online_ipversion', k, v, cur_ts, {'node': node, 'vhost': vhost}))
for k, v in self._metrics.get_online_by_connection(node=node, vhost=vhost).items():
data.append(self._parse('ejabberd_online_connection', k, v, cur_ts, {'node': node, 'vhost': vhost}))
for cl, ipv in self._metrics.get_online_client_by_ipversion(node=node, vhost=vhost).items():
for k, v in ipv.items():
data.append(self._parse('ejabberd_online_client_ipversion', k, v, cur_ts,
{'vhost': vhost, 'node': node, 'ipversion': k, 'client': cl}))
# write output to database
self.client.write_points(data, time_precision='ms', batch_size=10000, protocol='line')
if __name__ == '__main__':
# load config
config = Config()
# credentials and parameters
url = config.get('url', default='http://localhost:5280/api')
login = config.get('login', default=None)
api = config.get('api', default='rest')
# config influxdb
influx_host = config.get('influxdb_host', default='localhost')
influx_port = config.get('influxdb_port', default=8086)
influx_dbname = config.get('influxdb_db', default='ejabberd')
# init handler
metrics = EjabberdMetrics(url, login, api)
client = InfluxDBClient(host=influx_host, port=influx_port, database=influx_dbname, retries=5)
# create database only once
client.create_database(influx_dbname)
# init influx class
influx = Influx(metrics, client)
while True:
metrics.update()
influx.write_metrics()
time.sleep(10)