From 6c58e9b352fcbdd4084694a915028c5e378aa059 Mon Sep 17 00:00:00 2001 From: nico Date: Sat, 19 Oct 2019 05:26:34 +0200 Subject: [PATCH 1/3] WIP --- .gitignore | 6 +- .../linux-systemd/ejabberd-influxdb.service | 14 +++ ejabberdrpc.py | 6 +- influx.py | 91 +++++++++++++++++++ 4 files changed, 115 insertions(+), 2 deletions(-) create mode 100644 contrib/init/linux-systemd/ejabberd-influxdb.service create mode 100644 influx.py diff --git a/.gitignore b/.gitignore index 31a4ad7..263cfcc 100644 --- a/.gitignore +++ b/.gitignore @@ -108,4 +108,8 @@ dmypy.json .pyre/ # Pycharm -.idea/ \ No newline at end of file +.idea/ +.venv/ + +# config +config.json diff --git a/contrib/init/linux-systemd/ejabberd-influxdb.service b/contrib/init/linux-systemd/ejabberd-influxdb.service new file mode 100644 index 0000000..bade80f --- /dev/null +++ b/contrib/init/linux-systemd/ejabberd-influxdb.service @@ -0,0 +1,14 @@ +[Unit] +Description=ejabberd2influxdb + +[Service] +Type=simple +User=nobody +Group=nobody +ExecStart=/opt/ejabberd-metrics/influx.py +Restart=always +RestartSec=5s +Environment=PATH=/usr/bin:/usr/local/bin + +[Install] +WantedBy=multi-user.target diff --git a/ejabberdrpc.py b/ejabberdrpc.py index b2cc908..cee769b 100755 --- a/ejabberdrpc.py +++ b/ejabberdrpc.py @@ -109,7 +109,11 @@ class EjabberdMetrics(): def fetch_muc(self, vhost=None): host = "global" if vhost is not None: - host = "conference." + vhost + version = self._cmd("status", {}) + if "19.09" in version: + host = "conference." 
+ vhost + else: + host = vhost result = self._cmd("muc_online_rooms", {"host": host}) if "rooms" in result: return len(result["rooms"]) diff --git a/influx.py b/influx.py new file mode 100644 index 0000000..34ee600 --- /dev/null +++ b/influx.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import time +import json + +from influxdb import InfluxDBClient + +from ejabberdrpc import EjabberdMetrics + + +class Influx(): + def __init__(self, metrics): + self._metrics = metrics + + @staticmethod + def _timestamp(): + return int(time.time() * 1000) + + def _parse(self, name, key, value, ts, tags=None): + output = name + + # check if tags is a dict + if isinstance(tags, dict): + + # create tag_key=tag_value pairs for all elements and append them to name + for k, v in tags.items(): + output += ",{}={}".format(k, v) + + # append key=value to name + + if " " in str(key): + key = key.replace(" ", "-") + + output += ' {}={}i {}'.format(key, value, ts) + return output + + def writeMetrics(self): + name = "ejabberd" + data = list() + + # global values + cur_ts = self._timestamp() + data.append("{m} registered={v}i {ts}".format(m=name, v=self._metrics.get_registered(), ts=cur_ts)) + data.append("{m} muc={v}i {ts}".format(m=name, v=self._metrics.get_muc(), ts=cur_ts)) + data.append("{m} s2s_in={v}i {ts}".format(m=name, v=self._metrics.get_s2s_in(), ts=cur_ts)) + data.append("{m} s2s_out={v}i {ts}".format(m=name, v=self._metrics.get_s2s_out(), ts=cur_ts)) + + # node values + for node in self._metrics.get_nodes(): + cur_ts = self._timestamp() + for k, v in self._metrics.get_online_by_status(node=node).items(): + data.append(self._parse("ejabberd_online_status", k, v, cur_ts, {"node": node})) + for k, v in self._metrics.get_online_by_client(node=node).items(): + data.append(self._parse("ejabberd_online_client", k, v, cur_ts, {"node": node})) + for k, v in self._metrics.get_online_by_ipversion(node=node).items(): + 
data.append(self._parse("ejabberd_online_ipversion", k, v, cur_ts, {"node": node})) + for k, v in self._metrics.get_online_by_connection(node=node).items(): + data.append(self._parse("ejabberd_online_connection", k, v, cur_ts, {"node": node})) + for k, v in self._metrics.get_online_by_vhost(node=node).items(): + data.append(self._parse("ejabberd_online_node", k, v, cur_ts, {"node": node})) + + # vhost values + # todo + + # output + client = InfluxDBClient(host='localhost', port=8086) + + # test code + client.create_database('writetest') + client_write_start_time = time.perf_counter() + + client.write_points(data, database='writetest', time_precision='ms', batch_size=10000, protocol='line') + + # test code + client_write_end_time = time.perf_counter() + print("Client Library Write: {time}s".format(time=client_write_end_time - client_write_start_time)) + + +if __name__ == "__main__": + with open("config.json", "r", encoding="utf-8") as f: + login = json.load(f) + + metrics = EjabberdMetrics("http://[::1]:4560", login) + influx = Influx(metrics) + + while True: + metrics.update() + influx.writeMetrics() + + time.sleep(10) \ No newline at end of file From 38dba3845f8d5b6445ff284df3b1f877a9cf79b1 Mon Sep 17 00:00:00 2001 From: nico Date: Tue, 22 Oct 2019 14:25:52 +0200 Subject: [PATCH 2/3] final touches + add method to escape whitespaces in keys and values * reverted node first mentality - vhosts are listed first and after that the respective nodes are listed --- influx.py | 61 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 34 insertions(+), 27 deletions(-) diff --git a/influx.py b/influx.py index 34ee600..327154f 100644 --- a/influx.py +++ b/influx.py @@ -1,8 +1,8 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -import time import json +import time from influxdb import InfluxDBClient @@ -17,6 +17,16 @@ class Influx(): def _timestamp(): return int(time.time() * 1000) + @staticmethod + def _rmspace(key: str = None, value=None): + try: + key = 
key.replace(" ", "\ ") + value = value.replace(" ","\ ") + except (TypeError, AttributeError): + pass + + return key, value + def _parse(self, name, key, value, ts, tags=None): output = name @@ -25,14 +35,10 @@ class Influx(): # create tag_key=tag_value pairs for all elements and append them to name for k, v in tags.items(): - output += ",{}={}".format(k, v) + output += ",{}={}".format(*self._rmspace(k, v)) # append key=value to name - - if " " in str(key): - key = key.replace(" ", "-") - - output += ' {}={}i {}'.format(key, value, ts) + output += ' {}={}i {}'.format(*self._rmspace(key, value), ts) return output def writeMetrics(self): @@ -41,35 +47,36 @@ class Influx(): # global values cur_ts = self._timestamp() - data.append("{m} registered={v}i {ts}".format(m=name, v=self._metrics.get_registered(), ts=cur_ts)) - data.append("{m} muc={v}i {ts}".format(m=name, v=self._metrics.get_muc(), ts=cur_ts)) data.append("{m} s2s_in={v}i {ts}".format(m=name, v=self._metrics.get_s2s_in(), ts=cur_ts)) data.append("{m} s2s_out={v}i {ts}".format(m=name, v=self._metrics.get_s2s_out(), ts=cur_ts)) - # node values - for node in self._metrics.get_nodes(): - cur_ts = self._timestamp() - for k, v in self._metrics.get_online_by_status(node=node).items(): - data.append(self._parse("ejabberd_online_status", k, v, cur_ts, {"node": node})) - for k, v in self._metrics.get_online_by_client(node=node).items(): - data.append(self._parse("ejabberd_online_client", k, v, cur_ts, {"node": node})) - for k, v in self._metrics.get_online_by_ipversion(node=node).items(): - data.append(self._parse("ejabberd_online_ipversion", k, v, cur_ts, {"node": node})) - for k, v in self._metrics.get_online_by_connection(node=node).items(): - data.append(self._parse("ejabberd_online_connection", k, v, cur_ts, {"node": node})) - for k, v in self._metrics.get_online_by_vhost(node=node).items(): - data.append(self._parse("ejabberd_online_node", k, v, cur_ts, {"node": node})) - # vhost values - # todo + for vhost in 
self._metrics.get_vhosts(): + cur_ts = self._timestamp() + data.append("{m},vhost={vh} registered={v}i {ts}".format(m=name, vh= vhost, v=self._metrics.get_registered(vhost),ts=cur_ts)) + data.append("{m},vhost={vh} muc={v}i {ts}".format(m=name, vh= vhost, v=self._metrics.get_muc(vhost), ts=cur_ts)) + + # vhost statistics on their respective node + for node in self._metrics.get_nodes(): + cur_ts = self._timestamp() + for k, v in self._metrics.get_online_by_status(node=node, vhost=vhost).items(): + data.append(self._parse("ejabberd_online_status", k, v, cur_ts, {"node": node, "vhost": vhost})) + for k, v in self._metrics.get_online_by_client(node=node, vhost=vhost).items(): + data.append(self._parse("ejabberd_online_client", k, v, cur_ts, {"node": node, "vhost": vhost})) + for k, v in self._metrics.get_online_by_ipversion(node=node, vhost=vhost).items(): + data.append(self._parse("ejabberd_online_ipversion", k, v, cur_ts, {"node": node, "vhost": vhost})) + for k, v in self._metrics.get_online_by_connection(node=node, vhost=vhost).items(): + data.append(self._parse("ejabberd_online_connection", k, v, cur_ts, {"node": node, "vhost": vhost})) + for cl, ipv in self._metrics.get_online_client_by_ipversion(node=node, vhost=vhost).items(): + for k, v in ipv.items(): + data.append(self._parse("ejabberd_online_client_ipversion", k, v, cur_ts, {"vhost": vhost, "node": node, "ipversion": k, "client": cl})) # output client = InfluxDBClient(host='localhost', port=8086) - + # test code client.create_database('writetest') client_write_start_time = time.perf_counter() - client.write_points(data, database='writetest', time_precision='ms', batch_size=10000, protocol='line') # test code @@ -88,4 +95,4 @@ if __name__ == "__main__": metrics.update() influx.writeMetrics() - time.sleep(10) \ No newline at end of file + time.sleep(10) From d0fb756e448ec697353b0213a41348e89d387ccb Mon Sep 17 00:00:00 2001 From: nico Date: Thu, 24 Oct 2019 17:41:51 +0200 Subject: [PATCH 3/3] optimization 
attempt * init client handler only once and not every time * don't try to create the database on every run, just on the first one --- influx.py | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/influx.py b/influx.py index 327154f..fcf2f41 100644 --- a/influx.py +++ b/influx.py @@ -9,9 +9,10 @@ from influxdb import InfluxDBClient from ejabberdrpc import EjabberdMetrics -class Influx(): - def __init__(self, metrics): +class Influx: + def __init__(self, metrics, client): self._metrics = metrics + self.client = client @staticmethod def _timestamp(): @@ -71,27 +72,25 @@ class Influx(): for k, v in ipv.items(): data.append(self._parse("ejabberd_online_client_ipversion", k, v, cur_ts, {"vhost": vhost, "node": node, "ipversion": k, "client": cl})) - # output - client = InfluxDBClient(host='localhost', port=8086) - - # test code - client.create_database('writetest') - client_write_start_time = time.perf_counter() - client.write_points(data, database='writetest', time_precision='ms', batch_size=10000, protocol='line') - - # test code - client_write_end_time = time.perf_counter() - print("Client Library Write: {time}s".format(time=client_write_end_time - client_write_start_time)) - + # write output to database + self.client.write_points(data, database='custom', time_precision='ms', batch_size=10000, protocol='line') if __name__ == "__main__": with open("config.json", "r", encoding="utf-8") as f: login = json.load(f) - metrics = EjabberdMetrics("http://[::1]:4560", login) - influx = Influx(metrics) + # init global handler + metrics = EjabberdMetrics("http://localhost:4560", login) + client = InfluxDBClient(host='localhost', port=8086, database='custom') + + # create database only once + client.create_database('custom') + + # init influx class + influx = Influx(metrics, client) while True: + # TODO: this will fail when the connection drops; wrap in try/except? metrics.update() influx.writeMetrics()