Skip to content

Commit

Permalink
batch metrics and flush periodically
Browse files: browse the repository at this point in the history
  • Loading branch information
ElvinEfendi committed Aug 18, 2018
1 parent b78bb25 commit 2207d76
Show file tree
Hide file tree
Showing 7 changed files with 289 additions and 256 deletions.
145 changes: 72 additions & 73 deletions internal/ingress/metric/collectors/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"os"

Expand Down Expand Up @@ -206,92 +207,94 @@ func (sc *SocketCollector) handleMessage(msg []byte) {
glog.V(5).Infof("msg: %v", string(msg))

// Unmarshall bytes
var stats socketData
err := json.Unmarshal(msg, &stats)
var statsBatch []socketData
err := json.Unmarshal(msg, &statsBatch)
if err != nil {
glog.Errorf("Unexpected error deserializing JSON paylod: %v", err)
glog.Errorf("Unexpected error deserializing JSON paylod: %v. Payload:\n%v", err, string(msg))
return
}

requestLabels := prometheus.Labels{
"host": stats.Host,
"status": stats.Status,
"method": stats.Method,
"path": stats.Path,
//"endpoint": stats.Endpoint,
"namespace": stats.Namespace,
"ingress": stats.Ingress,
"service": stats.Service,
}

collectorLabels := prometheus.Labels{
"namespace": stats.Namespace,
"ingress": stats.Ingress,
"status": stats.Status,
}
for _, stats := range statsBatch {
requestLabels := prometheus.Labels{
"host": stats.Host,
"status": stats.Status,
"method": stats.Method,
"path": stats.Path,
//"endpoint": stats.Endpoint,
"namespace": stats.Namespace,
"ingress": stats.Ingress,
"service": stats.Service,
}

latencyLabels := prometheus.Labels{
"namespace": stats.Namespace,
"ingress": stats.Ingress,
"service": stats.Service,
}
collectorLabels := prometheus.Labels{
"namespace": stats.Namespace,
"ingress": stats.Ingress,
"status": stats.Status,
}

requestsMetric, err := sc.requests.GetMetricWith(collectorLabels)
if err != nil {
glog.Errorf("Error fetching requests metric: %v", err)
} else {
requestsMetric.Inc()
}
latencyLabels := prometheus.Labels{
"namespace": stats.Namespace,
"ingress": stats.Ingress,
"service": stats.Service,
}

if stats.Latency != -1 {
latencyMetric, err := sc.upstreamLatency.GetMetricWith(latencyLabels)
requestsMetric, err := sc.requests.GetMetricWith(collectorLabels)
if err != nil {
glog.Errorf("Error fetching latency metric: %v", err)
glog.Errorf("Error fetching requests metric: %v", err)
} else {
latencyMetric.Observe(stats.Latency)
requestsMetric.Inc()
}
}

if stats.RequestTime != -1 {
requestTimeMetric, err := sc.requestTime.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching request duration metric: %v", err)
} else {
requestTimeMetric.Observe(stats.RequestTime)
if stats.Latency != -1 {
latencyMetric, err := sc.upstreamLatency.GetMetricWith(latencyLabels)
if err != nil {
glog.Errorf("Error fetching latency metric: %v", err)
} else {
latencyMetric.Observe(stats.Latency)
}
}
}

if stats.RequestLength != -1 {
requestLengthMetric, err := sc.requestLength.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching request length metric: %v", err)
} else {
requestLengthMetric.Observe(stats.RequestLength)
if stats.RequestTime != -1 {
requestTimeMetric, err := sc.requestTime.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching request duration metric: %v", err)
} else {
requestTimeMetric.Observe(stats.RequestTime)
}
}
}

if stats.ResponseTime != -1 {
responseTimeMetric, err := sc.responseTime.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching upstream response time metric: %v", err)
} else {
responseTimeMetric.Observe(stats.ResponseTime)
if stats.RequestLength != -1 {
requestLengthMetric, err := sc.requestLength.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching request length metric: %v", err)
} else {
requestLengthMetric.Observe(stats.RequestLength)
}
}
}

if stats.ResponseLength != -1 {
bytesSentMetric, err := sc.bytesSent.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching bytes sent metric: %v", err)
} else {
bytesSentMetric.Observe(stats.ResponseLength)
if stats.ResponseTime != -1 {
responseTimeMetric, err := sc.responseTime.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching upstream response time metric: %v", err)
} else {
responseTimeMetric.Observe(stats.ResponseTime)
}
}

responseSizeMetric, err := sc.responseLength.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching bytes sent metric: %v", err)
} else {
responseSizeMetric.Observe(stats.ResponseLength)
if stats.ResponseLength != -1 {
bytesSentMetric, err := sc.bytesSent.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching bytes sent metric: %v", err)
} else {
bytesSentMetric.Observe(stats.ResponseLength)
}

responseSizeMetric, err := sc.responseLength.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching bytes sent metric: %v", err)
} else {
responseSizeMetric.Observe(stats.ResponseLength)
}
}
}
}
Expand Down Expand Up @@ -408,19 +411,15 @@ func (sc SocketCollector) Collect(ch chan<- prometheus.Metric) {
sc.bytesSent.Collect(ch)
}

const packetSize = 1024 * 65

// handleMessages processes the content received on a network connection
func handleMessages(conn io.ReadCloser, fn func([]byte)) {
defer conn.Close()

msg := make([]byte, packetSize)
s, err := conn.Read(msg[0:])
data, err := ioutil.ReadAll(conn)
if err != nil {
return
}

fn(msg[0:s])
fn(data)
}

func deleteConstants(labels prometheus.Labels) {
Expand Down
71 changes: 65 additions & 6 deletions internal/ingress/metric/collectors/socket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestCollector(t *testing.T) {
},
{
name: "valid metric object should update prometheus metrics",
data: []string{`{
data: []string{`[{
"host":"testshop.com",
"status":"200",
"bytesSent":150.0,
Expand All @@ -115,7 +115,7 @@ func TestCollector(t *testing.T) {
"namespace":"test-app-production",
"ingress":"web-yml",
"service":"test-app"
}`},
}]`},
metrics: []string{"nginx_ingress_controller_response_duration_seconds"},
wantBefore: `
# HELP nginx_ingress_controller_response_duration_seconds The time spent on receiving the response from the upstream server
Expand All @@ -142,7 +142,7 @@ func TestCollector(t *testing.T) {

{
name: "multiple messages should increase prometheus metric by two",
data: []string{`{
data: []string{`[{
"host":"testshop.com",
"status":"200",
"bytesSent":150.0,
Expand All @@ -157,7 +157,7 @@ func TestCollector(t *testing.T) {
"namespace":"test-app-production",
"ingress":"web-yml",
"service":"test-app"
}`, `{
}]`, `[{
"host":"testshop.com",
"status":"200",
"bytesSent":150.0,
Expand All @@ -172,7 +172,7 @@ func TestCollector(t *testing.T) {
"namespace":"test-app-qa",
"ingress":"web-yml-qa",
"service":"test-app-qa"
}`, `{
}]`, `[{
"host":"testshop.com",
"status":"200",
"bytesSent":150.0,
Expand All @@ -187,7 +187,7 @@ func TestCollector(t *testing.T) {
"namespace":"test-app-qa",
"ingress":"web-yml-qa",
"service":"test-app-qa"
}`},
}]`},
metrics: []string{"nginx_ingress_controller_response_duration_seconds"},
wantBefore: `
# HELP nginx_ingress_controller_response_duration_seconds The time spent on receiving the response from the upstream server
Expand Down Expand Up @@ -222,6 +222,65 @@ func TestCollector(t *testing.T) {
nginx_ingress_controller_response_duration_seconds_count{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml-qa",method="GET",namespace="test-app-qa",path="/admin",service="test-app-qa",status="200"} 2
`,
},

{
name: "collector should be able to handle batched metrics correctly",
data: []string{`[
{
"host":"testshop.com",
"status":"200",
"bytesSent":150.0,
"method":"GET",
"path":"/admin",
"requestLength":300.0,
"requestTime":60.0,
"upstreamName":"test-upstream",
"upstreamIP":"1.1.1.1:8080",
"upstreamResponseTime":200,
"upstreamStatus":"220",
"namespace":"test-app-production",
"ingress":"web-yml",
"service":"test-app"
},
{
"host":"testshop.com",
"status":"200",
"bytesSent":150.0,
"method":"GET",
"path":"/admin",
"requestLength":300.0,
"requestTime":60.0,
"upstreamName":"test-upstream",
"upstreamIP":"1.1.1.1:8080",
"upstreamResponseTime":100,
"upstreamStatus":"220",
"namespace":"test-app-production",
"ingress":"web-yml",
"service":"test-app"
}]`},
metrics: []string{"nginx_ingress_controller_response_duration_seconds"},
wantBefore: `
# HELP nginx_ingress_controller_response_duration_seconds The time spent on receiving the response from the upstream server
# TYPE nginx_ingress_controller_response_duration_seconds histogram
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.005"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.01"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.025"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.05"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.1"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.25"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.5"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="1"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="2.5"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="5"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="10"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="+Inf"} 2
nginx_ingress_controller_response_duration_seconds_sum{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200"} 300
nginx_ingress_controller_response_duration_seconds_count{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200"} 2
`,
removeIngresses: []string{"test-app-production/web-yml"},
wantAfter: `
`,
},
}

for _, c := range cases {
Expand Down
Loading

0 comments on commit 2207d76

Please sign in to comment.