Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix kafka sinke metrics #3549

Merged
merged 1 commit into from
Feb 17, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 22 additions & 42 deletions extensions/impl/kafka/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,19 @@
)

type KafkaSink struct {
writer *kafkago.Writer
kc *kafkaConf
tlsConfig *tls.Config
headersMap map[string]string
headerTemplate string
saslConf *saslConf
mechanism sasl.Mechanism
LastStats kafkago.WriterStats
LastCollectStats *KafkaCollectStats
msgQ chan *kafkago.Message
messages []kafkago.Message
currIndex int
writer *kafkago.Writer
kc *kafkaConf
tlsConfig *tls.Config
headersMap map[string]string
headerTemplate string
saslConf *saslConf
mechanism sasl.Mechanism
LastStats kafkago.WriterStats
msgQ chan *kafkago.Message
messages []kafkago.Message
currIndex int
ruleID string
opID string
}

func (k *KafkaSink) Info() model.SinkInfo {
Expand Down Expand Up @@ -143,7 +144,6 @@
if err != nil {
return err
}
k.LastCollectStats = &KafkaCollectStats{}
k.msgQ = make(chan *kafkago.Message, 2*k.kc.BatchSize)
// run batch
switch {
Expand Down Expand Up @@ -213,6 +213,8 @@
}

func (k *KafkaSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error {
k.ruleID = ctx.GetRuleId()
k.opID = ctx.GetOpId()
err := k.buildKafkaWriter()
if err != nil {
sch(api.ConnectionDisconnected, err.Error())
Expand Down Expand Up @@ -287,11 +289,10 @@
func (k *KafkaSink) send(ctx api.StreamContext) {
start := time.Now()
defer func() {
metrics.IODurationHist.WithLabelValues(LblKafka, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
metrics.IODurationHist.WithLabelValues(LblKafka, metrics.LblSinkIO, k.ruleID, k.opID).Observe(float64(time.Since(start).Microseconds()))

Check warning on line 292 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L292

Added line #L292 was not covered by tests
}()
err := k.writer.WriteMessages(ctx, k.messages...)
k.handleErrMsgs(ctx, err, len(k.messages))
k.updateMetrics(ctx)
k.messages = make([]kafkago.Message, 0, k.kc.BatchSize/4)
k.currIndex = 0
}
Expand All @@ -301,15 +302,6 @@
}

func (k *KafkaSink) collect(ctx api.StreamContext, item api.RawTuple) error {
start := time.Now()
defer func() {
collectDuration := time.Since(start)
k.LastCollectStats.TotalCollectMsgDuration += collectDuration
KafkaSinkCollectDurationHist.WithLabelValues(LblCollect, LblMsg, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(collectDuration.Microseconds()))
}()
unmarshalDuration := time.Duration(0)
k.LastCollectStats.TotalUnmarshalMsgDuration += unmarshalDuration
KafkaSinkCollectDurationHist.WithLabelValues(LblUnmarshal, LblMsg, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(unmarshalDuration.Microseconds()))
msg, err := k.buildMsg(ctx, item)
if err != nil {
return err
Expand All @@ -322,12 +314,6 @@
}

func (k *KafkaSink) buildMsg(ctx api.StreamContext, item api.RawTuple) (kafkago.Message, error) {
start := time.Now()
defer func() {
buildDuration := time.Since(start)
k.LastCollectStats.TotalBuildMsgDuration += buildDuration
KafkaSinkCollectDurationHist.WithLabelValues(lblBuild, LblMsg, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(buildDuration.Microseconds()))
}()
msg := kafkago.Message{Value: item.Raw()}
if len(k.kc.Key) > 0 {
newKey := k.kc.Key
Expand Down Expand Up @@ -412,25 +398,19 @@
return nil, nil
}

func (k *KafkaSink) updateMetrics(ctx api.StreamContext) {
KafkaSinkCollectDurationHist.WithLabelValues(lblBuild, LblReq, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(k.LastCollectStats.TotalBuildMsgDuration.Microseconds()))
KafkaSinkCollectDurationHist.WithLabelValues(LblUnmarshal, LblReq, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(k.LastCollectStats.TotalUnmarshalMsgDuration.Microseconds()))
KafkaSinkCollectDurationHist.WithLabelValues(LblCollect, LblReq, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(k.LastCollectStats.TotalCollectMsgDuration.Microseconds()))
}

func (k *KafkaSink) handleErrMsgs(ctx api.StreamContext, err error, count int) {
if err == nil {
KafkaSinkCounter.WithLabelValues(metrics.LblSuccess, LblReq, ctx.GetRuleId(), ctx.GetOpId()).Inc()
KafkaSinkCounter.WithLabelValues(metrics.LblSuccess, LblMsg, ctx.GetRuleId(), ctx.GetOpId()).Add(float64(count))
KafkaSinkCounter.WithLabelValues(metrics.LblSuccess, LblReq, k.ruleID, k.opID).Inc()
KafkaSinkCounter.WithLabelValues(metrics.LblSuccess, LblMsg, k.ruleID, k.opID).Add(float64(count))

Check warning on line 404 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L403-L404

Added lines #L403 - L404 were not covered by tests
return
}
KafkaSinkCounter.WithLabelValues(metrics.LblException, LblReq, ctx.GetRuleId(), ctx.GetOpId()).Inc()
KafkaSinkCounter.WithLabelValues(metrics.LblException, LblReq, k.ruleID, k.opID).Inc()

Check warning on line 407 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L407

Added line #L407 was not covered by tests
switch wErrors := err.(type) {
case kafkago.WriteErrors:
KafkaSinkCounter.WithLabelValues(metrics.LblException, LblMsg, ctx.GetRuleId(), ctx.GetOpId()).Add(float64(wErrors.Count()))
KafkaSinkCounter.WithLabelValues(metrics.LblSuccess, LblMsg, ctx.GetRuleId(), ctx.GetOpId()).Add(float64(count - wErrors.Count()))
KafkaSinkCounter.WithLabelValues(metrics.LblException, LblMsg, k.ruleID, k.opID).Add(float64(wErrors.Count()))
KafkaSinkCounter.WithLabelValues(metrics.LblSuccess, LblMsg, k.ruleID, k.opID).Add(float64(count - wErrors.Count()))

Check warning on line 411 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L410-L411

Added lines #L410 - L411 were not covered by tests
default:
KafkaSinkCounter.WithLabelValues(metrics.LblException, LblMsg, ctx.GetRuleId(), ctx.GetOpId()).Add(float64(count))
KafkaSinkCounter.WithLabelValues(metrics.LblException, LblMsg, k.ruleID, k.opID).Add(float64(count))

Check warning on line 413 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L413

Added line #L413 was not covered by tests
}
}

Expand Down
Loading