Add experimental Kafka batch producer
bryanlb committed Dec 15, 2023
1 parent aaa96a0 commit 18c31ba
Showing 1 changed file with 81 additions and 16 deletions.
@@ -28,12 +28,16 @@
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiPredicate;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -69,7 +73,7 @@ public class OpenSearchBulkIngestApi extends AbstractService {
private final KafkaProducer kafkaProducer;
private final KafkaClientMetrics kafkaMetrics;

private final ReentrantLock lockTransactionalProducer = new ReentrantLock();
private Thread producerThread;

@Override
protected void doStart() {
@@ -78,6 +82,7 @@ protected void doStart() {
load();
datasetMetadataStore.addListener(datasetListener);
LOG.info("OpenSearchBulkIngestAPI service started");
producerThread = Thread.ofVirtual().start(this::run);
notifyStarted();
} catch (Throwable t) {
notifyFailed(t);
@@ -89,6 +94,9 @@ protected void doStop() {
try {
LOG.info("Stopping OpenSearchBulkIngestApi service");
datasetMetadataStore.removeListener(datasetListener);
if (producerThread != null) {
producerThread.interrupt();
}
kafkaProducer.close();
if (kafkaMetrics != null) {
kafkaMetrics.close();
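
Note: doStart() now launches the batch loop on a JDK 21 virtual thread and doStop() interrupts it to shut the loop down. Below is a minimal, self-contained sketch of that start/interrupt lifecycle; the class, field, and method names are illustrative, not part of this commit.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BatchLoopLifecycle {
  private final BlockingQueue<Runnable> pending = new ArrayBlockingQueue<>(500);
  private Thread producerThread;

  public void start() {
    // JDK 21 virtual threads are cheap enough to dedicate one to the drain loop.
    producerThread = Thread.ofVirtual().start(this::run);
  }

  public void stop() {
    // Interrupting the thread makes the blocking take() below throw, ending the loop.
    if (producerThread != null) {
      producerThread.interrupt();
    }
  }

  private void run() {
    while (true) {
      try {
        Runnable task = pending.take(); // blocks until work arrives or we are interrupted
        task.run();
      } catch (InterruptedException e) {
        return; // stop() was called
      }
    }
  }
}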
@@ -193,7 +201,7 @@ public HttpResponse addDocument(String bulkRequest) {
return HttpResponse.ofJson(TOO_MANY_REQUESTS, response);
}
}
BulkIngestResponse response = produceDocuments(docs);
BulkIngestResponse response = createRequest(docs).getResponse();
return HttpResponse.ofJson(response);
} catch (Exception e) {
LOG.error("Request failed ", e);
@@ -202,6 +210,76 @@
}
}

static class BatchRequest {
private final Map<String, List<Trace.Span>> inputDocs;
private final SynchronousQueue<BulkIngestResponse> internalResponse = new SynchronousQueue<>();

public BatchRequest(Map<String, List<Trace.Span>> inputDocs) {
this.inputDocs = inputDocs;
}

public Map<String, List<Trace.Span>> getInputDocs() {
return inputDocs;
}

public void setResponse(BulkIngestResponse response) {
internalResponse.add(response);
}

public BulkIngestResponse getResponse() throws InterruptedException {
return internalResponse.take();
}
}
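
The SynchronousQueue inside BatchRequest is a rendezvous: the HTTP thread blocks in getResponse() until the batch thread hands a response across. One caveat, offered here as a hardening suggestion rather than as the commit's behavior: SynchronousQueue.add() throws IllegalStateException when no thread is waiting in take() yet, so a blocking put() (or an offer() with a timeout) closes that race. A generic sketch of the blocking variant:

import java.util.concurrent.SynchronousQueue;

class ResponseHandoff<T> {
  private final SynchronousQueue<T> slot = new SynchronousQueue<>();

  // Called by the batch thread once the transaction commits or aborts.
  void complete(T response) throws InterruptedException {
    slot.put(response); // blocks until the caller is waiting in await()
  }

  // Called by the HTTP thread; blocks until complete() hands over a value.
  T await() throws InterruptedException {
    return slot.take();
  }
}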

BlockingQueue<BatchRequest> pendingRequests = new ArrayBlockingQueue<>(500);

public BatchRequest createRequest(Map<String, List<Trace.Span>> inputDocs) {
BatchRequest request = new BatchRequest(inputDocs);
// TODO: add() throws IllegalStateException if pendingRequests is full; handle back-pressure here
pendingRequests.add(request);
return request;
}
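
pendingRequests is bounded at 500 entries, and ArrayBlockingQueue.add() throws IllegalStateException when it is full, which is what the TODO above refers to. One possible way to turn that into explicit back-pressure is to use offer() with a timeout; the sketch below is a suggestion, not the commit's code, and the 500 ms timeout, the exception type, and the java.util.concurrent.TimeUnit import are assumptions.

public BatchRequest createRequestWithBackpressure(Map<String, List<Trace.Span>> inputDocs)
    throws InterruptedException {
  BatchRequest request = new BatchRequest(inputDocs);
  // offer() waits up to the timeout and then reports failure instead of throwing on a full queue.
  boolean queued = pendingRequests.offer(request, 500, TimeUnit.MILLISECONDS);
  if (!queued) {
    // The caller can map this to a TOO_MANY_REQUESTS response.
    throw new IllegalStateException("bulk ingest queue is full");
  }
  return request;
}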

public void run() {
while (true) {
List<BatchRequest> requests = new ArrayList<>();
pendingRequests.drainTo(requests);

if (requests.isEmpty()) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
return;
}
} else {
try {
Map<BatchRequest, BulkIngestResponse> responseMap = new HashMap<>();
kafkaProducer.beginTransaction();
for (BatchRequest request : requests) {
responseMap.put(request, produceDocuments(request.getInputDocs()));
}
kafkaProducer.commitTransaction();
responseMap.forEach(BatchRequest::setResponse);
} catch (Exception e) {
LOG.warn("failed transaction with error", e);
try {
kafkaProducer.abortTransaction();
} catch (ProducerFencedException err) {
LOG.error("Could not abort transaction", err);
}

for (BatchRequest request : requests) {
request.setResponse(
new BulkIngestResponse(
0,
request.inputDocs.values().stream().mapToInt(List::size).sum(),
e.getMessage()));
}
}
}
}
}
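
The loop above drains everything that has queued up, produces the whole batch inside a single Kafka transaction, and only then unblocks the waiting HTTP threads; when the queue is empty it sleeps for a fixed two seconds, so a lone request can wait up to that long. A sketch of an alternative loop shape that blocks on poll() for the first request instead of sleeping; this is a possible refinement, not the commit's code, and produceBatch() plus the TimeUnit import are hypothetical.

public void runBlocking() {
  while (true) {
    List<BatchRequest> requests = new ArrayList<>();
    try {
      // Wait up to 2 seconds for the first request instead of sleeping unconditionally.
      BatchRequest first = pendingRequests.poll(2, TimeUnit.SECONDS);
      if (first == null) {
        continue; // nothing arrived in this window; keep waiting
      }
      requests.add(first);
    } catch (InterruptedException e) {
      return; // doStop() interrupts this thread
    }
    // Pick up anything else that queued while we were waiting.
    pendingRequests.drainTo(requests);
    // Hypothetical helper that wraps the begin/produce/commit/abort logic from run() above.
    produceBatch(requests);
  }
}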

@SuppressWarnings("FutureReturnValueIgnored")
public BulkIngestResponse produceDocuments(Map<String, List<Trace.Span>> indexDocs) {
int totalDocs = indexDocs.values().stream().mapToInt(List::size).sum();
@@ -225,9 +303,7 @@ public BulkIngestResponse produceDocuments(Map<String, List<Trace.Span>> indexDo
// Until we fix the producer design so that multiple /_bulk requests can write to the
// same txn, we limit producing documents to one thread at a time
lockTransactionalProducer.lock();
try {
kafkaProducer.beginTransaction();
for (Trace.Span doc : indexDoc.getValue()) {
ProducerRecord<String, byte[]> producerRecord =
new ProducerRecord<>(
@@ -238,7 +314,6 @@ public BulkIngestResponse produceDocuments(Map<String, List<Trace.Span>> indexDo
// messages
kafkaProducer.send(producerRecord);
}
kafkaProducer.commitTransaction();
} catch (TimeoutException te) {
LOG.error("Commit transaction timeout", te);
// the commitTransaction waits till "max.block.ms" after which it will time out
@@ -255,16 +330,6 @@ public BulkIngestResponse produceDocuments(Map<String, List<Trace.Span>> indexDo
// We can't recover from these exceptions, so our only option is to close the producer and
// exit.
new RuntimeHalterImpl().handleFatal(new Throwable("KafkaProducer needs to shutdown ", e));
} catch (Exception e) {
LOG.warn("failed transaction with error", e);
try {
kafkaProducer.abortTransaction();
} catch (ProducerFencedException err) {
LOG.error("Could not abort transaction", err);
}
return new BulkIngestResponse(0, totalDocs, e.getMessage());
} finally {
lockTransactionalProducer.unlock();
}
}
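
With this change the beginTransaction()/commitTransaction()/abortTransaction() calls are driven from the batch loop rather than from produceDocuments(), so one transaction covers every request drained in a cycle. For background, here is a self-contained sketch of the standard kafka-clients transactional API that the loop relies on; the broker address, topic name, and transactional.id are illustrative values, not the project's configuration.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bulk-ingest-producer-1");

    try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
      producer.initTransactions(); // required once before the first beginTransaction()
      try {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("ingest-topic", "doc-id", new byte[0]));
        producer.commitTransaction(); // every send in the transaction becomes visible atomically
      } catch (ProducerFencedException e) {
        // A fenced producer cannot abort or keep going; closing it is the only option.
        throw new RuntimeException("KafkaProducer was fenced", e);
      } catch (Exception e) {
        producer.abortTransaction(); // roll back all sends since beginTransaction()
      }
    }
  }
}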
