Test pooling kafka producers
bryanlb committed Jan 3, 2024
1 parent 5129539 commit 1d4aeef
Showing 2 changed files with 88 additions and 21 deletions.
5 changes: 5 additions & 0 deletions kaldb/pom.xml
@@ -209,6 +209,11 @@
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.12.0</version>
</dependency>

<!-- Jackson JSON parser dependencies -->
<dependency>
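The commons-pool2 dependency added above supplies the object-pool machinery (GenericObjectPool, BasePooledObjectFactory) used in the BulkIngestKafkaProducer changes below. As a refresher, here is a minimal, self-contained sketch of the create/wrap factory contract and the borrow/return lifecycle that GenericObjectPool expects; it pools a plain StringBuilder and is an illustration only, not code from this commit.

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;

public class PoolLifecycleSketch {
  public static void main(String[] args) throws Exception {
    // The factory tells the pool how to create new instances and wrap them for tracking.
    ObjectPool<StringBuilder> pool =
        new GenericObjectPool<>(
            new BasePooledObjectFactory<>() {
              @Override
              public StringBuilder create() {
                return new StringBuilder();
              }

              @Override
              public PooledObject<StringBuilder> wrap(StringBuilder builder) {
                return new DefaultPooledObject<>(builder);
              }
            });

    // Callers borrow an instance, use it, and must return it so it can be reused.
    StringBuilder builder = pool.borrowObject();
    try {
      builder.append("hello");
    } finally {
      pool.returnObject(builder);
    }
    pool.close();
  }
}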
104 changes: 83 additions & 21 deletions BulkIngestKafkaProducer.java
@@ -4,6 +4,7 @@
import static com.slack.kaldb.metadata.dataset.DatasetMetadata.MATCH_ALL_SERVICE;
import static com.slack.kaldb.metadata.dataset.DatasetMetadata.MATCH_STAR_SERVICE;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.slack.kaldb.metadata.core.KaldbMetadataStoreChangeListener;
import com.slack.kaldb.metadata.dataset.DatasetMetadata;
@@ -14,6 +15,7 @@
import com.slack.kaldb.writer.KafkaUtils;
import com.slack.service.murron.trace.Trace;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import java.util.ArrayList;
@@ -26,8 +28,15 @@
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -41,8 +50,11 @@
public class BulkIngestKafkaProducer extends AbstractExecutionThreadService {
private static final Logger LOG = LoggerFactory.getLogger(BulkIngestKafkaProducer.class);

private final KafkaProducer<String, byte[]> kafkaProducer;
private final KafkaClientMetrics kafkaMetrics;
private final ObjectPool<KafkaProducer<String, byte[]>> kafkaProducerPool;

private final AtomicInteger poolCounter = new AtomicInteger(0);

private final Map<Integer, KafkaClientMetrics> metricsMap = new ConcurrentHashMap<>();

private final KaldbConfigs.KafkaConfig kafkaConfig;

@@ -102,17 +114,37 @@ public BulkIngestKafkaProducer(
// I think they will remain in kafka till they expire. They should never be readable if the
// consumer sets isolation.level as "read_committed"
// see "zombie fencing" https://www.confluent.io/blog/transactions-apache-kafka/
this.kafkaProducer = createKafkaTransactionProducer(UUID.randomUUID().toString());

this.kafkaMetrics = new KafkaClientMetrics(kafkaProducer);
this.kafkaMetrics.bindTo(meterRegistry);
GenericObjectPoolConfig<KafkaProducer<String, byte[]>> config = new GenericObjectPoolConfig<>();
this.kafkaProducerPool =
new GenericObjectPool<>(
new BasePooledObjectFactory<>() {
@Override
public KafkaProducer<String, byte[]> create() {
KafkaProducer<String, byte[]> producer =
createKafkaTransactionProducer(UUID.randomUUID().toString());
int metricsCounter = poolCounter.getAndIncrement();
KafkaClientMetrics clientMetrics =
new KafkaClientMetrics(
producer,
List.of(Tag.of("bulk_instance_id", String.valueOf(metricsCounter))));
clientMetrics.bindTo(meterRegistry);
metricsMap.put(metricsCounter, clientMetrics);
producer.initTransactions();
return producer;
}

@Override
public PooledObject<KafkaProducer<String, byte[]>> wrap(
KafkaProducer<String, byte[]> stringKafkaProducer) {
return new DefaultPooledObject<>(stringKafkaProducer);
}
},
config);

this.failedSetResponseCounter = meterRegistry.counter(FAILED_SET_RESPONSE_COUNTER);
this.overLimitPendingRequests = meterRegistry.counter(OVER_LIMIT_PENDING_REQUESTS);
this.stallCounter = meterRegistry.counter(STALL_COUNTER);
this.batchSizeGauge = meterRegistry.gauge(BATCH_SIZE_GAUGE, new AtomicInteger(0));

this.kafkaProducer.initTransactions();
}
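The constructor above passes a default GenericObjectPoolConfig, so the producer pool inherits the commons-pool2 defaults, notably a cap of eight pooled objects. If the pool ever needed explicit sizing, the configuration could look roughly like the sketch below; the specific values are illustrative assumptions, not part of this commit.

import java.time.Duration;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.kafka.clients.producer.KafkaProducer;

class ProducerPoolConfigSketch {
  static GenericObjectPoolConfig<KafkaProducer<String, byte[]>> tunedConfig() {
    GenericObjectPoolConfig<KafkaProducer<String, byte[]>> config = new GenericObjectPoolConfig<>();
    config.setMaxTotal(16);                   // illustrative: upper bound on concurrent producers
    config.setMinIdle(2);                     // illustrative: keep a couple of producers warm
    config.setBlockWhenExhausted(true);       // borrowObject() waits when the pool is empty
    config.setMaxWait(Duration.ofSeconds(5)); // bound that wait rather than blocking forever
    return config;
  }
}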

private void cacheSortedDataset() {
@@ -144,19 +176,30 @@ protected void run() throws Exception {
return;
}
} else {
transactionCommit(requests);
try {
KafkaProducer<String, byte[]> kafkaProducer = kafkaProducerPool.borrowObject();
Thread.ofVirtual()
.start(
() -> {
transactionCommit(requests, kafkaProducer);
try {
kafkaProducerPool.returnObject(kafkaProducer);
} catch (Exception e) {
LOG.error("Error attempting to return kafka producer to pool", e);
}
});
} catch (Exception e) {
LOG.error("Error attempting to borrow kafka producer from pool", e);
}
}
}
}
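A note on the borrow/return flow above: the producer is returned inside the virtual thread only after transactionCommit runs, so the return would be skipped if transactionCommit ever threw. Since transactionCommit catches its exceptions internally this works as written, but a try/finally variant makes the guarantee explicit; the fragment below is an illustration against the same fields shown in this diff, not the commit's code.

KafkaProducer<String, byte[]> producer = kafkaProducerPool.borrowObject();
Thread.ofVirtual()
    .start(
        () -> {
          try {
            transactionCommit(requests, producer);
          } finally {
            try {
              // Always hand the producer back, even if the commit path threw unexpectedly.
              kafkaProducerPool.returnObject(producer);
            } catch (Exception e) {
              LOG.error("Error attempting to return kafka producer to pool", e);
            }
          }
        });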

@Override
protected void shutDown() throws Exception {
datasetMetadataStore.removeListener(datasetListener);

kafkaProducer.close();
if (kafkaMetrics != null) {
kafkaMetrics.close();
}
kafkaProducerPool.close();
metricsMap.forEach((_, kafkaMetrics) -> kafkaMetrics.close());
}
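One subtlety in shutDown: GenericObjectPool.close() destroys its idle objects through the factory, but BasePooledObjectFactory.destroyObject is a no-op by default, so the underlying KafkaProducer instances are never actually closed here. If that mattered, the anonymous factory in the constructor could override destroyObject along these lines; this is a sketch, not part of the commit.

@Override
public void destroyObject(PooledObject<KafkaProducer<String, byte[]>> pooledProducer) {
  // Close the wrapped producer when the pool evicts it or shuts down.
  pooledProducer.getObject().close();
}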

public BulkIngestRequest submitRequest(Map<String, List<Trace.Span>> inputDocs) {
@@ -170,15 +213,30 @@ public BulkIngestRequest submitRequest(Map<String, List<Trace.Span>> inputDocs)
return request;
}

@Deprecated
@VisibleForTesting
protected Map<BulkIngestRequest, BulkIngestResponse> transactionCommit(
List<BulkIngestRequest> requests) throws Exception {
KafkaProducer<String, byte[]> kafkaProducer = kafkaProducerPool.borrowObject();
Map<BulkIngestRequest, BulkIngestResponse> response =
transactionCommit(requests, kafkaProducer);
kafkaProducerPool.returnObject(kafkaProducer);
return response;
}

protected Map<BulkIngestRequest, BulkIngestResponse> transactionCommit(
List<BulkIngestRequest> requests) {
List<BulkIngestRequest> requests, KafkaProducer<String, byte[]> kafkaProducer) {
Map<BulkIngestRequest, BulkIngestResponse> responseMap = new HashMap<>();

// KafkaProducer<String, byte[]> kafkaProducer = null;
try {
// kafkaProducer = kafkaProducerPool.borrowObject();
kafkaProducer.beginTransaction();
for (BulkIngestRequest request : requests) {
responseMap.put(request, produceDocuments(request.getInputDocs()));
responseMap.put(request, produceDocuments(request.getInputDocs(), kafkaProducer));
}
kafkaProducer.commitTransaction();
// kafkaProducerPool.returnObject(kafkaProducer);
} catch (TimeoutException te) {
LOG.error("Commit transaction timeout", te);
// the commitTransaction waits till "max.block.ms" after which it will time out
@@ -197,10 +255,13 @@ protected Map<BulkIngestRequest, BulkIngestResponse> transactionCommit(
new RuntimeHalterImpl().handleFatal(new Throwable("KafkaProducer needs to shutdown ", e));
} catch (Exception e) {
LOG.warn("failed transaction with error", e);
try {
kafkaProducer.abortTransaction();
} catch (ProducerFencedException err) {
LOG.error("Could not abort transaction", err);
if (kafkaProducer != null) {
try {
kafkaProducer.abortTransaction();
// kafkaProducerPool.returnObject(kafkaProducer);
} catch (ProducerFencedException err) {
LOG.error("Could not abort transaction", err);
}
}

for (BulkIngestRequest request : requests) {
@@ -225,7 +286,8 @@ protected Map<BulkIngestRequest, BulkIngestResponse> transactionCommit(
}

@SuppressWarnings("FutureReturnValueIgnored")
private BulkIngestResponse produceDocuments(Map<String, List<Trace.Span>> indexDocs) {
private BulkIngestResponse produceDocuments(
Map<String, List<Trace.Span>> indexDocs, KafkaProducer<String, byte[]> kafkaProducer) {
int totalDocs = indexDocs.values().stream().mapToInt(List::size).sum();

// we cannot create a generic pool of producers because the kafka API expects the transaction ID
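The truncated comment above appears to refer to Kafka's transactional.id requirement, which is also why the pool factory gives each producer its own random UUID: producers sharing a transactional.id fence one another, so each pooled producer needs a distinct, stable ID. For context, a transactional producer setup generally looks like the sketch below; the commit's actual createKafkaTransactionProducer body is outside this hunk and may differ.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

class TransactionalProducerSketch {
  static KafkaProducer<String, byte[]> create(String bootstrapServers, String transactionId) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    // Each producer gets a unique, stable transactional.id; consumers only see committed
    // records when they set isolation.level=read_committed.
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    return new KafkaProducer<>(props);
  }
}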
