Skip to content

Commit

Permalink
Add Backlog Metrics to Kafka Splittable DoFn Implementation (#31281)
Browse files Browse the repository at this point in the history
* Set backlog in gauge metric

* Add backlog metrics to the splittable DoFn Kafka read implementation

---------

Co-authored-by: Naireen <[email protected]>
  • Loading branch information
Naireen and Naireen authored May 28, 2024
1 parent bb4c1e6 commit fd4368f
Showing 1 changed file with 18 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
Expand Down Expand Up @@ -222,6 +223,9 @@ private ReadFromKafkaDoFn(

private transient @Nullable LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize;
private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2L;

/**
 * Latest backlog estimate per partition, keyed by {@code TopicPartition#toString()}.
 *
 * <p>Written in {@code getSize} from the entries of {@code offsetEstimatorCache} and read in
 * {@code processElement}, where each entry is published as a gauge metric.
 *
 * <p>NOTE(review): this is a plain {@link HashMap} with no synchronization; confirm that
 * {@code getSize} and {@code processElement} are never invoked concurrently on the same
 * instance.
 */
private HashMap<String, Long> perPartitionBacklogMetrics = new HashMap<>();

@VisibleForTesting final long consumerPollingTimeout;
@VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
@VisibleForTesting final DeserializerProvider<V> valueDeserializerProvider;
Expand Down Expand Up @@ -342,6 +346,13 @@ public double getSize(
if (!avgRecordSize.asMap().containsKey(kafkaSourceDescriptor.getTopicPartition())) {
return numRecords;
}
if (offsetEstimatorCache != null) {
for (Map.Entry<TopicPartition, KafkaLatestOffsetEstimator> tp :
offsetEstimatorCache.entrySet()) {
perPartitionBacklogMetrics.put(tp.getKey().toString(), tp.getValue().estimate());
}
}

return avgRecordSize.get(kafkaSourceDescriptor.getTopicPartition()).getTotalSize(numRecords);
}

Expand Down Expand Up @@ -394,6 +405,13 @@ public ProcessContinuation processElement(
Metrics.distribution(
METRIC_NAMESPACE,
RAW_SIZE_METRIC_PREFIX + kafkaSourceDescriptor.getTopicPartition().toString());
for (Map.Entry<String, Long> backlogSplit : perPartitionBacklogMetrics.entrySet()) {
Gauge backlog =
Metrics.gauge(
METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + "backlogBytes_" + backlogSplit.getKey());
backlog.set(backlogSplit.getValue());
}

// Stop processing current TopicPartition when it's time to stop.
if (checkStopReadingFn != null
&& checkStopReadingFn.apply(kafkaSourceDescriptor.getTopicPartition())) {
Expand Down

0 comments on commit fd4368f

Please sign in to comment.