Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New metric for relayed lines #434

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.30.0
github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/yaml.v2 v2.4.0
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807 h1:LUsDduamlucuNnWcaTbXQ6aLILFcLXADpOzeEH3U+OI=
github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807/go.mod h1:7jxmlfBCDBXRzr0eAQJ48XC1hBu1np4CS5+cHEYfwpc=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
22 changes: 17 additions & 5 deletions pkg/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"strings"
"time"

"github.com/prometheus/statsd_exporter/pkg/clock"

"github.com/go-kit/log"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -34,8 +36,9 @@ type Relay struct {
logger log.Logger
packetLength uint

packetsTotal prometheus.Counter
longLinesTotal prometheus.Counter
packetsTotal prometheus.Counter
longLinesTotal prometheus.Counter
relayedLinesTotal prometheus.Counter
}

var (
Expand All @@ -53,6 +56,13 @@ var (
},
[]string{"target"},
)
relayLinesRelayedTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_relay_lines_relayed_total",
Help: "The number of lines that were buffered to be relayed.",
},
[]string{"target"},
)
)

// NewRelay creates a statsd UDP relay. It can be used to send copies of statsd raw
Expand All @@ -76,8 +86,9 @@ func NewRelay(l log.Logger, target string, packetLength uint) (*Relay, error) {
logger: l,
packetLength: packetLength,

packetsTotal: relayPacketsTotal.WithLabelValues(target),
longLinesTotal: relayLongLinesTotal.WithLabelValues(target),
packetsTotal: relayPacketsTotal.WithLabelValues(target),
longLinesTotal: relayLongLinesTotal.WithLabelValues(target),
relayedLinesTotal: relayLinesRelayedTotal.WithLabelValues(target),
}

// Startup the UDP sender.
Expand All @@ -91,7 +102,7 @@ func (r *Relay) relayOutput() {
var buffer bytes.Buffer
var err error

relayInterval := time.NewTicker(1 * time.Second)
relayInterval := clock.NewTicker(1 * time.Second)
defer relayInterval.Stop()

for {
Expand Down Expand Up @@ -151,5 +162,6 @@ func (r *Relay) RelayLine(l string) {
if !strings.HasSuffix(l, "\n") {
l = l + "\n"
}
r.relayedLinesTotal.Inc()
r.bufferChannel <- []byte(l)
}
167 changes: 167 additions & 0 deletions pkg/relay/relay_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright 2013 The Prometheus Authors
matthiasr marked this conversation as resolved.
Show resolved Hide resolved
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package relay

import (
"fmt"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/stvp/go-udp-testing"
)

func TestRelay_RelayLine(t *testing.T) {
type args struct {
lines []string
expected string
}

tests := []struct {
name string
args args
}{
{
name: "multiple lines",
args: args{
lines: []string{"foo5:100|c|#tag1:bar,#tag2:baz", "foo2:200|c|#tag1:bar,#tag2:baz"},
expected: "foo5:100|c|#tag1:bar,#tag2:baz\n",
},
},
}

for _, tt := range tests {
udp.SetAddr(":1160")
t.Run(tt.name, func(t *testing.T) {
tickerCh := make(chan time.Time)
clock.ClockInstance = &clock.Clock{
TickerCh: tickerCh,
}
clock.ClockInstance.Instant = time.Unix(0, 0)

logger := log.NewNopLogger()
r, err := NewRelay(
logger,
"localhost:1160",
200,
)

if err != nil {
t.Errorf("Did not expect error while creating relay.")
}

udp.ShouldReceive(t, tt.args.expected, func() {
for _, line := range tt.args.lines {
r.RelayLine(line)
}
// Tick time forward to trigger a packet send.
clock.ClockInstance.Instant = time.Unix(1, 10)
clock.ClockInstance.TickerCh <- time.Unix(0, 0)
})

metrics, err := prometheus.DefaultGatherer.Gather()
if err != nil {
t.Fatalf("Cannot gather from DefaultGatherer: %v", err)
}

metricNames := map[string]float64{
"statsd_exporter_relay_long_lines_total": 0,
"statsd_exporter_relay_lines_relayed_total": float64(len(tt.args.lines)),
}
for metricName, expectedValue := range metricNames {
metric := getFloat64(metrics, metricName, prometheus.Labels{"target": "localhost:1160"})

if metric == nil {
t.Fatalf("Could not find time series with first label set for metric: %s", metricName)
}
if *metric != expectedValue {
t.Errorf("Expected metric %s to be %f, got %f", metricName, expectedValue, *metric)
}
}

prometheus.Unregister(relayLongLinesTotal)
prometheus.Unregister(relayLinesRelayedTotal)
})
}
}

// getFloat64 search for metric by name in array of MetricFamily and then search a value by labels.
// Method returns a value or nil if metric is not found.
func getFloat64(metrics []*dto.MetricFamily, name string, labels prometheus.Labels) *float64 {
var metricFamily *dto.MetricFamily
for _, m := range metrics {
if *m.Name == name {
metricFamily = m
break
}
}
if metricFamily == nil {
return nil
}

var metric *dto.Metric
labelStr := fmt.Sprintf("%v", labels)
for _, m := range metricFamily.Metric {
l := labelPairsAsLabels(m.GetLabel())
ls := fmt.Sprintf("%v", l)
if labelStr == ls {
metric = m
break
}
}
if metric == nil {
return nil
}

var value float64
if metric.Gauge != nil {
value = metric.Gauge.GetValue()
return &value
}
if metric.Counter != nil {
value = metric.Counter.GetValue()
return &value
}
if metric.Histogram != nil {
value = metric.Histogram.GetSampleSum()
return &value
}
if metric.Summary != nil {
value = metric.Summary.GetSampleSum()
return &value
}
if metric.Untyped != nil {
value = metric.Untyped.GetValue()
return &value
}
panic(fmt.Errorf("collected a non-gauge/counter/histogram/summary/untyped metric: %s", metric))
}

func labelPairsAsLabels(pairs []*dto.LabelPair) (labels prometheus.Labels) {
labels = prometheus.Labels{}
for _, pair := range pairs {
if pair.Name == nil {
continue
}
value := ""
if pair.Value != nil {
value = *pair.Value
}
labels[*pair.Name] = value
}
return
}