Skip to content

Commit

Permalink
[receiver/kafkareceiver] Add metrics for unmarshal errors (open-telem…
Browse files Browse the repository at this point in the history
  • Loading branch information
thmshmm authored and cparkins committed Jan 10, 2024
1 parent 6497f20 commit 36c162b
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 0 deletions.
16 changes: 16 additions & 0 deletions .chloggen/kafkareceiver-unmarshal-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkareceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add three new metrics to record unmarshal errors.

# One or more tracking issues related to the change
issues: [29302]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
12 changes: 12 additions & 0 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,10 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
traces, err := c.unmarshaler.Unmarshal(message.Value)
if err != nil {
c.logger.Error("failed to unmarshal message", zap.Error(err))
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())},
statUnmarshalFailedSpans.M(1))
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
Expand Down Expand Up @@ -531,6 +535,10 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
metrics, err := c.unmarshaler.Unmarshal(message.Value)
if err != nil {
c.logger.Error("failed to unmarshal message", zap.Error(err))
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())},
statUnmarshalFailedMetricPoints.M(1))
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
Expand Down Expand Up @@ -612,6 +620,10 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
logs, err := c.unmarshaler.Unmarshal(message.Value)
if err != nil {
c.logger.Error("failed to unmarshal message", zap.Error(err))
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())},
statUnmarshalFailedLogRecords.M(1))
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
Expand Down
31 changes: 31 additions & 0 deletions receiver/kafkareceiver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ var (

statPartitionStart = stats.Int64("kafka_receiver_partition_start", "Number of started partitions", stats.UnitDimensionless)
statPartitionClose = stats.Int64("kafka_receiver_partition_close", "Number of finished partitions", stats.UnitDimensionless)

statUnmarshalFailedMetricPoints = stats.Int64("kafka_receiver_unmarshal_failed_metric_points", "Number of metric points failed to be unmarshaled", stats.UnitDimensionless)
statUnmarshalFailedLogRecords = stats.Int64("kafka_receiver_unmarshal_failed_log_records", "Number of log records failed to be unmarshaled", stats.UnitDimensionless)
statUnmarshalFailedSpans = stats.Int64("kafka_receiver_unmarshal_failed_spans", "Number of spans failed to be unmarshaled", stats.UnitDimensionless)
)

// metricViews return metric views for Kafka receiver.
Expand Down Expand Up @@ -64,11 +68,38 @@ func metricViews() []*view.View {
Aggregation: view.Sum(),
}

countUnmarshalFailedMetricPoints := &view.View{
Name: statUnmarshalFailedMetricPoints.Name(),
Measure: statUnmarshalFailedMetricPoints,
Description: statUnmarshalFailedMetricPoints.Description(),
TagKeys: tagKeys,
Aggregation: view.Sum(),
}

countUnmarshalFailedLogRecords := &view.View{
Name: statUnmarshalFailedLogRecords.Name(),
Measure: statUnmarshalFailedLogRecords,
Description: statUnmarshalFailedLogRecords.Description(),
TagKeys: tagKeys,
Aggregation: view.Sum(),
}

countUnmarshalFailedSpans := &view.View{
Name: statUnmarshalFailedSpans.Name(),
Measure: statUnmarshalFailedSpans,
Description: statUnmarshalFailedSpans.Description(),
TagKeys: tagKeys,
Aggregation: view.Sum(),
}

return []*view.View{
countMessages,
lastValueOffset,
lastValueOffsetLag,
countPartitionStart,
countPartitionClose,
countUnmarshalFailedMetricPoints,
countUnmarshalFailedLogRecords,
countUnmarshalFailedSpans,
}
}
3 changes: 3 additions & 0 deletions receiver/kafkareceiver/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ func TestMetrics(t *testing.T) {
"kafka_receiver_offset_lag",
"kafka_receiver_partition_start",
"kafka_receiver_partition_close",
"kafka_receiver_unmarshal_failed_metric_points",
"kafka_receiver_unmarshal_failed_log_records",
"kafka_receiver_unmarshal_failed_spans",
}
for i, viewName := range viewNames {
assert.Equal(t, viewName, metricViews[i].Name)
Expand Down

0 comments on commit 36c162b

Please sign in to comment.