From 5129539a1d27525f48ab0e5c5fdeac6de7576a20 Mon Sep 17 00:00:00 2001 From: Bryan Burkholder Date: Wed, 3 Jan 2024 11:25:51 -0700 Subject: [PATCH] Add a stall counter for bulk ingest --- .../slack/kaldb/bulkIngestApi/BulkIngestKafkaProducer.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducer.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducer.java index 2780d58e3b..51a7771b87 100644 --- a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducer.java +++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducer.java @@ -64,6 +64,9 @@ public class BulkIngestKafkaProducer extends AbstractExecutionThreadService { "bulk_ingest_producer_over_limit_pending"; private final Counter overLimitPendingRequests; + public static final String STALL_COUNTER = "bulk_ingest_producer_stall_counter"; + private final Counter stallCounter; + public static final String BATCH_SIZE_GAUGE = "bulk_ingest_producer_batch_size"; private final AtomicInteger batchSizeGauge; @@ -106,6 +109,7 @@ public BulkIngestKafkaProducer( this.failedSetResponseCounter = meterRegistry.counter(FAILED_SET_RESPONSE_COUNTER); this.overLimitPendingRequests = meterRegistry.counter(OVER_LIMIT_PENDING_REQUESTS); + this.stallCounter = meterRegistry.counter(STALL_COUNTER); this.batchSizeGauge = meterRegistry.gauge(BATCH_SIZE_GAUGE, new AtomicInteger(0)); this.kafkaProducer.initTransactions(); @@ -134,6 +138,7 @@ protected void run() throws Exception { batchSizeGauge.set(requests.size()); if (requests.isEmpty()) { try { + stallCounter.increment(); Thread.sleep(producerSleep); } catch (InterruptedException e) { return;