commit 462f065245fdef7158a6ebf44a90515e04e7a4d3 Author: Martin/Geno Date: Sat Sep 29 00:38:54 2018 +0200 init diff --git a/api.go b/api.go new file mode 100644 index 0000000..744037b --- /dev/null +++ b/api.go @@ -0,0 +1,7 @@ +package main + +type R2PAPIAll struct { + CountIn int `json:"count_in"` + CountOut int `json:"count_out"` + CountCurrent int `json:"count_current"` +} diff --git a/helper.go b/helper.go new file mode 100644 index 0000000..7484546 --- /dev/null +++ b/helper.go @@ -0,0 +1,24 @@ +package main + +import ( + "encoding/json" + "net/http" + "time" +) + +func JSONRequest(url string, value interface{}) error { + var netClient = &http.Client{ + Timeout: time.Second * 20, + } + + resp, err := netClient.Get(url) + if err != nil { + return err + } + + err = json.NewDecoder(resp.Body).Decode(&value) + if err != nil { + return err + } + return nil +} diff --git a/influxdb.go b/influxdb.go new file mode 100644 index 0000000..edb0e55 --- /dev/null +++ b/influxdb.go @@ -0,0 +1,115 @@ +package main + +import ( + "sync" + "time" + + log "github.com/Sirupsen/logrus" + + "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/influxdb/models" +) + +const ( + batchMaxSize = 1000 + batchTimeout = 5 * time.Second +) + +type InfluxDBConnection struct { + dbname string + client client.Client + points chan *client.Point + wg sync.WaitGroup +} + +func InfluxDBConnect(config client.HTTPConfig, dbname string) (*InfluxDBConnection, error) { + + // Make client + c, err := client.NewHTTPClient(config) + + if err != nil { + return nil, err + } + + _, _, err = c.Ping(time.Millisecond * 50) + if err != nil { + return nil, err + } + + db := &InfluxDBConnection{ + dbname: dbname, + client: c, + points: make(chan *client.Point, batchMaxSize), + } + + db.wg.Add(1) + go db.addWorker() + + return db, nil +} + +func (conn *InfluxDBConnection) addPoint(name string, tags models.Tags, fields models.Fields) { + point, err := client.NewPoint(name, tags.Map(), fields, time.Now()) + if err != nil { + panic(err) + } + conn.points <- point +} + +// Close all connection and clean up +func (conn *InfluxDBConnection) Close() { + close(conn.points) + conn.wg.Wait() + conn.client.Close() +} + +// stores data points in batches into the influxdb +func (conn *InfluxDBConnection) addWorker() { + bpConfig := client.BatchPointsConfig{ + Database: conn.dbname, + Precision: "m", + } + + var bp client.BatchPoints + var err error + var writeNow, closed bool + timer := time.NewTimer(batchTimeout) + + for !closed { + // wait for new points + select { + case point, ok := <-conn.points: + if ok { + if bp == nil { + // create new batch + timer.Reset(batchTimeout) + if bp, err = client.NewBatchPoints(bpConfig); err != nil { + log.Fatal(err) + } + } + bp.AddPoint(point) + } else { + closed = true + } + case <-timer.C: + if bp == nil { + timer.Reset(batchTimeout) + } else { + writeNow = true + } + } + + // write batch now? + if bp != nil && (writeNow || closed || len(bp.Points()) >= batchMaxSize) { + log.Infof("saving %d points", len(bp.Points())) + + if err = conn.client.Write(bp); err != nil { + log.Errorf("influxdb write error: %s", err.Error()) + } + writeNow = false + bp = nil + } + } + timer.Stop() + conn.wg.Done() +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..41b09a2 --- /dev/null +++ b/main.go @@ -0,0 +1,112 @@ +package main + +import ( + "flag" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "dev.sum7.eu/genofire/golang-lib/worker" + log "github.com/Sirupsen/logrus" + + "github.com/influxdata/influxdb/client/v2" + models "github.com/influxdata/influxdb/models" + + "github.com/graarh/golang-socketio" + "github.com/graarh/golang-socketio/transport" +) + +func main() { + r2pURL := "" + influxAddr := "" + influxUser := "" + influxPassword := "" + influxDB := "" + silenceLog := false + verboseLog := false + + flag.BoolVar(&silenceLog, "s", false, "silence") + flag.BoolVar(&verboseLog, "v", false, "verbose") + + flag.StringVar(&r2pURL, "url", "http://192.168.1.110:8080", "weburl") + + flag.StringVar(&influxAddr, "influx-addr", "http://127.0.0.1/influxdb", "influxdb") + flag.StringVar(&influxUser, "influx-user", "r2puser", "") + flag.StringVar(&influxPassword, "influx-pw", "PASSWORD", "") + flag.StringVar(&influxDB, "influx-db", "r2p", "") + + flag.Parse() + + if silenceLog { + log.SetLevel(log.WarnLevel) + } else if verboseLog { + log.SetLevel(log.DebugLevel) + } + + api, err := gosocketio.Dial( + strings.Replace(r2pURL, "http", "ws", 1)+"/socket.io/?EIO=3&transport=websocket", + transport.GetDefaultWebsocketTransport(), + ) + if err != nil { + log.Errorf("not connected to r2p: %s", err.Error()) + return + } + defer api.Close() + + influx, err := InfluxDBConnect(client.HTTPConfig{ + Addr: influxAddr, + Username: influxUser, + Password: influxPassword, + }, influxDB) + if err != nil { + log.Errorf("not connected to influxdb: %s", err.Error()) + return + } + defer influx.Close() + + log.Info("startup") + + api.On("current", func(c *gosocketio.Channel, args interface{}) { + log.Debug("notified by websocket") + currentFloat := args.(float64) + log.Debugf("notified by websocket, get value: %d", currentFloat) + current := int(currentFloat) + + log.WithFields(map[string]interface{}{"source": "websocket", "current": current}).Info("notified") + + tags := models.Tags{} + tags.SetString("source", "websocket") + influx.addPoint("r2p_point", tags, models.Fields{ + "current": args, + }) + }) + + jsonWorker := worker.NewWorker(time.Second*2, func() { + result := R2PAPIAll{} + err := JSONRequest(r2pURL+"/GetAll", &result) + if err != nil { + log.Error("error on http-request r2p: ", err) + return + } + + log.WithFields(map[string]interface{}{"source": "http", "current": result.CountCurrent, "in": result.CountIn, "out": result.CountOut}).Info("requested") + + tags := models.Tags{} + tags.SetString("source", "http") + influx.addPoint("r2p_point", tags, models.Fields{ + "out": result.CountOut, + "in": result.CountIn, + "current": result.CountCurrent, + }) + }) + defer jsonWorker.Close() + go jsonWorker.Start() + + // Wait for INT/TERM + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + sig := <-sigs + log.Info("quit by recieving: ", sig) +}