Skip to content

Commit

Permalink
Refactored Registry
Browse files Browse the repository at this point in the history
  • Loading branch information
sonnes committed Jul 9, 2024
1 parent 1af4852 commit cbfaebf
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 23 deletions.
34 changes: 23 additions & 11 deletions xprom/xpromkafka/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,8 @@ type Collector struct {
consumed *prometheus.CounterVec
}

// RegisterCollector creates and registers a new Collector with the
// provided registerer.
func RegisterCollector(r prometheus.Registerer, opts ...Option) *Collector {
c := newCollector(opts...)

r.MustRegister(c.duration, c.inflight, c.published, c.consumed)

return c
}

func newCollector(opts ...Option) *Collector {
// NewCollector creates a new Collector.
func NewCollector(opts ...Option) *Collector {
o := options{
latencyBuckets: defaultLatencyBuckets,
}
Expand Down Expand Up @@ -121,6 +112,27 @@ func newCollector(opts ...Option) *Collector {
}
}

// Register registers the metrics with the provided registry.
func (c *Collector) Register(registry prometheus.Registerer) error {
if err := registry.Register(c.duration); err != nil {
return err
}

if err := registry.Register(c.inflight); err != nil {
return err
}

if err := registry.Register(c.published); err != nil {
return err
}

if err := registry.Register(c.consumed); err != nil {
return err
}

return nil
}

// ConsumerMiddleware returns a middleware that instruments xkafka.Consumer.
// Options passed to this function will override the Collector options.
func (c *Collector) ConsumerMiddleware(opts ...Option) xkafka.MiddlewareFunc {
Expand Down
10 changes: 6 additions & 4 deletions xprom/xpromkafka/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ func TestConsumerMiddleware(t *testing.T) {
})

reg := prometheus.NewRegistry()
collector := RegisterCollector(
reg,
collector := NewCollector(
LatencyBuckets{0.1, 0.5, 1, 2, 5},
)

collector.Register(reg)

instrumentedHandler := collector.ConsumerMiddleware(
Address("localhost"),
Port(9092),
Expand Down Expand Up @@ -80,11 +81,12 @@ func TestProducerMiddleware(t *testing.T) {
})

reg := prometheus.NewRegistry()
collector := RegisterCollector(
reg,
collector := NewCollector(
LatencyBuckets{0.1, 0.5, 1, 2, 5},
)

collector.Register(reg)

instrumentedHandler := collector.ProducerMiddleware(
Address("localhost"),
Port(9092),
Expand Down
19 changes: 11 additions & 8 deletions xprom/xpromkafka/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ func ExampleCollector_ConsumerMiddleware() {
)

reg := prometheus.NewRegistry()
collector := xpromkafka.RegisterCollector(
reg,
collector := xpromkafka.NewCollector(
xpromkafka.LatencyBuckets{0.1, 0.5, 1, 2, 5},
xpromkafka.Address("localhost:9092"),
xpromkafka.ErrorClassifer(func(err error) string {
Expand All @@ -33,22 +32,24 @@ func ExampleCollector_ConsumerMiddleware() {
}),
)

collector.Register(reg)
consumer.Use(collector.ConsumerMiddleware())

// Start consuming messages.
}

func ExampleCollector_ConsumerMiddleware_multipleConsumers() {
reg := prometheus.NewRegistry()
collector := xpromkafka.RegisterCollector(
reg,
collector := xpromkafka.NewCollector(
xpromkafka.LatencyBuckets{0.1, 0.5, 1, 2, 5},
xpromkafka.ErrorClassifer(func(err error) string {
// Classify errors.
return "CustomError"
}),
)

collector.Register(reg)

consumer1, _ := xkafka.NewConsumer(
"test-group-1",
handler,
Expand Down Expand Up @@ -81,8 +82,7 @@ func ExampleCollector_ProducerMiddleware() {
)

reg := prometheus.NewRegistry()
collector := xpromkafka.RegisterCollector(
reg,
collector := xpromkafka.NewCollector(
xpromkafka.LatencyBuckets{0.1, 0.5, 1, 2, 5},
xpromkafka.Address("localhost:9092"),
xpromkafka.ErrorClassifer(func(err error) string {
Expand All @@ -91,22 +91,25 @@ func ExampleCollector_ProducerMiddleware() {
}),
)

collector.Register(reg)

producer.Use(collector.ProducerMiddleware())

// Produce messages.
}

func ExampleCollector_ProducerMiddleware_multipleProducers() {
reg := prometheus.NewRegistry()
collector := xpromkafka.RegisterCollector(
reg,
collector := xpromkafka.NewCollector(
xpromkafka.LatencyBuckets{0.1, 0.5, 1, 2, 5},
xpromkafka.ErrorClassifer(func(err error) string {
// Classify errors.
return "CustomError"
}),
)

collector.Register(reg)

producer1, _ := xkafka.NewProducer(
"test-publisher-1",
xkafka.Brokers{"localhost:9092"},
Expand Down

0 comments on commit cbfaebf

Please sign in to comment.