Skip to content

Commit

Permalink
remove bytesSent/Received metrics from end to end
Browse files Browse the repository at this point in the history
  • Loading branch information
rikimaru0345 committed May 10, 2021
1 parent b4a08c5 commit d5a116c
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 40 deletions.
41 changes: 4 additions & 37 deletions e2e/client_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,52 +4,21 @@ import (
"net"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
)

// in e2e we only use client hooks for logging connect/disconnect messages
type clientHooks struct {
logger *zap.Logger

requestSentCount prometheus.Counter
bytesSent prometheus.Counter

requestsReceivedCount prometheus.Counter
bytesReceived prometheus.Counter
}

func newEndToEndClientHooks(logger *zap.Logger, metricsNamespace string) *clientHooks {
requestSentCount := promauto.NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "kafka",
Name: "requests_sent_total"})
bytesSent := promauto.NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "kafka",
Name: "sent_bytes",
})
func newEndToEndClientHooks(logger *zap.Logger) *clientHooks {

requestsReceivedCount := promauto.NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "kafka",
Name: "requests_received_total"})
bytesReceived := promauto.NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "kafka",
Name: "received_bytes",
})
logger = logger.With(zap.String("source", "end_to_end"))

return &clientHooks{
logger: logger,

requestSentCount: requestSentCount,
bytesSent: bytesSent,

requestsReceivedCount: requestsReceivedCount,
bytesReceived: bytesReceived,
}
}

Expand All @@ -76,8 +45,7 @@ func (c clientHooks) OnDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
// The bytes written does not count any tls overhead.
// OnRead is called after a read from a broker.
func (c clientHooks) OnRead(_ kgo.BrokerMetadata, _ int16, bytesRead int, _, _ time.Duration, _ error) {
c.requestsReceivedCount.Inc()
c.bytesReceived.Add(float64(bytesRead))

}

// OnWrite is passed the broker metadata, the key for the request that
Expand All @@ -88,6 +56,5 @@ func (c clientHooks) OnRead(_ kgo.BrokerMetadata, _ int16, bytesRead int, _, _ t
// The bytes written does not count any tls overhead.
// OnWrite is called after a write to a broker.
func (c clientHooks) OnWrite(_ kgo.BrokerMetadata, _ int16, bytesWritten int, _, _ time.Duration, _ error) {
c.requestSentCount.Inc()
c.bytesSent.Add(float64(bytesWritten))

}
1 change: 0 additions & 1 deletion e2e/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {

}

// todo: extract whole end-to-end feature into its own package
// todo: then also create a "tracker" that knows about in-flight messages, and the latest successful roundtrips

// processMessage takes a message and:
Expand Down
3 changes: 1 addition & 2 deletions e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ func createKafkaClient(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service,
}

// Prepare hooks
hooksChildLogger := logger.With(zap.String("source", "end_to_end"))
e2eHooks := newEndToEndClientHooks(hooksChildLogger, "")
e2eHooks := newEndToEndClientHooks(logger)
kgoOpts = append(kgoOpts, kgo.WithHooks(e2eHooks))

// Create kafka service and check if client can successfully connect to Kafka cluster
Expand Down

0 comments on commit d5a116c

Please sign in to comment.