support push status

This commit is contained in:
Geno 2021-08-18 13:07:25 +02:00
parent 2b504289ea
commit 14d35a9170
2 changed files with 97 additions and 3 deletions

61
api/response_push.go Normal file
View File

@ -0,0 +1,61 @@
package api
import (
"fmt"
)
// API URLS for Push
const (
URLRequestPushStatus = "/v1/vhosts/%s/apps/%s:pushes"
)
// ResponsePushStatus JSON Message with Push status data
type ResponsePushStatus struct {
Message string `json:"message"`
StatusCode int `json:"statusCode"`
Data []*ResponsePushData `json:"response,omitempty"`
}
// ResponsePushData one push configuration
type ResponsePushData struct {
VHost string `json:"vhost" example:"default"`
App string `json:"app" example:"live"`
ID string `json:"id" example:"youtube"`
Stream *ResponsePushDataStream `json:"stream"`
State string `json:"state" example:"ready"`
Protocol string `json:"protocol" example:"rtmp"`
URL string `json:"url" example:"rtmp://a.rtmp.youtube.com/live2"`
StreamKey string `json:"streamKey" example:"SUPERSECRET"`
// - timestamp - time.TIme has problem with nanosecond in JSON
CreatedTime string `json:"createdTime" example:"2021-07-19T23:13:12.162+0200"`
FinishedTime string `json:"finishedTime" example:"2021-07-19T23:23:27.274+0200"`
StartTime string `json:"startTime" example:"2021-07-19T23:23:27.077+0200"`
// - coonnections
Sequence int `json:"sequence" example:"1"`
// - traffic
SentBytes uint64 `json:"sentBytes" example:"0"`
SentTime uint64 `json:"sentTime" example:"0"`
TotalSentBytes uint64 `json:"totalsentBytes" example:"356233652"`
TotalSentTime uint64 `json:"totalsentTime" example:"933808"`
}
// ResponsePushDataStream of data of stream
type ResponsePushDataStream struct {
Name string `json:"name" example:"..."`
Tracks []int `json:"tracks" example:"[]"`
}
// RequestPushStatus to get list of pushes and his configuration
func (c *Client) RequestPushStatus(vhost, app string) (*ResponsePushStatus, error) {
req := ResponsePushStatus{}
url := fmt.Sprintf(URLRequestPushStatus, vhost, app)
if err := c.Request(url, &req); err != nil {
return nil, err
}
return &req, nil
}
// RequestPushStatusDefault to get list of pushes and his configuration for default vhost and app
func (c *Client) RequestPushStatusDefault() (*ResponsePushStatus, error) {
return c.RequestPushStatus(c.DefaultVHost, c.DefaultApp)
}

View File

@ -39,6 +39,17 @@ var (
promDescStatsStreamBytesInTotal, promDescStatsStreamBytesInTotal,
promDescStatsStreamBytesOutTotal, promDescStatsStreamBytesOutTotal,
} }
promDescPushUp = prometheus.NewDesc("oven_push_up", "state of push", []string{"vhost", "app", "stream", "id", "state"}, prometheus.Labels{})
promDescPushSequence = prometheus.NewDesc("oven_push_sequence", "sequence of started pushes", []string{"vhost", "app", "stream", "id"}, prometheus.Labels{})
promDescPushSentBytes = prometheus.NewDesc("oven_push_send_byte", "bytes send on push", []string{"vhost", "app", "stream", "id"}, prometheus.Labels{})
promDescPushTotalSentBytes = prometheus.NewDesc("oven_push_total_send_bytes", "total bytes send on push", []string{"vhost", "app", "stream", "id"}, prometheus.Labels{})
promDescPush = []*prometheus.Desc{
promDescPushUp,
promDescPushSequence,
promDescPushSentBytes,
promDescPushTotalSentBytes,
}
) )
func ResponseStatsToMetrics(resp *api.ResponseStats, descs []*prometheus.Desc, labels ...string) []prometheus.Metric { func ResponseStatsToMetrics(resp *api.ResponseStats, descs []*prometheus.Desc, labels ...string) []prometheus.Metric {
@ -49,13 +60,13 @@ func ResponseStatsToMetrics(resp *api.ResponseStats, descs []*prometheus.Desc, l
if m, err := prometheus.NewConstMetric(descs[0], prometheus.GaugeValue, float64(resp.Data.TotalConnections), labels...); err == nil { if m, err := prometheus.NewConstMetric(descs[0], prometheus.GaugeValue, float64(resp.Data.TotalConnections), labels...); err == nil {
list = append(list, m) list = append(list, m)
} }
if m, err := prometheus.NewConstMetric(descs[1], prometheus.GaugeValue, float64(resp.Data.MaxTotalConnections), labels...); err == nil { if m, err := prometheus.NewConstMetric(descs[1], prometheus.CounterValue, float64(resp.Data.MaxTotalConnections), labels...); err == nil {
list = append(list, m) list = append(list, m)
} }
if m, err := prometheus.NewConstMetric(descs[2], prometheus.GaugeValue, float64(resp.Data.TotalBytesIn), labels...); err == nil { if m, err := prometheus.NewConstMetric(descs[2], prometheus.CounterValue, float64(resp.Data.TotalBytesIn), labels...); err == nil {
list = append(list, m) list = append(list, m)
} }
if m, err := prometheus.NewConstMetric(descs[3], prometheus.GaugeValue, float64(resp.Data.TotalBytesOut), labels...); err == nil { if m, err := prometheus.NewConstMetric(descs[3], prometheus.CounterValue, float64(resp.Data.TotalBytesOut), labels...); err == nil {
list = append(list, m) list = append(list, m)
} }
return list return list
@ -71,6 +82,9 @@ func (c *configData) Describe(d chan<- *prometheus.Desc) {
for _, desc := range promDescStatsStream { for _, desc := range promDescStatsStream {
d <- desc d <- desc
} }
for _, desc := range promDescPush {
d <- desc
}
} }
func (c *configData) Collect(metrics chan<- prometheus.Metric) { func (c *configData) Collect(metrics chan<- prometheus.Metric) {
@ -97,6 +111,25 @@ func (c *configData) Collect(metrics chan<- prometheus.Metric) {
metrics <- m metrics <- m
} }
} }
if resp, err := c.API.RequestPushStatus(vhost, app); err != nil {
logApp.Errorf("unable to fetch pushes %s", err)
} else {
for _, data := range resp.Data {
if m, err := prometheus.NewConstMetric(promDescPushUp, prometheus.GaugeValue, 1, vhost, app, data.Stream.Name, data.ID, data.State); err == nil {
metrics <- m
}
labels := []string{vhost, app, data.Stream.Name, data.ID}
if m, err := prometheus.NewConstMetric(promDescPushSequence, prometheus.CounterValue, float64(data.Sequence), labels...); err == nil {
metrics <- m
}
if m, err := prometheus.NewConstMetric(promDescPushSentBytes, prometheus.GaugeValue, float64(data.SentBytes), labels...); err == nil {
metrics <- m
}
if m, err := prometheus.NewConstMetric(promDescPushTotalSentBytes, prometheus.CounterValue, float64(data.TotalSentBytes), labels...); err == nil {
metrics <- m
}
}
}
respList, err = c.API.RequestListStreams(vhost, app) respList, err = c.API.RequestListStreams(vhost, app)
if err != nil { if err != nil {
logApp.Errorf("unable to fetch stream: %s", err) logApp.Errorf("unable to fetch stream: %s", err)