Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch metrics and flush periodically #2957

Merged
merged 1 commit into from
Aug 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 72 additions & 73 deletions internal/ingress/metric/collectors/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"os"

Expand Down Expand Up @@ -206,92 +207,94 @@ func (sc *SocketCollector) handleMessage(msg []byte) {
glog.V(5).Infof("msg: %v", string(msg))

// Unmarshal bytes
var stats socketData
err := json.Unmarshal(msg, &stats)
var statsBatch []socketData
err := json.Unmarshal(msg, &statsBatch)
if err != nil {
glog.Errorf("Unexpected error deserializing JSON paylod: %v", err)
glog.Errorf("Unexpected error deserializing JSON paylod: %v. Payload:\n%v", err, string(msg))
return
}

requestLabels := prometheus.Labels{
"host": stats.Host,
"status": stats.Status,
"method": stats.Method,
"path": stats.Path,
//"endpoint": stats.Endpoint,
"namespace": stats.Namespace,
"ingress": stats.Ingress,
"service": stats.Service,
}

collectorLabels := prometheus.Labels{
"namespace": stats.Namespace,
"ingress": stats.Ingress,
"status": stats.Status,
}
for _, stats := range statsBatch {
requestLabels := prometheus.Labels{
"host": stats.Host,
"status": stats.Status,
"method": stats.Method,
"path": stats.Path,
//"endpoint": stats.Endpoint,
"namespace": stats.Namespace,
"ingress": stats.Ingress,
"service": stats.Service,
}

latencyLabels := prometheus.Labels{
"namespace": stats.Namespace,
"ingress": stats.Ingress,
"service": stats.Service,
}
collectorLabels := prometheus.Labels{
"namespace": stats.Namespace,
"ingress": stats.Ingress,
"status": stats.Status,
}

requestsMetric, err := sc.requests.GetMetricWith(collectorLabels)
if err != nil {
glog.Errorf("Error fetching requests metric: %v", err)
} else {
requestsMetric.Inc()
}
latencyLabels := prometheus.Labels{
"namespace": stats.Namespace,
"ingress": stats.Ingress,
"service": stats.Service,
}

if stats.Latency != -1 {
latencyMetric, err := sc.upstreamLatency.GetMetricWith(latencyLabels)
requestsMetric, err := sc.requests.GetMetricWith(collectorLabels)
if err != nil {
glog.Errorf("Error fetching latency metric: %v", err)
glog.Errorf("Error fetching requests metric: %v", err)
} else {
latencyMetric.Observe(stats.Latency)
requestsMetric.Inc()
}
}

if stats.RequestTime != -1 {
requestTimeMetric, err := sc.requestTime.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching request duration metric: %v", err)
} else {
requestTimeMetric.Observe(stats.RequestTime)
if stats.Latency != -1 {
latencyMetric, err := sc.upstreamLatency.GetMetricWith(latencyLabels)
if err != nil {
glog.Errorf("Error fetching latency metric: %v", err)
} else {
latencyMetric.Observe(stats.Latency)
}
}
}

if stats.RequestLength != -1 {
requestLengthMetric, err := sc.requestLength.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching request length metric: %v", err)
} else {
requestLengthMetric.Observe(stats.RequestLength)
if stats.RequestTime != -1 {
requestTimeMetric, err := sc.requestTime.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching request duration metric: %v", err)
} else {
requestTimeMetric.Observe(stats.RequestTime)
}
}
}

if stats.ResponseTime != -1 {
responseTimeMetric, err := sc.responseTime.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching upstream response time metric: %v", err)
} else {
responseTimeMetric.Observe(stats.ResponseTime)
if stats.RequestLength != -1 {
requestLengthMetric, err := sc.requestLength.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching request length metric: %v", err)
} else {
requestLengthMetric.Observe(stats.RequestLength)
}
}
}

if stats.ResponseLength != -1 {
bytesSentMetric, err := sc.bytesSent.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching bytes sent metric: %v", err)
} else {
bytesSentMetric.Observe(stats.ResponseLength)
if stats.ResponseTime != -1 {
responseTimeMetric, err := sc.responseTime.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching upstream response time metric: %v", err)
} else {
responseTimeMetric.Observe(stats.ResponseTime)
}
}

responseSizeMetric, err := sc.responseLength.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching bytes sent metric: %v", err)
} else {
responseSizeMetric.Observe(stats.ResponseLength)
if stats.ResponseLength != -1 {
bytesSentMetric, err := sc.bytesSent.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching bytes sent metric: %v", err)
} else {
bytesSentMetric.Observe(stats.ResponseLength)
}

responseSizeMetric, err := sc.responseLength.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching bytes sent metric: %v", err)
} else {
responseSizeMetric.Observe(stats.ResponseLength)
}
}
}
}
Expand Down Expand Up @@ -408,19 +411,15 @@ func (sc SocketCollector) Collect(ch chan<- prometheus.Metric) {
sc.bytesSent.Collect(ch)
}

const packetSize = 1024 * 65

// handleMessages processes the content received in a network connection
func handleMessages(conn io.ReadCloser, fn func([]byte)) {
defer conn.Close()

msg := make([]byte, packetSize)
s, err := conn.Read(msg[0:])
data, err := ioutil.ReadAll(conn)
if err != nil {
return
}

fn(msg[0:s])
fn(data)
}

func deleteConstants(labels prometheus.Labels) {
Expand Down
71 changes: 65 additions & 6 deletions internal/ingress/metric/collectors/socket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestCollector(t *testing.T) {
},
{
name: "valid metric object should update prometheus metrics",
data: []string{`{
data: []string{`[{
"host":"testshop.com",
"status":"200",
"bytesSent":150.0,
Expand All @@ -115,7 +115,7 @@ func TestCollector(t *testing.T) {
"namespace":"test-app-production",
"ingress":"web-yml",
"service":"test-app"
}`},
}]`},
metrics: []string{"nginx_ingress_controller_response_duration_seconds"},
wantBefore: `
# HELP nginx_ingress_controller_response_duration_seconds The time spent on receiving the response from the upstream server
Expand All @@ -142,7 +142,7 @@ func TestCollector(t *testing.T) {

{
name: "multiple messages should increase prometheus metric by two",
data: []string{`{
data: []string{`[{
"host":"testshop.com",
"status":"200",
"bytesSent":150.0,
Expand All @@ -157,7 +157,7 @@ func TestCollector(t *testing.T) {
"namespace":"test-app-production",
"ingress":"web-yml",
"service":"test-app"
}`, `{
}]`, `[{
"host":"testshop.com",
"status":"200",
"bytesSent":150.0,
Expand All @@ -172,7 +172,7 @@ func TestCollector(t *testing.T) {
"namespace":"test-app-qa",
"ingress":"web-yml-qa",
"service":"test-app-qa"
}`, `{
}]`, `[{
"host":"testshop.com",
"status":"200",
"bytesSent":150.0,
Expand All @@ -187,7 +187,7 @@ func TestCollector(t *testing.T) {
"namespace":"test-app-qa",
"ingress":"web-yml-qa",
"service":"test-app-qa"
}`},
}]`},
metrics: []string{"nginx_ingress_controller_response_duration_seconds"},
wantBefore: `
# HELP nginx_ingress_controller_response_duration_seconds The time spent on receiving the response from the upstream server
Expand Down Expand Up @@ -222,6 +222,65 @@ func TestCollector(t *testing.T) {
nginx_ingress_controller_response_duration_seconds_count{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml-qa",method="GET",namespace="test-app-qa",path="/admin",service="test-app-qa",status="200"} 2
`,
},

{
name: "collector should be able to handle batched metrics correctly",
data: []string{`[
{
"host":"testshop.com",
"status":"200",
"bytesSent":150.0,
"method":"GET",
"path":"/admin",
"requestLength":300.0,
"requestTime":60.0,
"upstreamName":"test-upstream",
"upstreamIP":"1.1.1.1:8080",
"upstreamResponseTime":200,
"upstreamStatus":"220",
"namespace":"test-app-production",
"ingress":"web-yml",
"service":"test-app"
},
{
"host":"testshop.com",
"status":"200",
"bytesSent":150.0,
"method":"GET",
"path":"/admin",
"requestLength":300.0,
"requestTime":60.0,
"upstreamName":"test-upstream",
"upstreamIP":"1.1.1.1:8080",
"upstreamResponseTime":100,
"upstreamStatus":"220",
"namespace":"test-app-production",
"ingress":"web-yml",
"service":"test-app"
}]`},
metrics: []string{"nginx_ingress_controller_response_duration_seconds"},
wantBefore: `
# HELP nginx_ingress_controller_response_duration_seconds The time spent on receiving the response from the upstream server
# TYPE nginx_ingress_controller_response_duration_seconds histogram
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.005"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.01"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.025"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.05"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.1"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.25"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.5"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="1"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="2.5"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="5"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="10"} 0
nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="+Inf"} 2
nginx_ingress_controller_response_duration_seconds_sum{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200"} 300
nginx_ingress_controller_response_duration_seconds_count{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200"} 2
`,
removeIngresses: []string{"test-app-production/web-yml"},
wantAfter: `
`,
},
}

for _, c := range cases {
Expand Down
Loading