alignment changes
* update influx plugin use the rest interface * align influx.py to ejabberdrpc changes * replace format with f-strings * minor performance improvements
This commit is contained in:
parent
ee25eb6e23
commit
7200131614
|
@ -10,7 +10,7 @@
|
||||||
"--comment": "influxdb",
|
"--comment": "influxdb",
|
||||||
"influxdb_host": "localhost",
|
"influxdb_host": "localhost",
|
||||||
"influxdb_port": 8086,
|
"influxdb_port": 8086,
|
||||||
"database": "ejabberd",
|
"influxdb_db": "ejabberd",
|
||||||
|
|
||||||
"--comment": "prometheus",
|
"--comment": "prometheus",
|
||||||
"prometheus_port": 8080,
|
"prometheus_port": 8080,
|
||||||
|
|
63
influx.py
63
influx.py
|
@ -2,8 +2,8 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import time
|
|
||||||
import os
|
import os
|
||||||
|
import time
|
||||||
|
|
||||||
from influxdb import InfluxDBClient
|
from influxdb import InfluxDBClient
|
||||||
|
|
||||||
|
@ -11,19 +11,19 @@ from ejabberdrpc import EjabberdMetrics
|
||||||
|
|
||||||
|
|
||||||
class Influx:
|
class Influx:
|
||||||
def __init__(self, metrics, client):
|
def __init__(self, data, cld):
|
||||||
self._metrics = metrics
|
self._metrics = data
|
||||||
self.client = client
|
self.client = cld
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _timestamp():
|
def _timestamp():
|
||||||
return int(time.time() * 1000)
|
return int(time.time() * 1000)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _rmspace(key: str = None, value=None):
|
def _rmspace(key: str = None, value: (str, int) = None):
|
||||||
try:
|
try:
|
||||||
key = key.replace(" ", "\ ")
|
key = key.replace(' ', '\ ')
|
||||||
value = value.replace(" ","\ ")
|
value = value.replace(' ', '\ ')
|
||||||
except (TypeError, AttributeError):
|
except (TypeError, AttributeError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -37,73 +37,74 @@ class Influx:
|
||||||
|
|
||||||
# create tag_key=tag_value pairs for all elements and append them to name
|
# create tag_key=tag_value pairs for all elements and append them to name
|
||||||
for k, v in tags.items():
|
for k, v in tags.items():
|
||||||
output += ",{}={}".format(*self._rmspace(k, v))
|
output += ',{}={}'.format(*self._rmspace(k, v))
|
||||||
|
|
||||||
# append key=value to name
|
# append key=value to name
|
||||||
output += ' {}={}i {}'.format(*self._rmspace(key, value), ts)
|
output += ' {}={}i {}'.format(*self._rmspace(key, value), ts)
|
||||||
return output
|
return output
|
||||||
|
|
||||||
def writeMetrics(self):
|
def write_metrics(self):
|
||||||
name = "ejabberd"
|
|
||||||
data = list()
|
data = list()
|
||||||
|
|
||||||
# global values
|
# global values
|
||||||
cur_ts = self._timestamp()
|
cur_ts = self._timestamp()
|
||||||
data.append("{m} s2s_in={v}i {ts}".format(m=name, v=self._metrics.get_s2s_in(), ts=cur_ts))
|
data.append(f'ejabberd s2s_in={self._metrics.get_s2s_in()}i, {cur_ts}')
|
||||||
data.append("{m} s2s_out={v}i {ts}".format(m=name, v=self._metrics.get_s2s_out(), ts=cur_ts))
|
data.append(f'ejabberd s2s_out={self._metrics.get_s2s_out()}i, {cur_ts}')
|
||||||
|
|
||||||
# vhost values
|
# vhost values
|
||||||
for vhost in self._metrics.get_vhosts():
|
for vhost in self._metrics.get_vhosts():
|
||||||
cur_ts = self._timestamp()
|
cur_ts = self._timestamp()
|
||||||
data.append("{m},vhost={vh} registered={v}i {ts}".format(m=name, vh= vhost, v=self._metrics.get_registered(vhost),ts=cur_ts))
|
data.append(f'ejabberd,vhost={vhost} registered={self._metrics.get_registered(vhost)}i {cur_ts}')
|
||||||
data.append("{m},vhost={vh} muc={v}i {ts}".format(m=name, vh= vhost, v=self._metrics.get_muc(vhost), ts=cur_ts))
|
data.append(f'ejabberd,vhost={vhost} muc={self._metrics.get_muc(vhost)}i {cur_ts}')
|
||||||
|
|
||||||
# vhost statistics on their respective node
|
# vhost statistics on their respective node
|
||||||
for node in self._metrics.get_nodes():
|
for node in self._metrics.get_nodes():
|
||||||
cur_ts = self._timestamp()
|
cur_ts = self._timestamp()
|
||||||
for k, v in self._metrics.get_online_by_status(node=node, vhost=vhost).items():
|
for k, v in self._metrics.get_online_by_status(node=node, vhost=vhost).items():
|
||||||
data.append(self._parse("ejabberd_online_status", k, v, cur_ts, {"node": node, "vhost": vhost}))
|
data.append(self._parse('ejabberd_online_status', k, v, cur_ts, {'node': node, 'vhost': vhost}))
|
||||||
for k, v in self._metrics.get_online_by_client(node=node, vhost=vhost).items():
|
for k, v in self._metrics.get_online_by_client(node=node, vhost=vhost).items():
|
||||||
data.append(self._parse("ejabberd_online_client", k, v, cur_ts, {"node": node, "vhost": vhost}))
|
data.append(self._parse('ejabberd_online_client', k, v, cur_ts, {'node': node, 'vhost': vhost}))
|
||||||
for k, v in self._metrics.get_online_by_ipversion(node=node, vhost=vhost).items():
|
for k, v in self._metrics.get_online_by_ipversion(node=node, vhost=vhost).items():
|
||||||
data.append(self._parse("ejabberd_online_ipversion", k, v, cur_ts, {"node": node, "vhost": vhost}))
|
data.append(self._parse('ejabberd_online_ipversion', k, v, cur_ts, {'node': node, 'vhost': vhost}))
|
||||||
for k, v in self._metrics.get_online_by_connection(node=node, vhost=vhost).items():
|
for k, v in self._metrics.get_online_by_connection(node=node, vhost=vhost).items():
|
||||||
data.append(self._parse("ejabberd_online_connection", k, v, cur_ts, {"node": node, "vhost": vhost}))
|
data.append(self._parse('ejabberd_online_connection', k, v, cur_ts, {'node': node, 'vhost': vhost}))
|
||||||
for cl, ipv in self._metrics.get_online_client_by_ipversion(node=node, vhost=vhost).items():
|
for cl, ipv in self._metrics.get_online_client_by_ipversion(node=node, vhost=vhost).items():
|
||||||
for k, v in ipv.items():
|
for k, v in ipv.items():
|
||||||
data.append(self._parse("ejabberd_online_client_ipversion", k, v, cur_ts, {"vhost": vhost, "node": node, "ipversion": k, "client": cl}))
|
data.append(self._parse('ejabberd_online_client_ipversion', k, v, cur_ts,
|
||||||
|
{'vhost': vhost, 'node': node, 'ipversion': k, 'client': cl}))
|
||||||
|
|
||||||
# write output to database
|
# write output to database
|
||||||
self.client.write_points(data, time_precision='ms', batch_size=10000, protocol='line')
|
self.client.write_points(data, time_precision='ms', batch_size=10000, protocol='line')
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == '__main__':
|
||||||
# load config
|
# load config
|
||||||
path = os.path.dirname(__file__)
|
path = os.path.dirname(__file__)
|
||||||
with open("/".join([path, "config.json"]), "r", encoding="utf-8") as f:
|
with open('/'.join([path, 'config.json']), 'r', encoding='utf-8') as f:
|
||||||
config = json.load(f)
|
config = json.load(f)
|
||||||
|
|
||||||
url = config['url'] if "url" in config else "http://localhost:4560"
|
# creds and params
|
||||||
login = config['login'] if "login" in config else None
|
url = config['url'] if 'url' in config else 'http://localhost:5280/api'
|
||||||
api = config['api'] if "api" in config else "rpc"
|
login = config['login'] if 'login' in config else None
|
||||||
|
api = config['api'] if 'api' in config else 'rest'
|
||||||
|
|
||||||
# config influxdb
|
# config influxdb
|
||||||
influxdb_host = config['influxdb_host'] if "influxdb_host" in config else "localhost"
|
influx_host = config['influxdb_host'] if 'influxdb_host' in config else 'localhost'
|
||||||
influxdb_port = config['influxdb_port'] if "influxdb_port" in config else 8086
|
influx_port = config['influxdb_port'] if 'influxdb_port' in config else 8086
|
||||||
influxdb_database = config['database'] if "database" in config else "ejabberd"
|
influx_dbname = config['influxdb_db'] if 'influxdb_db' in config else 'ejabberd'
|
||||||
|
|
||||||
# init global handler
|
# init handler
|
||||||
metrics = EjabberdMetrics(url, login, api)
|
metrics = EjabberdMetrics(url, login, api)
|
||||||
client = InfluxDBClient(host=influxdb_host, port=influxdb_port, database=influxdb_database, retries=5)
|
client = InfluxDBClient(host=influx_host, port=influx_port, database=influx_dbname, retries=5)
|
||||||
|
|
||||||
# create database only once
|
# create database only once
|
||||||
client.create_database(influxdb_database)
|
client.create_database(influx_dbname)
|
||||||
|
|
||||||
# init influx class
|
# init influx class
|
||||||
influx = Influx(metrics, client)
|
influx = Influx(metrics, client)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
metrics.update()
|
metrics.update()
|
||||||
influx.writeMetrics()
|
influx.write_metrics()
|
||||||
|
|
||||||
time.sleep(10)
|
time.sleep(10)
|
||||||
|
|
|
@ -1 +1,2 @@
|
||||||
influxdb>=5.2.0
|
influxdb>=5.2.0
|
||||||
|
requests>=2.21.0
|
||||||
|
|
Loading…
Reference in New Issue