98 lines
3.8 KiB
Python
98 lines
3.8 KiB
Python
|
#!/usr/bin/env python3
|
||
|
# -*- coding: utf-8 -*-
|
||
|
|
||
|
import json
|
||
|
import time
|
||
|
|
||
|
from influxdb import InfluxDBClient
|
||
|
|
||
|
from ejabberdrpc import EjabberdMetrics
|
||
|
|
||
|
|
||
|
class Influx:
|
||
|
def __init__(self, metrics, client):
|
||
|
self._metrics = metrics
|
||
|
self.client = client
|
||
|
|
||
|
@staticmethod
|
||
|
def _timestamp():
|
||
|
return int(time.time() * 1000)
|
||
|
|
||
|
@staticmethod
|
||
|
def _rmspace(key: str = None, value=None):
|
||
|
try:
|
||
|
key = key.replace(" ", "\ ")
|
||
|
value = value.replace(" ","\ ")
|
||
|
except (TypeError, AttributeError):
|
||
|
pass
|
||
|
|
||
|
return key, value
|
||
|
|
||
|
def _parse(self, name, key, value, ts, tags=None):
|
||
|
output = name
|
||
|
|
||
|
# check if tags is a dict
|
||
|
if isinstance(tags, dict):
|
||
|
|
||
|
# create tag_key=tag_value pairs for all elements and append them to name
|
||
|
for k, v in tags.items():
|
||
|
output += ",{}={}".format(*self._rmspace(k, v))
|
||
|
|
||
|
# append key=value to name
|
||
|
output += ' {}={}i {}'.format(*self._rmspace(key, value), ts)
|
||
|
return output
|
||
|
|
||
|
def writeMetrics(self):
|
||
|
name = "ejabberd"
|
||
|
data = list()
|
||
|
|
||
|
# global values
|
||
|
cur_ts = self._timestamp()
|
||
|
data.append("{m} s2s_in={v}i {ts}".format(m=name, v=self._metrics.get_s2s_in(), ts=cur_ts))
|
||
|
data.append("{m} s2s_out={v}i {ts}".format(m=name, v=self._metrics.get_s2s_out(), ts=cur_ts))
|
||
|
|
||
|
# vhost values
|
||
|
for vhost in self._metrics.get_vhosts():
|
||
|
cur_ts = self._timestamp()
|
||
|
data.append("{m},vhost={vh} registered={v}i {ts}".format(m=name, vh= vhost, v=self._metrics.get_registered(vhost),ts=cur_ts))
|
||
|
data.append("{m},vhost={vh} muc={v}i {ts}".format(m=name, vh= vhost, v=self._metrics.get_muc(vhost), ts=cur_ts))
|
||
|
|
||
|
# vhost statistics on their repsective node
|
||
|
for node in self._metrics.get_nodes():
|
||
|
cur_ts = self._timestamp()
|
||
|
for k, v in self._metrics.get_online_by_status(node=node, vhost=vhost).items():
|
||
|
data.append(self._parse("ejabberd_online_status", k, v, cur_ts, {"node": node, "vhost": vhost}))
|
||
|
for k, v in self._metrics.get_online_by_client(node=node, vhost=vhost).items():
|
||
|
data.append(self._parse("ejabberd_online_client", k, v, cur_ts, {"node": node, "vhost": vhost}))
|
||
|
for k, v in self._metrics.get_online_by_ipversion(node=node, vhost=vhost).items():
|
||
|
data.append(self._parse("ejabberd_online_ipversion", k, v, cur_ts, {"node": node, "vhost": vhost}))
|
||
|
for k, v in self._metrics.get_online_by_connection(node=node, vhost=vhost).items():
|
||
|
data.append(self._parse("ejabberd_online_connection", k, v, cur_ts, {"node": node, "vhost": vhost}))
|
||
|
for cl, ipv in self._metrics.get_online_client_by_ipversion(node=node, vhost=vhost).items():
|
||
|
for k, v in ipv.items():
|
||
|
data.append(self._parse("ejabberd_online_client_ipversion", k, v, cur_ts, {"vhost": vhost, "node": node, "ipversion": k, "client": cl}))
|
||
|
|
||
|
# write output to database
|
||
|
self.client.write_points(data, database='custom', time_precision='ms', batch_size=10000, protocol='line')
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
with open("config.json", "r", encoding="utf-8") as f:
|
||
|
login = json.load(f)
|
||
|
|
||
|
# init global handler
|
||
|
metrics = EjabberdMetrics("http://localhost:4560", login)
|
||
|
client = InfluxDBClient(host='localhost', port=8086, database='custom')
|
||
|
|
||
|
# create database only once
|
||
|
client.create_database('custom')
|
||
|
|
||
|
# init influx class
|
||
|
influx = Influx(metrics, client)
|
||
|
|
||
|
while True:
|
||
|
# TODO this will fail when the connection drops try except maybe?
|
||
|
metrics.update()
|
||
|
influx.writeMetrics()
|
||
|
|
||
|
time.sleep(10)
|