diff --git a/api/response_push.go b/api/response_push.go new file mode 100644 index 0000000..058b47c --- /dev/null +++ b/api/response_push.go @@ -0,0 +1,61 @@ +package api + +import ( + "fmt" +) + +// API URLS for Push +const ( + URLRequestPushStatus = "/v1/vhosts/%s/apps/%s:pushes" +) + +// ResponsePushStatus JSON Message with Push status data +type ResponsePushStatus struct { + Message string `json:"message"` + StatusCode int `json:"statusCode"` + Data []*ResponsePushData `json:"response,omitempty"` +} + +// ResponsePushData one push configuration +type ResponsePushData struct { + VHost string `json:"vhost" example:"default"` + App string `json:"app" example:"live"` + ID string `json:"id" example:"youtube"` + Stream *ResponsePushDataStream `json:"stream"` + State string `json:"state" example:"ready"` + Protocol string `json:"protocol" example:"rtmp"` + URL string `json:"url" example:"rtmp://a.rtmp.youtube.com/live2"` + StreamKey string `json:"streamKey" example:"SUPERSECRET"` + // - timestamp - time.TIme has problem with nanosecond in JSON + CreatedTime string `json:"createdTime" example:"2021-07-19T23:13:12.162+0200"` + FinishedTime string `json:"finishedTime" example:"2021-07-19T23:23:27.274+0200"` + StartTime string `json:"startTime" example:"2021-07-19T23:23:27.077+0200"` + // - coonnections + Sequence int `json:"sequence" example:"1"` + // - traffic + SentBytes uint64 `json:"sentBytes" example:"0"` + SentTime uint64 `json:"sentTime" example:"0"` + TotalSentBytes uint64 `json:"totalsentBytes" example:"356233652"` + TotalSentTime uint64 `json:"totalsentTime" example:"933808"` +} + +// ResponsePushDataStream of data of stream +type ResponsePushDataStream struct { + Name string `json:"name" example:"..."` + Tracks []int `json:"tracks" example:"[]"` +} + +// RequestPushStatus to get list of pushes and his configuration +func (c *Client) RequestPushStatus(vhost, app string) (*ResponsePushStatus, error) { + req := ResponsePushStatus{} + url := fmt.Sprintf(URLRequestPushStatus, vhost, app) + if err := c.Request(url, &req); err != nil { + return nil, err + } + return &req, nil +} + +// RequestPushStatusDefault to get list of pushes and his configuration for default vhost and app +func (c *Client) RequestPushStatusDefault() (*ResponsePushStatus, error) { + return c.RequestPushStatus(c.DefaultVHost, c.DefaultApp) +} diff --git a/prometheus.go b/prometheus.go index 4515a86..bb435cc 100644 --- a/prometheus.go +++ b/prometheus.go @@ -39,6 +39,17 @@ var ( promDescStatsStreamBytesInTotal, promDescStatsStreamBytesOutTotal, } + + promDescPushUp = prometheus.NewDesc("oven_push_up", "state of push", []string{"vhost", "app", "stream", "id", "state"}, prometheus.Labels{}) + promDescPushSequence = prometheus.NewDesc("oven_push_sequence", "sequence of started pushes", []string{"vhost", "app", "stream", "id"}, prometheus.Labels{}) + promDescPushSentBytes = prometheus.NewDesc("oven_push_send_byte", "bytes send on push", []string{"vhost", "app", "stream", "id"}, prometheus.Labels{}) + promDescPushTotalSentBytes = prometheus.NewDesc("oven_push_total_send_bytes", "total bytes send on push", []string{"vhost", "app", "stream", "id"}, prometheus.Labels{}) + promDescPush = []*prometheus.Desc{ + promDescPushUp, + promDescPushSequence, + promDescPushSentBytes, + promDescPushTotalSentBytes, + } ) func ResponseStatsToMetrics(resp *api.ResponseStats, descs []*prometheus.Desc, labels ...string) []prometheus.Metric { @@ -49,13 +60,13 @@ func ResponseStatsToMetrics(resp *api.ResponseStats, descs []*prometheus.Desc, l if m, err := prometheus.NewConstMetric(descs[0], prometheus.GaugeValue, float64(resp.Data.TotalConnections), labels...); err == nil { list = append(list, m) } - if m, err := prometheus.NewConstMetric(descs[1], prometheus.GaugeValue, float64(resp.Data.MaxTotalConnections), labels...); err == nil { + if m, err := prometheus.NewConstMetric(descs[1], prometheus.CounterValue, float64(resp.Data.MaxTotalConnections), labels...); err == nil { list = append(list, m) } - if m, err := prometheus.NewConstMetric(descs[2], prometheus.GaugeValue, float64(resp.Data.TotalBytesIn), labels...); err == nil { + if m, err := prometheus.NewConstMetric(descs[2], prometheus.CounterValue, float64(resp.Data.TotalBytesIn), labels...); err == nil { list = append(list, m) } - if m, err := prometheus.NewConstMetric(descs[3], prometheus.GaugeValue, float64(resp.Data.TotalBytesOut), labels...); err == nil { + if m, err := prometheus.NewConstMetric(descs[3], prometheus.CounterValue, float64(resp.Data.TotalBytesOut), labels...); err == nil { list = append(list, m) } return list @@ -71,6 +82,9 @@ func (c *configData) Describe(d chan<- *prometheus.Desc) { for _, desc := range promDescStatsStream { d <- desc } + for _, desc := range promDescPush { + d <- desc + } } func (c *configData) Collect(metrics chan<- prometheus.Metric) { @@ -97,6 +111,25 @@ func (c *configData) Collect(metrics chan<- prometheus.Metric) { metrics <- m } } + if resp, err := c.API.RequestPushStatus(vhost, app); err != nil { + logApp.Errorf("unable to fetch pushes %s", err) + } else { + for _, data := range resp.Data { + if m, err := prometheus.NewConstMetric(promDescPushUp, prometheus.GaugeValue, 1, vhost, app, data.Stream.Name, data.ID, data.State); err == nil { + metrics <- m + } + labels := []string{vhost, app, data.Stream.Name, data.ID} + if m, err := prometheus.NewConstMetric(promDescPushSequence, prometheus.CounterValue, float64(data.Sequence), labels...); err == nil { + metrics <- m + } + if m, err := prometheus.NewConstMetric(promDescPushSentBytes, prometheus.GaugeValue, float64(data.SentBytes), labels...); err == nil { + metrics <- m + } + if m, err := prometheus.NewConstMetric(promDescPushTotalSentBytes, prometheus.CounterValue, float64(data.TotalSentBytes), labels...); err == nil { + metrics <- m + } + } + } respList, err = c.API.RequestListStreams(vhost, app) if err != nil { logApp.Errorf("unable to fetch stream: %s", err)