Skip to content

Commit

Permalink
Merge pull request #691 from endorama/otel-zero-metrics
Browse files Browse the repository at this point in the history
kotel: optionally only use messaging.kafka.connects.count for connection metrics
  • Loading branch information
twmb authored Jun 8, 2024
2 parents 7641d9e + 17f85d6 commit a5b1d0c
Show file tree
Hide file tree
Showing 2 changed files with 224 additions and 7 deletions.
58 changes: 51 additions & 7 deletions plugin/kotel/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Meter struct {
provider metric.MeterProvider
meter metric.Meter
instruments instruments

mergeConnectsMeter bool
}

// MeterOpt interface used for setting optional config properties.
Expand All @@ -52,6 +54,20 @@ func MeterProvider(provider metric.MeterProvider) MeterOpt {
})
}

// WithMergedConnectsMeter merges the `messaging.kafka.connect_errors.count`
// counter into the `messaging.kafka.connects.count` counter, adding an
// attribute "outcome" with the values "success" or "failure". This option
// shall be used when a single metric with different dimensions is preferred
// over two separate metrics that produce data at alternating intervals.
// For example, it becomes possible to alert on the metric no longer
// producing data.
func WithMergedConnectsMeter() MeterOpt {
return meterOptFunc(func(m *Meter) {
m.mergeConnectsMeter = true
})

}

func (o meterOptFunc) apply(m *Meter) {
o(m)
}
Expand Down Expand Up @@ -105,13 +121,17 @@ func (m *Meter) newInstruments() instruments {
log.Printf("failed to create connects instrument, %v", err)
}

connectErrs, err := m.meter.Int64Counter(
"messaging.kafka.connect_errors.count",
metric.WithUnit(dimensionless),
metric.WithDescription("Total number of connection errors, by broker"),
)
if err != nil {
log.Printf("failed to create connectErrs instrument, %v", err)
var connectErrs metric.Int64Counter
if !m.mergeConnectsMeter {
var err error
connectErrs, err = m.meter.Int64Counter(
"messaging.kafka.connect_errors.count",
metric.WithUnit(dimensionless),
metric.WithDescription("Total number of connection errors, by broker"),
)
if err != nil {
log.Printf("failed to create connectErrs instrument, %v", err)
}
}

disconnects, err := m.meter.Int64Counter(
Expand Down Expand Up @@ -232,6 +252,30 @@ func strnode(node int32) string {

func (m *Meter) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
node := strnode(meta.NodeID)

if m.mergeConnectsMeter {
if err != nil {
m.instruments.connects.Add(
context.Background(),
1,
metric.WithAttributeSet(attribute.NewSet(
attribute.String("node_id", node),
attribute.String("outcome", "failure"),
)),
)
return
}
m.instruments.connects.Add(
context.Background(),
1,
metric.WithAttributeSet(attribute.NewSet(
attribute.String("node_id", node),
attribute.String("outcome", "success"),
)),
)
return
}

attributes := attribute.NewSet(attribute.String("node_id", node))
if err != nil {
m.instruments.connectErrs.Add(
Expand Down
173 changes: 173 additions & 0 deletions plugin/kotel/meter_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
package kotel

import (
"context"
"errors"
"net"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/twmb/franz-go/pkg/kgo"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
)

Expand Down Expand Up @@ -38,3 +48,166 @@ func TestWithMeter(t *testing.T) {
})
}
}

func TestHook_OnBrokerConnect(t *testing.T) {
t.Run("success path with mergeConnectsMeter:false", func(t *testing.T) {
r := sdkmetric.NewManualReader()
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r))
m := NewMeter(MeterProvider(mp))

meta := kgo.BrokerMetadata{NodeID: 1}
m.OnBrokerConnect(meta, time.Second, &net.TCPConn{}, nil)

rm := metricdata.ResourceMetrics{}
if err := r.Collect(context.Background(), &rm); err != nil {
t.Errorf("unexpected error collecting metrics: %s", err)
}

want := metricdata.Metrics{
Name: "messaging.kafka.connects.count",
Description: "Total number of connections opened, by broker",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
attribute.String("node_id", strconv.Itoa(int(meta.NodeID))),
),
},
},
},
}

if len(rm.ScopeMetrics) != 1 {
t.Errorf("expecting only 1 metrics in meter but got %d", len(rm.ScopeMetrics))
}

metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0].Metrics[0],
metricdatatest.IgnoreTimestamp(),
)
})
t.Run("failure path with mergeConnectsMeter:false", func(t *testing.T) {
r := sdkmetric.NewManualReader()
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r))
m := NewMeter(MeterProvider(mp))

meta := kgo.BrokerMetadata{NodeID: 1}
m.OnBrokerConnect(meta, time.Second, &net.TCPConn{}, errors.New("whatever error"))

rm := metricdata.ResourceMetrics{}
if err := r.Collect(context.Background(), &rm); err != nil {
t.Errorf("unexpected error collecting metrics: %s", err)
}

want := metricdata.Metrics{
Name: "messaging.kafka.connect_errors.count",
Description: "Total number of connection errors, by broker",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
attribute.String("node_id", strconv.Itoa(int(meta.NodeID))),
),
},
},
},
}

if len(rm.ScopeMetrics) != 1 {
t.Errorf("expecting only 1 metrics in meter but got %d", len(rm.ScopeMetrics))
}

metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0].Metrics[0],
metricdatatest.IgnoreTimestamp(),
)
})

t.Run("success path with mergeConnectsMeter:true", func(t *testing.T) {
r := sdkmetric.NewManualReader()
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r))
m := NewMeter(MeterProvider(mp), WithMergedConnectsMeter())

meta := kgo.BrokerMetadata{NodeID: 1}
m.OnBrokerConnect(meta, time.Second, &net.TCPConn{}, nil)

rm := metricdata.ResourceMetrics{}
if err := r.Collect(context.Background(), &rm); err != nil {
t.Errorf("unexpected error collecting metrics: %s", err)
}

want := metricdata.Metrics{
Name: "messaging.kafka.connects.count",
Description: "Total number of connections opened, by broker",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
attribute.String("node_id", strconv.Itoa(int(meta.NodeID))),
attribute.String("outcome", "success"),
),
},
},
},
}

if len(rm.ScopeMetrics) != 1 {
t.Errorf("expecting only 1 metrics in meter but got %d", len(rm.ScopeMetrics))
}

metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0].Metrics[0],
metricdatatest.IgnoreTimestamp(),
)
})
t.Run("failure path with mergeConnectsMeter:true", func(t *testing.T) {
r := sdkmetric.NewManualReader()
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r))
m := NewMeter(MeterProvider(mp), WithMergedConnectsMeter())

meta := kgo.BrokerMetadata{NodeID: 1}
m.OnBrokerConnect(meta, time.Second, &net.TCPConn{}, errors.New("whatever error"))

rm := metricdata.ResourceMetrics{}
if err := r.Collect(context.Background(), &rm); err != nil {
t.Errorf("unexpected error collecting metrics: %s", err)
}

want := metricdata.Metrics{
Name: "messaging.kafka.connects.count",
Description: "Total number of connections opened, by broker",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
attribute.String("node_id", strconv.Itoa(int(meta.NodeID))),
attribute.String("outcome", "failure"),
),
},
},
},
}

if len(rm.ScopeMetrics) != 1 {
t.Errorf("expecting only 1 metrics in meter but got %d", len(rm.ScopeMetrics))
}

metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0].Metrics[0],
metricdatatest.IgnoreTimestamp(),
)
})

}

0 comments on commit a5b1d0c

Please sign in to comment.