diff --git a/xprom/xpromkafka/collector.go b/xprom/xpromkafka/collector.go index 0a33d9a..0b99b3b 100644 --- a/xprom/xpromkafka/collector.go +++ b/xprom/xpromkafka/collector.go @@ -38,17 +38,8 @@ type Collector struct { consumed *prometheus.CounterVec } -// RegisterCollector creates and registers a new Collector with the -// provided registerer. -func RegisterCollector(r prometheus.Registerer, opts ...Option) *Collector { - c := newCollector(opts...) - - r.MustRegister(c.duration, c.inflight, c.published, c.consumed) - - return c -} - -func newCollector(opts ...Option) *Collector { +// NewCollector creates a new Collector. +func NewCollector(opts ...Option) *Collector { o := options{ latencyBuckets: defaultLatencyBuckets, } @@ -121,6 +112,27 @@ func newCollector(opts ...Option) *Collector { } } +// Register registers the metrics with the provided registry. +func (c *Collector) Register(registry prometheus.Registerer) error { + if err := registry.Register(c.duration); err != nil { + return err + } + + if err := registry.Register(c.inflight); err != nil { + return err + } + + if err := registry.Register(c.published); err != nil { + return err + } + + if err := registry.Register(c.consumed); err != nil { + return err + } + + return nil +} + // ConsumerMiddleware returns a middleware that instruments xkafka.Consumer. // Options passed to this function will override the Collector options. func (c *Collector) ConsumerMiddleware(opts ...Option) xkafka.MiddlewareFunc { diff --git a/xprom/xpromkafka/collector_test.go b/xprom/xpromkafka/collector_test.go index d58ac7f..26b715e 100644 --- a/xprom/xpromkafka/collector_test.go +++ b/xprom/xpromkafka/collector_test.go @@ -31,11 +31,12 @@ func TestConsumerMiddleware(t *testing.T) { }) reg := prometheus.NewRegistry() - collector := RegisterCollector( - reg, + collector := NewCollector( LatencyBuckets{0.1, 0.5, 1, 2, 5}, ) + collector.Register(reg) + instrumentedHandler := collector.ConsumerMiddleware( Address("localhost"), Port(9092), @@ -80,11 +81,12 @@ func TestProducerMiddleware(t *testing.T) { }) reg := prometheus.NewRegistry() - collector := RegisterCollector( - reg, + collector := NewCollector( LatencyBuckets{0.1, 0.5, 1, 2, 5}, ) + collector.Register(reg) + instrumentedHandler := collector.ProducerMiddleware( Address("localhost"), Port(9092), diff --git a/xprom/xpromkafka/example_test.go b/xprom/xpromkafka/example_test.go index ec64ac3..01e2b6a 100644 --- a/xprom/xpromkafka/example_test.go +++ b/xprom/xpromkafka/example_test.go @@ -23,8 +23,7 @@ func ExampleCollector_ConsumerMiddleware() { ) reg := prometheus.NewRegistry() - collector := xpromkafka.RegisterCollector( - reg, + collector := xpromkafka.NewCollector( xpromkafka.LatencyBuckets{0.1, 0.5, 1, 2, 5}, xpromkafka.Address("localhost:9092"), xpromkafka.ErrorClassifer(func(err error) string { @@ -33,6 +32,7 @@ func ExampleCollector_ConsumerMiddleware() { }), ) + collector.Register(reg) consumer.Use(collector.ConsumerMiddleware()) // Start consuming messages. @@ -40,8 +40,7 @@ func ExampleCollector_ConsumerMiddleware() { func ExampleCollector_ConsumerMiddleware_multipleConsumers() { reg := prometheus.NewRegistry() - collector := xpromkafka.RegisterCollector( - reg, + collector := xpromkafka.NewCollector( xpromkafka.LatencyBuckets{0.1, 0.5, 1, 2, 5}, xpromkafka.ErrorClassifer(func(err error) string { // Classify errors. @@ -49,6 +48,8 @@ func ExampleCollector_ConsumerMiddleware_multipleConsumers() { }), ) + collector.Register(reg) + consumer1, _ := xkafka.NewConsumer( "test-group-1", handler, @@ -81,8 +82,7 @@ func ExampleCollector_ProducerMiddleware() { ) reg := prometheus.NewRegistry() - collector := xpromkafka.RegisterCollector( - reg, + collector := xpromkafka.NewCollector( xpromkafka.LatencyBuckets{0.1, 0.5, 1, 2, 5}, xpromkafka.Address("localhost:9092"), xpromkafka.ErrorClassifer(func(err error) string { @@ -91,6 +91,8 @@ func ExampleCollector_ProducerMiddleware() { }), ) + collector.Register(reg) + producer.Use(collector.ProducerMiddleware()) // Produce messages. @@ -98,8 +100,7 @@ func ExampleCollector_ProducerMiddleware() { func ExampleCollector_ProducerMiddleware_multipleProducers() { reg := prometheus.NewRegistry() - collector := xpromkafka.RegisterCollector( - reg, + collector := xpromkafka.NewCollector( xpromkafka.LatencyBuckets{0.1, 0.5, 1, 2, 5}, xpromkafka.ErrorClassifer(func(err error) string { // Classify errors. @@ -107,6 +108,8 @@ func ExampleCollector_ProducerMiddleware_multipleProducers() { }), ) + collector.Register(reg) + producer1, _ := xkafka.NewProducer( "test-publisher-1", xkafka.Brokers{"localhost:9092"},