From fb529130e90fd5e9cfcadf568e4492a885939abd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Malte=20Bl=C3=A4ttermann?= Date: Wed, 19 Apr 2017 22:40:49 +0200 Subject: [PATCH] First work on elasticsearch implemenation of Connection interface --- database/all/main.go | 1 + database/elasticsearch/elasticsearch.go | 115 ++++++++++++++++++++++++ 2 files changed, 116 insertions(+) create mode 100644 database/elasticsearch/elasticsearch.go diff --git a/database/all/main.go b/database/all/main.go index fae0160..03ec3d5 100644 --- a/database/all/main.go +++ b/database/all/main.go @@ -4,4 +4,5 @@ import ( _ "github.com/FreifunkBremen/yanic/database/graphite" _ "github.com/FreifunkBremen/yanic/database/influxdb" _ "github.com/FreifunkBremen/yanic/database/logging" + _ "github.com/FreifunkBremen/yanic/database/elasticsearch" ) diff --git a/database/elasticsearch/elasticsearch.go b/database/elasticsearch/elasticsearch.go new file mode 100644 index 0000000..55bb828 --- /dev/null +++ b/database/elasticsearch/elasticsearch.go @@ -0,0 +1,115 @@ +package logging + +/** + * This database type is just for, + * - debugging without a influxconn + * - example for other developers for new databases + */ +import ( + + "log" + "time" + + "github.com/FreifunkBremen/yanic/database" + "github.com/FreifunkBremen/yanic/runtime" + "gopkg.in/olivere/elastic.v5" + + + "context" + +) + +type Connection struct { + database.Connection + config Config + client *elastic.Client +} + +type Config map[string]interface{} + +func (c Config) Enable() bool { + return c["enable"].(bool) +} +func (c Config) Path() string { + return c["path"].(string) +} + +func init() { + database.RegisterAdapter("elasticsearch", Connect) +} + +func Connect(configuration interface{}) (database.Connection, error) { + var config Config + config = configuration.(map[string]interface{}) + if !config.Enable() { + return nil, nil + } + + + // Create a client + client, err := elastic.NewClient( + elastic.SetURL("http://127.0.0.1:9200", "http://127.0.0.1:9201"), + elastic.SetBasicAuth("user", "secret")) + + + + if err != nil { + // Handle error + panic(err) + } + + + return &Connection{config: config, client: client}, nil +} + +func (conn *Connection) InsertNode(node *runtime.Node) { + log("InsertNode: [", node.Statistics.NodeID, "] clients: ", node.Statistics.Clients.Total) + + _, err = conn.client.Index() + Index("ffhb"). + Type("node"). + BodyJson(node). + Refresh("true"). + Do(context.Background()) + + + + if err != nil { + // Handle error + panic(err) + } + + + +} + +func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) { + log.Print("InsertGlobals: [", time.String(), "] nodes: ", stats.Nodes, ", clients: ", stats.Clients, " models: ", len(stats.Models)) + + _, err = conn.client.Index() + Index("ffhb"). + Type("globals"). + BodyJson(stats). + Refresh("true"). + Do(context.Background()) + + + if err != nil { + // Handle error + panic(err) + } + + +} + +func (conn *Connection) PruneNodes(deleteAfter time.Duration) { + log.Print("PruneNodes") + + // TODO +} + +func (conn *Connection) Close() { + + log.Print("Closing connection, stop client") + conn.client.Stop() +}