diff --git a/binOp.go b/binOp.go index 5be4685..9784f6a 100644 --- a/binOp.go +++ b/binOp.go @@ -11,9 +11,9 @@ import ( // Errors var ( - errIterableTypeOnly = errors.New("Only slices and arrays are currently supported as iterable") - errNumericalTypeOnly = errors.New("Only numerical values are supported") - errOpNotSupported = errors.New("Operation not supported") + errIterableTypeOnly = errors.New("only slices and arrays are currently supported as iterable") + errNumericalTypeOnly = errors.New("only numerical values are supported") + errOpNotSupported = errors.New("operation not supported") ) // Supported operations diff --git a/control.go b/control.go index 9e6265a..adc65e4 100644 --- a/control.go +++ b/control.go @@ -3,10 +3,13 @@ package main import ( + "errors" "fmt" "net/http" + "github.com/prometheus/client_golang/prometheus" "github.com/xmidt-org/candlelight" + "github.com/xmidt-org/touchstone" "go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux" "go.uber.org/zap" @@ -20,9 +23,6 @@ import ( // nolint:staticcheck "github.com/xmidt-org/webpa-common/v2/xhttp" "github.com/xmidt-org/webpa-common/v2/xhttp/gate" - - // nolint:staticcheck - "github.com/xmidt-org/webpa-common/v2/xmetrics" ) const ( @@ -32,7 +32,7 @@ const ( drainPath = "/device/drain" ) -func StartControlServer(logger *zap.Logger, manager device.Manager, deviceGate devicegate.Interface, registry xmetrics.Registry, v *viper.Viper, tracing candlelight.Tracing) (func(http.Handler) http.Handler, error) { +func StartControlServer(logger *zap.Logger, manager device.Manager, deviceGate devicegate.Interface, tf *touchstone.Factory, v *viper.Viper, tracing candlelight.Tracing) (func(http.Handler) http.Handler, error) { if !v.IsSet(ControlKey) { return xhttp.NilConstructor, nil } @@ -44,17 +44,45 @@ func StartControlServer(logger *zap.Logger, manager device.Manager, deviceGate d options.Logger = logger + var errs error + gateStatus, err := tf.NewGauge( + prometheus.GaugeOpts{ + Name: GateStatus, + Help: "Indicates whether the device gate is open (1.0) or closed (0.0)", + }, + ) + errs = errors.Join(errs, err) + + drainStatus, err := tf.NewGauge( + prometheus.GaugeOpts{ + Name: DrainStatus, + Help: "Indicates whether a device drain operation is currently running", + }, + ) + errs = errors.Join(errs, err) + + drainCounter, err := tf.NewGauge( + prometheus.GaugeOpts{ + Name: DrainCounter, + Help: "The total count of devices disconnected due to a drain since the server started", + }, + ) + errs = errors.Join(errs, err) + if errs != nil { + return xhttp.NilConstructor, err + } + var ( g = gate.New( true, - gate.WithGauge(registry.NewGauge(GateStatus)), + gate.WithGauge(gateStatus), ) d = drain.New( drain.WithLogger(logger), drain.WithManager(manager), - drain.WithStateGauge(registry.NewGauge(DrainStatus)), - drain.WithDrainCounter(registry.NewCounter(DrainCounter)), + drain.WithStateGauge(drainStatus), + drain.WithDrainCounter(drainCounter), ) gateLogger = devicegate.GateLogger{Logger: logger} diff --git a/deviceAccess.go b/deviceAccess.go index 0dd6d52..5606a4a 100644 --- a/deviceAccess.go +++ b/deviceAccess.go @@ -7,7 +7,7 @@ import ( "net/http" "github.com/fatih/structs" - "github.com/go-kit/kit/metrics" + "github.com/prometheus/client_golang/prometheus" "github.com/thedevsaddam/gojsonq/v2" "github.com/xmidt-org/webpa-common/v2/device" "go.uber.org/zap" @@ -77,26 +77,26 @@ type deviceAccess interface { type talariaDeviceAccess struct { strict bool - wrpMessagesCounter metrics.Counter + wrpMessagesCounter CounterVec deviceRegistry device.Registry checks []*parsedCheck sep string logger *zap.Logger } -func (t *talariaDeviceAccess) withFailure(labelValues ...string) metrics.Counter { +func (t *talariaDeviceAccess) withFailure(labelValues ...string) prometheus.Counter { if !t.strict { return t.withSuccess(labelValues...) } - return t.wrpMessagesCounter.With(append(labelValues, outcomeLabel, rejected)...) + return t.wrpMessagesCounter.WithLabelValues(append(labelValues, outcomeLabel, rejected)...) } -func (t *talariaDeviceAccess) withFatal(labelValues ...string) metrics.Counter { - return t.wrpMessagesCounter.With(append(labelValues, outcomeLabel, rejected)...) +func (t *talariaDeviceAccess) withFatal(labelValues ...string) prometheus.Counter { + return t.wrpMessagesCounter.WithLabelValues(append(labelValues, outcomeLabel, rejected)...) } -func (t *talariaDeviceAccess) withSuccess(labelValues ...string) metrics.Counter { - return t.wrpMessagesCounter.With(append(labelValues, outcomeLabel, accepted)...) +func (t *talariaDeviceAccess) withSuccess(labelValues ...string) prometheus.Counter { + return t.wrpMessagesCounter.WithLabelValues(append(labelValues, outcomeLabel, accepted)...) } func getRight(check *parsedCheck, wrpCredentials *gojsonq.JSONQ) interface{} { diff --git a/deviceAccess_test.go b/deviceAccess_test.go index d8f7d25..fb0775a 100644 --- a/deviceAccess_test.go +++ b/deviceAccess_test.go @@ -7,9 +7,6 @@ import ( "errors" "testing" - // nolint:staticcheck - - "github.com/go-kit/kit/metrics" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/xmidt-org/webpa-common/v2/device" @@ -43,7 +40,7 @@ func testAuthorizeWRP(t *testing.T, testCases []deviceAccessTestCase, strict boo mockDevice = new(device.MockDevice) mockBinOp = new(mockBinOp) testLogger = zaptest.NewLogger(t) - counter = newTestCounter() + counter = mockCounter{labelPairs: make(map[string]string)} expectedLabels = getLabelMaps(testCase.ExpectedError, testCase.IsFatal, strict, testCase.BaseLabelPairs) wrpMsg = &wrp.Message{ @@ -69,9 +66,11 @@ func testAuthorizeWRP(t *testing.T, testCases []deviceAccessTestCase, strict boo checks = getChecks(t, mockBinOp, testCase.IncompleteCheck, testCase.Authorized) } + counter.On("WithLabelValues", []string{reasonLabel, invalidWRPDest, outcomeLabel, rejected}).Return().Once() + counter.On("Add", 1.).Return().Once() deviceAccessAuthority := &talariaDeviceAccess{ strict: strict, - wrpMessagesCounter: counter, + wrpMessagesCounter: &counter, deviceRegistry: mockDeviceRegistry, sep: ">", logger: testLogger, @@ -184,28 +183,6 @@ func getLabelMaps(err error, isFatal, strict bool, baseLabelPairs map[string]str return out } -type testCounter struct { - count float64 - labelPairs map[string]string -} - -func (c *testCounter) Add(delta float64) { - c.count += delta -} - -func (c *testCounter) With(labelValues ...string) metrics.Counter { - for i := 0; i < len(labelValues)-1; i += 2 { - c.labelPairs[labelValues[i]] = labelValues[i+1] - } - return c -} - -func newTestCounter() *testCounter { - return &testCounter{ - labelPairs: make(map[string]string), - } -} - func getTestDeviceMetadata() *device.Metadata { metadata := new(device.Metadata) claims := map[string]interface{}{ diff --git a/eventDispatcher.go b/eventDispatcher.go index c5656c0..51f68ff 100644 --- a/eventDispatcher.go +++ b/eventDispatcher.go @@ -15,7 +15,6 @@ import ( "go.uber.org/zap" - "github.com/go-kit/kit/metrics" "github.com/prometheus/client_golang/prometheus" "github.com/xmidt-org/webpa-common/v2/device" "github.com/xmidt-org/webpa-common/v2/event" @@ -43,7 +42,7 @@ type eventDispatcher struct { authorizationKey string source string eventMap event.MultiMap - queueSize metrics.Gauge + queueSize prometheus.Gauge droppedMessages CounterVec outboundEvents CounterVec outbounds chan<- outboundEnvelope diff --git a/eventDispatcher_test.go b/eventDispatcher_test.go index ba1aa21..b12be6a 100644 --- a/eventDispatcher_test.go +++ b/eventDispatcher_test.go @@ -34,12 +34,14 @@ func genTestMetadata() *device.Metadata { func testEventDispatcherOnDeviceEventConnectEvent(t *testing.T) { var ( - assert = assert.New(t) - require = require.New(t) - d = new(device.MockDevice) - dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), nil, nil) + assert = assert.New(t) + require = require.New(t) + d = new(device.MockDevice) ) + om, err := NewTestOutboundMeasures() + require.NoError(err) + dispatcher, outbounds, err := NewEventDispatcher(om, nil, nil) require.NotNil(dispatcher) require.NotNil(outbounds) require.NoError(err) @@ -57,12 +59,14 @@ func testEventDispatcherOnDeviceEventConnectEvent(t *testing.T) { func testEventDispatcherOnDeviceEventDisconnectEvent(t *testing.T) { var ( - assert = assert.New(t) - require = require.New(t) - d = new(device.MockDevice) - dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), nil, nil) + assert = assert.New(t) + require = require.New(t) + d = new(device.MockDevice) ) + om, err := NewTestOutboundMeasures() + require.NoError(err) + dispatcher, outbounds, err := NewEventDispatcher(om, nil, nil) require.NotNil(dispatcher) require.NotNil(outbounds) require.NoError(err) @@ -83,11 +87,13 @@ func testEventDispatcherOnDeviceEventDisconnectEvent(t *testing.T) { func testEventDispatcherOnDeviceEventUnroutable(t *testing.T) { var ( - assert = assert.New(t) - require = require.New(t) - dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), nil, nil) + assert = assert.New(t) + require = require.New(t) ) + om, err := NewTestOutboundMeasures() + require.NoError(err) + dispatcher, outbounds, err := NewEventDispatcher(om, nil, nil) require.NotNil(dispatcher) require.NotNil(outbounds) require.NoError(err) @@ -102,10 +108,13 @@ func testEventDispatcherOnDeviceEventUnroutable(t *testing.T) { func testEventDispatcherOnDeviceEventBadURLFilter(t *testing.T) { var ( - assert = assert.New(t) - dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), &Outbounder{DefaultScheme: "bad"}, nil) + assert = assert.New(t) + require = require.New(t) ) + om, err := NewTestOutboundMeasures() + require.NoError(err) + dispatcher, outbounds, err := NewEventDispatcher(om, &Outbounder{DefaultScheme: "bad"}, nil) assert.Nil(dispatcher) assert.Nil(outbounds) assert.Error(err) @@ -188,11 +197,13 @@ func testEventDispatcherOnDeviceEventDispatchEvent(t *testing.T) { t.Logf("%#v, method=%s, format=%s", record, record.outbounder.method(), format) var ( - expectedContents = []byte{1, 2, 3, 4} - urlFilter = new(mockURLFilter) - dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), record.outbounder, urlFilter) + expectedContents = []byte{1, 2, 3, 4} + urlFilter = new(mockURLFilter) ) + om, err := NewTestOutboundMeasures() + require.NoError(err) + dispatcher, outbounds, err := NewEventDispatcher(om, record.outbounder, urlFilter) require.NotNil(dispatcher) require.NotNil(outbounds) require.NoError(err) @@ -250,8 +261,10 @@ func testEventDispatcherOnDeviceEventFullQueue(t *testing.T) { }), zapcore.AddSync(&b), zapcore.ErrorLevel), ), } - om = NewTestOutboundMeasures() ) + + om, err := NewTestOutboundMeasures() + require.NoError(err) dm := new(mockCounter) om.DroppedMessages = dm d, _, err := NewEventDispatcher(om, outbounder, nil) @@ -288,9 +301,11 @@ func testEventDispatcherOnDeviceEventMessageReceived(t *testing.T) { }), zapcore.AddSync(&b), zapcore.ErrorLevel), ), } - dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), &o, nil) ) + om, err := NewTestOutboundMeasures() + require.NoError(err) + dispatcher, outbounds, err := NewEventDispatcher(om, &o, nil) require.NotNil(dispatcher) require.NotNil(outbounds) require.NoError(err) @@ -329,9 +344,11 @@ func testEventDispatcherOnDeviceEventFilterError(t *testing.T) { }), zapcore.AddSync(&b), zapcore.ErrorLevel), ), } - dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), &o, urlFilter) ) + om, err := NewTestOutboundMeasures() + require.NoError(err) + dispatcher, outbounds, err := NewEventDispatcher(om, &o, urlFilter) require.NotNil(dispatcher) require.NotNil(outbounds) require.NoError(err) @@ -411,11 +428,13 @@ func testEventDispatcherOnDeviceEventDispatchTo(t *testing.T) { t.Logf("%#v, method=%s, format=%s", record, record.outbounder.method(), format) var ( - expectedContents = []byte{4, 7, 8, 1} - urlFilter = new(mockURLFilter) - dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), record.outbounder, urlFilter) + expectedContents = []byte{4, 7, 8, 1} + urlFilter = new(mockURLFilter) ) + om, err := NewTestOutboundMeasures() + require.NoError(err) + dispatcher, outbounds, err := NewEventDispatcher(om, record.outbounder, urlFilter) require.NotNil(dispatcher) require.NotNil(outbounds) require.NoError(err) @@ -468,7 +487,9 @@ func testEventDispatcherOnDeviceEventNilEventError(t *testing.T) { }), zapcore.AddSync(&b), zapcore.ErrorLevel), ) o.Logger = logger - dp, _, err := NewEventDispatcher(NewTestOutboundMeasures(), o, nil) + om, err := NewTestOutboundMeasures() + require.NoError(err) + dp, _, err := NewEventDispatcher(om, o, nil) require.NotNil(dp) require.NoError(err) // Purge init logs @@ -481,8 +502,11 @@ func testEventDispatcherOnDeviceEventNilEventError(t *testing.T) { func testEventDispatcherOnDeviceEventEventMapError(t *testing.T) { assert := assert.New(t) + require := require.New(t) o := &Outbounder{EventEndpoints: map[string]interface{}{"bad": -17.6}} - dp, _, err := NewEventDispatcher(NewTestOutboundMeasures(), o, nil) + om, err := NewTestOutboundMeasures() + require.NoError(err) + dp, _, err := NewEventDispatcher(om, o, nil) assert.Nil(dp) assert.Error(err) } diff --git a/go.mod b/go.mod index 265e62f..2275ae6 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/xmidt-org/talaria -go 1.21 +go 1.23 require ( github.com/fatih/structs v1.1.0 @@ -9,7 +9,7 @@ require ( github.com/goph/emperror v0.17.3-0.20190703203600-60a8d9faa17b github.com/gorilla/mux v1.8.1 github.com/justinas/alice v1.2.0 - github.com/prometheus/client_golang v1.19.1 + github.com/prometheus/client_golang v1.20.3 github.com/prometheus/client_model v0.6.1 github.com/segmentio/ksuid v1.0.4 github.com/spf13/cast v1.7.0 @@ -20,6 +20,7 @@ require ( github.com/xmidt-org/bascule v0.11.6 github.com/xmidt-org/candlelight v0.1.15 github.com/xmidt-org/clortho v0.0.4 + github.com/xmidt-org/httpaux v0.4.0 github.com/xmidt-org/sallust v0.2.2 github.com/xmidt-org/touchstone v0.1.5 github.com/xmidt-org/webpa-common/v2 v2.3.2 @@ -66,6 +67,7 @@ require ( github.com/hashicorp/serf v0.10.1 // indirect github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c // indirect github.com/jtacoma/uritemplates v1.0.0 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/lestrrat-go/blackmagic v1.0.2 // indirect github.com/lestrrat-go/httpcc v1.0.1 // indirect github.com/lestrrat-go/httprc v1.0.5 // indirect @@ -77,12 +79,13 @@ require ( github.com/mattn/go-isatty v0.0.17 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/openzipkin/zipkin-go v0.4.3 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/common v0.48.0 // indirect - github.com/prometheus/procfs v0.12.0 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/segmentio/asm v1.2.0 // indirect @@ -95,7 +98,6 @@ require ( github.com/xmidt-org/argus v0.9.10 // indirect github.com/xmidt-org/arrange v0.4.0 // indirect github.com/xmidt-org/chronon v0.1.1 // indirect - github.com/xmidt-org/httpaux v0.4.0 // indirect go.opentelemetry.io/otel v1.29.0 // indirect go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.29.0 // indirect diff --git a/go.sum b/go.sum index 27fa1de..4c8a9a6 100644 --- a/go.sum +++ b/go.sum @@ -368,6 +368,8 @@ github.com/justinas/alice v1.2.0/go.mod h1:fN5HRH/reO/zrUflLfTN43t3vXvKzvZIENsNE github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= @@ -381,6 +383,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/lestrrat-go/blackmagic v1.0.2 h1:Cg2gVSc9h7sz9NOByczrbUvLopQmXrfFx//N+AkAr5k= github.com/lestrrat-go/blackmagic v1.0.2/go.mod h1:UrEqBzIR2U6CnzVyUtfM6oZNMt/7O7Vohk2J0OGSAtU= github.com/lestrrat-go/httpcc v1.0.1 h1:ydWCStUeJLkpYyjLDHihupbn2tYmZ7m22BGkcvZZrIE= @@ -431,6 +435,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -460,8 +466,8 @@ github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5Fsn github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= -github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= -github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= +github.com/prometheus/client_golang v1.20.3 h1:oPksm4K8B+Vt35tUhw6GbSNSgVlVSBH0qELP/7u83l4= +github.com/prometheus/client_golang v1.20.3/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -472,15 +478,15 @@ github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8 github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= -github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= -github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= +github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= +github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= -github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= -github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= diff --git a/main.go b/main.go index ccb8f5e..d8d2582 100644 --- a/main.go +++ b/main.go @@ -14,12 +14,17 @@ import ( "syscall" "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "github.com/spf13/pflag" "github.com/spf13/viper" "github.com/xmidt-org/bascule/basculehelper" "github.com/xmidt-org/candlelight" + "github.com/xmidt-org/touchstone" + + // nolint:staticcheck + "github.com/xmidt-org/webpa-common/v2/xmetrics" "github.com/xmidt-org/webpa-common/v2/adapter" // nolint:staticcheck @@ -37,8 +42,6 @@ import ( "github.com/xmidt-org/webpa-common/v2/service/monitor" // nolint:staticcheck "github.com/xmidt-org/webpa-common/v2/service/servicecfg" - // nolint:staticcheck - "github.com/xmidt-org/webpa-common/v2/xmetrics" "github.com/xmidt-org/webpa-common/v2/xresolver/consul" "go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux" ) @@ -59,7 +62,7 @@ func setupDefaultConfigValues(v *viper.Viper) { v.SetDefault(RehasherServicesConfigKey, []string{applicationName}) } -func newDeviceManager(logger *zap.Logger, r xmetrics.Registry, v *viper.Viper) (device.Manager, devicegate.Interface, *consul.ConsulWatcher, error) { +func newDeviceManager(logger *zap.Logger, r xmetrics.Registry, tf *touchstone.Factory, v *viper.Viper) (device.Manager, devicegate.Interface, *consul.ConsulWatcher, error) { deviceOptions, err := device.NewOptions(logger, v.Sub(device.DeviceManagerKey)) if err != nil { return nil, nil, nil, err @@ -70,7 +73,12 @@ func newDeviceManager(logger *zap.Logger, r xmetrics.Registry, v *viper.Viper) ( return nil, nil, nil, err } - outboundListeners, err := outbounder.Start(NewOutboundMeasures(r)) + om, err := NewOutboundMeasures(tf) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to get OutboundMeasures: %s", err) + } + + outboundListeners, err := outbounder.Start(om) if err != nil { return nil, nil, nil, err } @@ -113,7 +121,7 @@ func talaria(arguments []string) int { f = pflag.NewFlagSet(applicationName, pflag.ContinueOnError) v = viper.New() - logger, metricsRegistry, webPA, err = server.Initialize(applicationName, arguments, f, v, Metrics, device.Metrics, rehasher.Metrics, service.Metrics, basculehelper.AuthValidationMetrics) + logger, metricsRegistry, webPA, err = server.Initialize(applicationName, arguments, f, v, device.Metrics, rehasher.Metrics, service.Metrics, basculehelper.AuthValidationMetrics) ) if parseErr, done := printVersion(f, arguments); done { @@ -140,7 +148,19 @@ func talaria(arguments []string) int { } logger.Info("tracing status", zap.Bool("enabled", !tracing.IsNoop())) - manager, filterGate, watcher, err := newDeviceManager(logger, metricsRegistry, v) + promReg, ok := metricsRegistry.(prometheus.Registerer) + if !ok { + fmt.Fprintf(os.Stderr, "failed to get prometheus registerer") + + return 1 + } + + var tsConfig touchstone.Config + // Get touchstone & zap configurations + v.UnmarshalKey("touchstone", &tsConfig) + tf := touchstone.NewFactory(tsConfig, logger, promReg) + + manager, filterGate, watcher, err := newDeviceManager(logger, metricsRegistry, tf, v) if err != nil { logger.Error("unable to create device manager", zap.Error(err)) return 2 @@ -154,7 +174,7 @@ func talaria(arguments []string) int { return 4 } - controlConstructor, err := StartControlServer(logger, manager, filterGate, metricsRegistry, v, tracing) + controlConstructor, err := StartControlServer(logger, manager, filterGate, tf, v, tracing) if err != nil { logger.Error("unable to create control server", zap.Error(err)) return 3 @@ -174,7 +194,7 @@ func talaria(arguments []string) int { } rootRouter.Use(otelmux.Middleware("primary", otelMuxOptions...), candlelight.EchoFirstTraceNodeInfo(tracing.Propagator(), true)) - primaryHandler, err := NewPrimaryHandler(logger, manager, v, a, e, controlConstructor, metricsRegistry, rootRouter) + primaryHandler, err := NewPrimaryHandler(logger, manager, v, a, e, controlConstructor, tf, rootRouter) if err != nil { logger.Error("unable to start device management", zap.Error(err)) return 4 diff --git a/metrics.go b/metrics.go index cc23170..2621535 100644 --- a/metrics.go +++ b/metrics.go @@ -3,17 +3,15 @@ package main import ( + "errors" "net/http" "strconv" "time" - "github.com/go-kit/kit/metrics" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/xmidt-org/webpa-common/v2/xhttp" - - // nolint:staticcheck - "github.com/xmidt-org/webpa-common/v2/xmetrics" + "github.com/xmidt-org/httpaux/retry" + "github.com/xmidt-org/touchstone" ) // Metric names @@ -99,118 +97,34 @@ const ( failureOutcome = "failure" ) -func Metrics() []xmetrics.Metric { - return []xmetrics.Metric{ - { - Name: OutboundInFlightGauge, - Type: xmetrics.GaugeType, - Help: "The number of active, in-flight requests from devices", - }, - { - Name: OutboundRequestDuration, - Type: "histogram", - Help: "The durations of outbound requests from devices", - LabelNames: []string{eventLabel, codeLabel, reasonLabel, urlLabel}, - Buckets: []float64{.25, .5, 1, 2.5, 5, 10}, - }, - { - Name: OutboundRequestCounter, - Type: xmetrics.CounterType, - Help: "The count of outbound requests", - LabelNames: []string{eventLabel, codeLabel, reasonLabel, urlLabel}, - }, - { - Name: OutboundRequestSizeBytes, - Type: xmetrics.HistogramType, - Help: "A histogram of request sizes for outbound requests", - LabelNames: []string{eventLabel, codeLabel}, - Buckets: []float64{200, 500, 900, 1500, 3000, 6000, 12000, 24000, 48000, 96000, 192000}, - }, - { - Name: TotalOutboundEvents, - Type: xmetrics.CounterType, - Help: "Total count of outbound events", - LabelNames: []string{eventLabel, reasonLabel, urlLabel, outcomeLabel}, - }, - { - Name: OutboundQueueSize, - Type: xmetrics.GaugeType, - Help: "The current number of requests waiting to be sent outbound", - }, - { - Name: OutboundDroppedMessageCounter, - Type: xmetrics.CounterType, - Help: "The total count of messages dropped", - LabelNames: []string{eventLabel, codeLabel, reasonLabel, urlLabel}, - }, - { - Name: OutboundRetries, - Type: xmetrics.CounterType, - Help: "The total count of outbound HTTP retries", - }, - { - Name: OutboundAckSuccessCounter, - Type: xmetrics.CounterType, - Help: "Number of outbound WRP acks", - LabelNames: []string{qosLevelLabel, partnerIDLabel, messageType}, - }, - { - Name: OutboundAckFailureCounter, - Type: xmetrics.CounterType, - Help: "Number of outbound WRP ack failures", - LabelNames: []string{qosLevelLabel, partnerIDLabel, messageType}, - }, - { - Name: OutboundAckSuccessLatencyHistogram, - Type: xmetrics.HistogramType, - Help: "A histogram of latencies for successful outbound WRP acks", - LabelNames: []string{qosLevelLabel, partnerIDLabel, messageType}, - Buckets: []float64{0.0625, 0.125, .25, .5, 1, 5, 10, 20, 40, 80, 160}, - }, - { - Name: OutboundAckFailureLatencyHistogram, - Type: xmetrics.HistogramType, - Help: "A histogram of latencies for failed outbound WRP acks", - LabelNames: []string{qosLevelLabel, partnerIDLabel, messageType}, - Buckets: []float64{0.0625, 0.125, .25, .5, 1, 5, 10, 20, 40, 80, 160}, - }, - { - Name: GateStatus, - Type: xmetrics.GaugeType, - Help: "Indicates whether the device gate is open (1.0) or closed (0.0)", - }, - { - Name: DrainStatus, - Type: xmetrics.GaugeType, - Help: "Indicates whether a device drain operation is currently running", - }, - { - Name: DrainCounter, - Type: xmetrics.CounterType, - Help: "The total count of devices disconnected due to a drain since the server started", - }, - { - Name: InboundWRPMessageCounter, - Type: xmetrics.CounterType, - Help: "Number of inbound WRP Messages successfully decoded and ready to route to device", - LabelNames: []string{outcomeLabel, reasonLabel}, - }, - } -} - type HistogramVec interface { prometheus.Collector - With(prometheus.Labels) prometheus.Observer - CurryWith(prometheus.Labels) (prometheus.ObserverVec, error) - GetMetricWith(prometheus.Labels) (prometheus.Observer, error) - GetMetricWithLabelValues(...string) (prometheus.Observer, error) + With(labels prometheus.Labels) prometheus.Observer + CurryWith(labels prometheus.Labels) (prometheus.ObserverVec, error) + GetMetricWith(labels prometheus.Labels) (prometheus.Observer, error) + GetMetricWithLabelValues(lvs ...string) (prometheus.Observer, error) MustCurryWith(labels prometheus.Labels) (o prometheus.ObserverVec) WithLabelValues(lvs ...string) (o prometheus.Observer) } type CounterVec interface { prometheus.Collector - With(prometheus.Labels) prometheus.Counter + CurryWith(labels prometheus.Labels) (*prometheus.CounterVec, error) + GetMetricWith(labels prometheus.Labels) (prometheus.Counter, error) + GetMetricWithLabelValues(lvs ...string) (prometheus.Counter, error) + MustCurryWith(labels prometheus.Labels) *prometheus.CounterVec + With(labels prometheus.Labels) prometheus.Counter + WithLabelValues(lvs ...string) prometheus.Counter +} + +type GaugeVec interface { + prometheus.Collector + CurryWith(labels prometheus.Labels) (*prometheus.GaugeVec, error) + GetMetricWith(labels prometheus.Labels) (prometheus.Gauge, error) + GetMetricWithLabelValues(lvs ...string) (prometheus.Gauge, error) + MustCurryWith(labels prometheus.Labels) *prometheus.GaugeVec + With(labels prometheus.Labels) prometheus.Gauge + WithLabelValues(lvs ...string) prometheus.Gauge } type OutboundMeasures struct { @@ -219,8 +133,8 @@ type OutboundMeasures struct { RequestCounter CounterVec RequestSize HistogramVec OutboundEvents CounterVec - QueueSize metrics.Gauge - Retries metrics.Counter + QueueSize prometheus.Gauge + Retries prometheus.Counter DroppedMessages CounterVec AckSuccess CounterVec AckFailure CounterVec @@ -228,24 +142,162 @@ type OutboundMeasures struct { AckFailureLatency HistogramVec } -func NewOutboundMeasures(r xmetrics.Registry) OutboundMeasures { - return OutboundMeasures{ - InFlight: r.NewGaugeVec(OutboundInFlightGauge).WithLabelValues(), - RequestDuration: r.NewHistogramVec(OutboundRequestDuration), - RequestCounter: r.NewCounterVec(OutboundRequestCounter), - RequestSize: r.NewHistogramVec(OutboundRequestSizeBytes), - OutboundEvents: r.NewCounterVec(TotalOutboundEvents), - QueueSize: r.NewGauge(OutboundQueueSize), - Retries: r.NewCounter(OutboundRetries), - DroppedMessages: r.NewCounterVec(OutboundDroppedMessageCounter), - AckSuccess: r.NewCounterVec(OutboundAckSuccessCounter), - AckFailure: r.NewCounterVec(OutboundAckFailureCounter), - // 0 is for the unused `buckets` argument in xmetrics.Registry.NewHistogram - AckSuccessLatency: r.NewHistogramVec(OutboundAckSuccessLatencyHistogram), - // 0 is for the unused `buckets` argument in xmetrics.Registry.NewHistogram - AckFailureLatency: r.NewHistogramVec(OutboundAckFailureLatencyHistogram), - } +func NewOutboundMeasures(tf *touchstone.Factory) (om OutboundMeasures, errs error) { + var err error + + om.InFlight, err = tf.NewGauge(prometheus.GaugeOpts{ + Name: OutboundInFlightGauge, + Help: "The number of active, in-flight requests from devices", + }) + errs = errors.Join(errs, err) + + om.RequestDuration, err = tf.NewHistogramVec( + prometheus.HistogramOpts{ + Name: OutboundRequestDuration, + Help: "The durations of outbound requests from devices", + // Each bucket is at most 10% wider than the previous one), + // which will result in each power of two divided into 8 buckets. + // (e.g. there will be 8 buckets between 1 + // and 2, same as between 2 and 4, and 4 and 8, etc.). + NativeHistogramBucketFactor: 1.1, + NativeHistogramZeroThreshold: 0.25, + NativeHistogramMaxBucketNumber: 10, + NativeHistogramMinResetDuration: time.Hour * 24 * 7, + NativeHistogramMaxZeroThreshold: 0.5, + // Disable exemplars. + NativeHistogramMaxExemplars: -1, + NativeHistogramExemplarTTL: time.Minute * 5, + }, + []string{eventLabel, codeLabel, reasonLabel, urlLabel}..., + ) + errs = errors.Join(errs, err) + + om.RequestCounter, err = tf.NewCounterVec( + prometheus.CounterOpts{ + Name: OutboundRequestCounter, + Help: "The count of outbound requests", + }, + []string{eventLabel, codeLabel, reasonLabel, urlLabel}..., + ) + errs = errors.Join(errs, err) + + om.RequestSize, err = tf.NewHistogramVec( + prometheus.HistogramOpts{ + Name: OutboundRequestSizeBytes, + Help: "A histogram of request sizes for outbound requests", + // Each bucket is at most 10% wider than the previous one), + // which will result in each power of two divided into 8 buckets. + // (e.g. there will be 8 buckets between 1 + // and 2, same as between 2 and 4, and 4 and 8, etc.). + NativeHistogramBucketFactor: 1.1, + NativeHistogramZeroThreshold: 200, + NativeHistogramMaxBucketNumber: 10, + NativeHistogramMinResetDuration: time.Hour * 24 * 7, + NativeHistogramMaxZeroThreshold: 1500, + // Disable exemplars. + NativeHistogramMaxExemplars: -1, + NativeHistogramExemplarTTL: time.Minute * 5, + }, + []string{eventLabel, codeLabel}..., + ) + errs = errors.Join(errs, err) + + om.OutboundEvents, err = tf.NewCounterVec( + prometheus.CounterOpts{ + Name: TotalOutboundEvents, + Help: "Total count of outbound events", + }, + []string{eventLabel, reasonLabel, urlLabel, outcomeLabel}..., + ) + errs = errors.Join(errs, err) + + om.QueueSize, err = tf.NewGauge( + prometheus.GaugeOpts{ + Name: OutboundQueueSize, + Help: "The current number of requests waiting to be sent outbound", + }, + ) + errs = errors.Join(errs, err) + + om.Retries, err = tf.NewCounter( + prometheus.CounterOpts{ + Name: OutboundRetries, + Help: "The total count of outbound HTTP retries", + }, + ) + errs = errors.Join(errs, err) + + om.DroppedMessages, err = tf.NewCounterVec( + prometheus.CounterOpts{ + Name: OutboundDroppedMessageCounter, + Help: "The total count of messages dropped", + }, + []string{eventLabel, codeLabel, reasonLabel, urlLabel}..., + ) + errs = errors.Join(errs, err) + + om.AckSuccess, err = tf.NewCounterVec( + prometheus.CounterOpts{ + Name: OutboundAckSuccessCounter, + Help: "Number of outbound WRP acks", + }, + []string{qosLevelLabel, partnerIDLabel, messageType}..., + ) + errs = errors.Join(errs, err) + + om.AckFailure, err = tf.NewCounterVec( + prometheus.CounterOpts{ + Name: OutboundAckFailureCounter, + Help: "Number of outbound WRP ack failures", + }, + []string{qosLevelLabel, partnerIDLabel, messageType}..., + ) + errs = errors.Join(errs, err) + + om.AckSuccessLatency, err = tf.NewHistogramVec( + prometheus.HistogramOpts{ + Name: OutboundAckSuccessLatencyHistogram, + Help: "A histogram of latencies for successful outbound WRP acks", + // Each bucket is at most 10% wider than the previous one), + // which will result in each power of two divided into 8 buckets. + // (e.g. there will be 8 buckets between 1 + // and 2, same as between 2 and 4, and 4 and 8, etc.). + NativeHistogramBucketFactor: 1.1, + NativeHistogramZeroThreshold: 0.0625, + NativeHistogramMaxBucketNumber: 11, + NativeHistogramMinResetDuration: time.Hour * 24 * 7, + NativeHistogramMaxZeroThreshold: 0.125, + // Disable exemplars. + NativeHistogramMaxExemplars: -1, + NativeHistogramExemplarTTL: time.Minute * 5, + }, + []string{qosLevelLabel, partnerIDLabel, messageType}..., + ) + errs = errors.Join(errs, err) + + om.AckFailureLatency, err = tf.NewHistogramVec( + prometheus.HistogramOpts{ + Name: OutboundAckFailureLatencyHistogram, + Help: "A histogram of latencies for failed outbound WRP acks", + // Each bucket is at most 10% wider than the previous one), + // which will result in each power of two divided into 8 buckets. + // (e.g. there will be 8 buckets between 1 + // and 2, same as between 2 and 4, and 4 and 8, etc.). + NativeHistogramBucketFactor: 1.1, + NativeHistogramZeroThreshold: 0.0625, + NativeHistogramMaxBucketNumber: 11, + NativeHistogramMinResetDuration: time.Hour * 24 * 7, + NativeHistogramMaxZeroThreshold: 0.125, + // Disable exemplars. + NativeHistogramMaxExemplars: -1, + NativeHistogramExemplarTTL: time.Minute * 5, + }, + []string{qosLevelLabel, partnerIDLabel, messageType}..., + ) + + return om, errors.Join(errs, err) } + func InstrumentOutboundSize(obs HistogramVec, next http.RoundTripper) promhttp.RoundTripperFunc { return promhttp.RoundTripperFunc(func(request *http.Request) (*http.Response, error) { eventType, ok := request.Context().Value(eventTypeContextKey{}).(string) @@ -273,6 +325,7 @@ func InstrumentOutboundSize(obs HistogramVec, next http.RoundTripper) promhttp.R return response, err }) } + func InstrumentOutboundDuration(obs HistogramVec, next http.RoundTripper) promhttp.RoundTripperFunc { return promhttp.RoundTripperFunc(func(request *http.Request) (*http.Response, error) { eventType, ok := request.Context().Value(eventTypeContextKey{}).(string) @@ -340,24 +393,33 @@ func InstrumentOutboundCounter(counter CounterVec, next http.RoundTripper) promh func NewOutboundRoundTripper(om OutboundMeasures, o *Outbounder) http.RoundTripper { // TODO add tests for NewOutboundRoundTripper // nolint:bodyclose - return promhttp.RoundTripperFunc(xhttp.RetryTransactor( - // use the default should retry predicate ... - xhttp.RetryOptions{ - Logger: o.logger(), - Retries: o.retries(), - Counter: om.Retries, - }, - InstrumentOutboundCounter( - om.RequestCounter, - InstrumentOutboundSize( - om.RequestSize, - InstrumentOutboundDuration( - om.RequestDuration, - promhttp.InstrumentRoundTripperInFlight(om.InFlight, o.transport()), + return promhttp.RoundTripperFunc( + retry.New( + retry.Config{ + Retries: o.retries(), + Check: func(resp *http.Response, err error) bool { + shouldRetry := retry.DefaultCheck(resp, err) + if shouldRetry { + om.Retries.Add(1) + } + + return shouldRetry + }, + }, + &http.Client{ + Transport: InstrumentOutboundCounter( + om.RequestCounter, + InstrumentOutboundSize( + om.RequestSize, + InstrumentOutboundDuration( + om.RequestDuration, + promhttp.InstrumentRoundTripperInFlight(om.InFlight, o.transport()), + ), + ), ), - ), - ), - )) + }, + ).Do, + ) } func computeApproximateRequestSize(r *http.Request) int { diff --git a/metrics_test.go b/metrics_test.go index 7b65508..8e360ad 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -3,11 +3,23 @@ package main import ( - // nolint:staticcheck - "github.com/xmidt-org/webpa-common/v2/xmetrics" + "github.com/xmidt-org/sallust" + "github.com/xmidt-org/touchstone" ) +// nolint:staticcheck + // NewTestOutboundMeasures creates an OutboundMeasures appropriate for a testing environment -func NewTestOutboundMeasures() OutboundMeasures { - return NewOutboundMeasures(xmetrics.MustNewRegistry(nil, Metrics)) +func NewTestOutboundMeasures() (OutboundMeasures, error) { + cfg := touchstone.Config{ + DefaultNamespace: "n", + DefaultSubsystem: "s", + } + _, pr, err := touchstone.New(cfg) + if err != nil { + return OutboundMeasures{}, err + } + + tf := touchstone.NewFactory(cfg, sallust.Default(), pr) + return NewOutboundMeasures(tf) } diff --git a/mocks_test.go b/mocks_test.go index d40fd28..d411a32 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -112,19 +112,23 @@ func (m *mockHistogram) With(labelValues prometheus.Labels) prometheus.Observer } func (m *mockHistogram) CurryWith(labels prometheus.Labels) (o prometheus.ObserverVec, err error) { - return o, err + return m, nil } + func (m *mockHistogram) GetMetricWith(labels prometheus.Labels) (o prometheus.Observer, err error) { - return o, err + return m, nil } + func (m *mockHistogram) GetMetricWithLabelValues(...string) (o prometheus.Observer, err error) { - return o, err + return m, nil } + func (m *mockHistogram) MustCurryWith(labels prometheus.Labels) (o prometheus.ObserverVec) { - return o + return m } + func (m *mockHistogram) WithLabelValues(lvs ...string) (o prometheus.Observer) { - return o + return m } // mockCounter provides the mock implementation of the metrics.Counter object @@ -132,13 +136,19 @@ type mockCounter struct { mockCollector mockMetric mock.Mock + + // port over testCounter functionality + count float64 + labelPairs map[string]string } func (m *mockCounter) Add(delta float64) { + m.count += delta m.Called(delta) } func (m *mockCounter) Inc() {} + func (m *mockCounter) With(labelValues prometheus.Labels) prometheus.Counter { for k, v := range labelValues { if !utf8.ValidString(k) { @@ -147,10 +157,36 @@ func (m *mockCounter) With(labelValues prometheus.Labels) prometheus.Counter { panic(fmt.Sprintf("key `%s`, value `%s`: value is not UTF-8", k, v)) } } + m.Called(labelValues) return m } +func (m *mockCounter) CurryWith(labels prometheus.Labels) (c *prometheus.CounterVec, err error) { + return &prometheus.CounterVec{}, nil +} + +func (m *mockCounter) GetMetricWith(labels prometheus.Labels) (c prometheus.Counter, err error) { + return m, nil +} + +func (m *mockCounter) GetMetricWithLabelValues(lvs ...string) (c prometheus.Counter, err error) { + return m, nil +} + +func (m *mockCounter) MustCurryWith(labels prometheus.Labels) (c *prometheus.CounterVec) { + return &prometheus.CounterVec{} +} + +func (m *mockCounter) WithLabelValues(lvs ...string) (c prometheus.Counter) { + // port over testCounter functionality + for i := 0; i < len(lvs)-1; i += 2 { + m.labelPairs[lvs[i]] = lvs[i+1] + } + + return m +} + // mockKey is a mock for key. type mockKey struct { mock.Mock diff --git a/outbounder_test.go b/outbounder_test.go index cea5c40..01ef03c 100644 --- a/outbounder_test.go +++ b/outbounder_test.go @@ -15,14 +15,13 @@ import ( "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/xmidt-org/sallust" + "github.com/xmidt-org/touchstone" "github.com/xmidt-org/webpa-common/v2/adapter" "github.com/xmidt-org/webpa-common/v2/device" "github.com/xmidt-org/webpa-common/v2/event" - "go.uber.org/zap/zaptest" - - // nolint:staticcheck - "github.com/xmidt-org/webpa-common/v2/xmetrics" "github.com/xmidt-org/wrp-go/v3" + "go.uber.org/zap/zaptest" ) func ExampleOutbounder() { @@ -40,12 +39,6 @@ func ExampleOutbounder() { defer server.Close() - metricsRegistry, err := xmetrics.NewRegistry(nil, Metrics) - if err != nil { - fmt.Println(err) - return - } - var ( // set the workerPoolSize to 1 so that output order is deterministic configuration = []byte(fmt.Sprintf( @@ -73,7 +66,24 @@ func ExampleOutbounder() { return } - listeners, err := o.Start(NewOutboundMeasures(metricsRegistry)) + cfg := touchstone.Config{ + DefaultNamespace: "n", + DefaultSubsystem: "s", + } + _, pr, err := touchstone.New(cfg) + if err != nil { + fmt.Println(err) + return + } + + tf := touchstone.NewFactory(cfg, sallust.Default(), pr) + om, err := NewOutboundMeasures(tf) + if err != nil { + fmt.Println(err) + return + } + + listeners, err := o.Start(om) if err != nil { fmt.Println(err) return diff --git a/primaryHandler.go b/primaryHandler.go index 3b6ed22..57434f0 100644 --- a/primaryHandler.go +++ b/primaryHandler.go @@ -10,20 +10,15 @@ import ( "net/http" "time" - "github.com/go-kit/kit/metrics" "github.com/gorilla/mux" "github.com/justinas/alice" "github.com/prometheus/client_golang/prometheus" "github.com/spf13/viper" "github.com/xmidt-org/bascule" - "github.com/xmidt-org/bascule/basculehelper" "github.com/xmidt-org/clortho/clorthometrics" "github.com/xmidt-org/clortho/clorthozap" "github.com/xmidt-org/sallust" "github.com/xmidt-org/touchstone" - - // nolint:staticcheck - "github.com/xmidt-org/webpa-common/v2/xmetrics" "go.uber.org/zap" // nolint:staticcheck @@ -34,7 +29,6 @@ import ( // nolint:staticcheck "github.com/xmidt-org/webpa-common/v2/device" - // nolint:staticcheck "github.com/xmidt-org/webpa-common/v2/service" // nolint:staticcheck @@ -128,7 +122,7 @@ func buildUserPassMap(logger *zap.Logger, encodedBasicAuthKeys []string) (userPa } func NewPrimaryHandler(logger *zap.Logger, manager device.Manager, v *viper.Viper, a service.Accessor, e service.Environment, - controlConstructor alice.Constructor, metricsRegistry xmetrics.Registry, r *mux.Router) (http.Handler, error) { + controlConstructor alice.Constructor, tf *touchstone.Factory, r *mux.Router) (http.Handler, error) { var ( inboundTimeout = getInboundTimeout(v) apiHandler = r.PathPrefix(fmt.Sprintf("%s/{version:%s|%s}", baseURI, v2, version)).Subrouter() @@ -139,10 +133,24 @@ func NewPrimaryHandler(logger *zap.Logger, manager device.Manager, v *viper.Vipe deviceAuthRules = bascule.Validators{} //auth rules for device registration endpoints serviceAuthRules = bascule.Validators{} //auth rules for everything else - m = basculehelper.NewAuthValidationMeasures(metricsRegistry) - listener = basculehelper.NewMetricListener(m) ) + authCounter, err := tf.NewCounterVec( + prometheus.CounterOpts{ + Name: basculehttp.AuthValidationOutcome, + Help: "Counter for success and failure reason results through bascule", + }, basculehttp.ServerLabel, basculehttp.OutcomeLabel) + if err != nil { + return nil, err + } + + listener, err := basculehttp.NewMetricListener( + &basculehttp.AuthValidationMeasures{ValidationOutcome: authCounter}, + ) + if err != nil { + return nil, err + } + authConstructorOptions := []basculehttp.COption{ basculehttp.WithCLogger(getLogger), basculehttp.WithCErrorResponseFunc(listener.OnErrorResponse), @@ -169,21 +177,9 @@ func NewPrimaryHandler(logger *zap.Logger, manager device.Manager, v *viper.Vipe return nil, errors.New("failed to create clortho reolver") } - promReg, ok := metricsRegistry.(prometheus.Registerer) - if !ok { - return nil, errors.New("failed to get prometheus registerer") - - } - - var ( - tsConfig touchstone.Config - zConfig sallust.Config - ) - // Get touchstone & zap configurations - v.UnmarshalKey("touchstone", &tsConfig) + var zConfig sallust.Config v.UnmarshalKey("zap", &zConfig) zlogger := zap.Must(zConfig.Build()) - tf := touchstone.NewFactory(tsConfig, zlogger, promReg) // Instantiate a metric listener for the resolver cml, err := clorthometrics.NewListener(clorthometrics.WithFactory(tf)) if err != nil { @@ -239,7 +235,19 @@ func NewPrimaryHandler(logger *zap.Logger, manager device.Manager, v *viper.Vipe return nil, err } - deviceAccessCheck, err := buildDeviceAccessCheck(config, logger, metricsRegistry.NewCounter(InboundWRPMessageCounter), manager) + inboundWRPMessageCounter, err := tf.NewCounterVec( + prometheus.CounterOpts{ + Name: InboundWRPMessageCounter, + Help: "Number of inbound WRP Messages successfully decoded and ready to route to device", + }, + []string{outcomeLabel, reasonLabel}..., + ) + if err != nil { + logger.Error(fmt.Sprintf("Could not create %s metric.", InboundWRPMessageCounter)) + return nil, err + } + + deviceAccessCheck, err := buildDeviceAccessCheck(config, logger, inboundWRPMessageCounter, manager) if err != nil { return nil, err } @@ -369,7 +377,7 @@ func NewPrimaryHandler(logger *zap.Logger, manager device.Manager, v *viper.Vipe return r, nil } -func buildDeviceAccessCheck(config *deviceAccessCheckConfig, logger *zap.Logger, counter metrics.Counter, deviceRegistry device.Registry) (deviceAccess, error) { +func buildDeviceAccessCheck(config *deviceAccessCheckConfig, logger *zap.Logger, counter *prometheus.CounterVec, deviceRegistry device.Registry) (deviceAccess, error) { if len(config.Checks) < 1 { logger.Error("Potential security misconfig. Include checks for deviceAccessCheck or disable it") diff --git a/workerPool.go b/workerPool.go index 73e3e70..2df1692 100644 --- a/workerPool.go +++ b/workerPool.go @@ -13,7 +13,6 @@ import ( "strings" "sync" - "github.com/go-kit/kit/metrics" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -24,7 +23,7 @@ type WorkerPool struct { logger *zap.Logger outbounds <-chan outboundEnvelope workerPoolSize uint - queueSize metrics.Gauge + queueSize prometheus.Gauge droppedMessages CounterVec transactor func(*http.Request) (*http.Response, error)