Skip to content

Commit

Permalink
Add a stall counter for bulk ingest
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanlb committed Jan 3, 2024
1 parent 8b2f6d3 commit 5129539
Showing 1 changed file with 5 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ public class BulkIngestKafkaProducer extends AbstractExecutionThreadService {
"bulk_ingest_producer_over_limit_pending";
private final Counter overLimitPendingRequests;

public static final String STALL_COUNTER = "bulk_ingest_producer_stall_counter";
private final Counter stallCounter;

public static final String BATCH_SIZE_GAUGE = "bulk_ingest_producer_batch_size";
private final AtomicInteger batchSizeGauge;

Expand Down Expand Up @@ -106,6 +109,7 @@ public BulkIngestKafkaProducer(

this.failedSetResponseCounter = meterRegistry.counter(FAILED_SET_RESPONSE_COUNTER);
this.overLimitPendingRequests = meterRegistry.counter(OVER_LIMIT_PENDING_REQUESTS);
this.stallCounter = meterRegistry.counter(STALL_COUNTER);
this.batchSizeGauge = meterRegistry.gauge(BATCH_SIZE_GAUGE, new AtomicInteger(0));

this.kafkaProducer.initTransactions();
Expand Down Expand Up @@ -134,6 +138,7 @@ protected void run() throws Exception {
batchSizeGauge.set(requests.size());
if (requests.isEmpty()) {
try {
stallCounter.increment();
Thread.sleep(producerSleep);
} catch (InterruptedException e) {
return;
Expand Down

0 comments on commit 5129539

Please sign in to comment.