Transaction batching Kafka producer (#735)
* Add experimental Kafka batch producer (see the sketch below)

* Initial refactor of experimental batch producer

* Cleanup & rename refactor

* Additional cleanup, documentation, error handling, and metrics added

* Rework exception handling to better catch errors

* Adjust producer linger/batch configs

* Add bulk ingest edge instrumentation

* Add support for additional Kafka props on bulk ingest

* Remove default configs in favor of env settings

* Add ability to override blocking task executor limits for bulk ingest

* Add timer for bulk ingest

* Add a stall counter for bulk ingest

* Test pooling Kafka producers

* Increase default Kafka producer pool, make configurable

* Move bulk ingest to non-blocking virtual threads

* Remove pooled producer logic, unused metrics, early exit

* Remove unused blocking config

* PR feedback

---------

Co-authored-by: Bryan Burkholder <[email protected]>
bryanlb authored Jan 8, 2024
1 parent bb2346a commit d99ae81
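Before the per-file diffs, a rough sketch of the pattern the commit title and first bullet describe: concurrent bulk-ingest requests are queued, a single producer loop drains the queue, and each drained batch is written inside one Kafka transaction, so callers share the per-transaction overhead. Of the names below, only BulkIngestKafkaProducer, submitRequest, and getResponse appear in this commit; everything else is an illustrative assumption, not the committed implementation.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Sketch only; the real BulkIngestKafkaProducer in this commit differs in detail. */
class BulkIngestKafkaProducerSketch {

  /** One queued request; the HTTP handler blocks in getResponse() until its batch flushes. */
  static class PendingRequest {
    final List<byte[]> docs;
    final CountDownLatch flushed = new CountDownLatch(1);
    volatile String error; // null on success

    PendingRequest(List<byte[]> docs) {
      this.docs = docs;
    }

    String getResponse() throws InterruptedException {
      flushed.await();
      return error == null ? "ok" : error;
    }
  }

  private final BlockingQueue<PendingRequest> queue = new LinkedBlockingQueue<>();
  private final KafkaProducer<String, byte[]> producer;
  private final String topic;

  BulkIngestKafkaProducerSketch(Properties props, String topic) {
    // props must configure serializers and a transactional.id
    this.producer = new KafkaProducer<>(props);
    this.topic = topic;
    producer.initTransactions();
    Thread.ofVirtual().start(this::produceLoop);
  }

  PendingRequest submitRequest(List<byte[]> docs) {
    PendingRequest request = new PendingRequest(docs);
    queue.add(request);
    return request;
  }

  private void produceLoop() {
    List<PendingRequest> batch = new ArrayList<>();
    while (true) {
      batch.clear();
      try {
        batch.add(queue.take()); // wait for at least one request...
        queue.drainTo(batch); // ...then drain everything else already queued
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      try {
        producer.beginTransaction();
        for (PendingRequest request : batch) {
          for (byte[] doc : request.docs) {
            producer.send(new ProducerRecord<>(topic, doc));
          }
        }
        producer.commitTransaction();
      } catch (Exception e) {
        producer.abortTransaction(); // real code must separate fatal from abortable errors
        batch.forEach(r -> r.error = e.getMessage());
      } finally {
        batch.forEach(r -> r.flushed.countDown());
      }
    }
  }
}

Draining whatever has already queued into a single transaction is what amortizes the commit cost across concurrent requests; the linger/batch producer settings adjusted in the bullets above tune the same trade-off one level down, inside the Kafka client.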
Showing 23 changed files with 1,065 additions and 572 deletions.
config/config.yaml (4 additions & 1 deletion)

@@ -125,6 +125,10 @@ preprocessorConfig:
     numStreamThreads: ${KAFKA_STREAM_THREADS:-2}
     processingGuarantee: ${KAFKA_STREAM_PROCESSING_GUARANTEE:-at_least_once}
     additionalProps: ${KAFKA_ADDITIONAL_PROPS:-}
+  kafkaConfig:
+    kafkaTopic: ${KAFKA_TOPIC:-test-topic}
+    kafkaBootStrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}
+    additionalProps: ${KAFKA_ADDITIONAL_PROPS:-}

   serverConfig:
     serverPort: ${KALDB_PREPROCESSOR_SERVER_PORT:-8086}
@@ -136,5 +140,4 @@ preprocessorConfig:
   dataTransformer: ${PREPROCESSOR_TRANSFORMER:-json}
   rateLimiterMaxBurstSeconds: ${PREPROCESSOR_RATE_LIMITER_MAX_BURST_SECONDS:-1}
   kafkaPartitionStickyTimeoutMs: ${KAFKA_PARTITION_STICKY_TIMEOUT_MS:-0}
-  bootstrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}
   useBulkApi: ${KALDB_PREPROCESSOR_USE_BULK_API:-false}
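Both the stream config and the new kafkaConfig block accept a free-form additionalProps value, which is how producer settings such as linger.ms and batch.size reach the Kafka client. Below is a sketch of the kind of parsing this implies, assuming a comma-separated key=value encoding; the format KalDb actually accepts is defined by its config-parsing code, which is not part of this page.

import java.util.Properties;

class AdditionalPropsSketch {
  /** Turns e.g. "linger.ms=250,batch.size=65536" into producer Properties. */
  static Properties parse(String additionalProps) {
    Properties props = new Properties();
    if (additionalProps == null || additionalProps.isBlank()) {
      return props; // the env default is empty: ${KAFKA_ADDITIONAL_PROPS:-}
    }
    for (String pair : additionalProps.split(",")) {
      String[] kv = pair.split("=", 2);
      if (kv.length == 2) {
        props.setProperty(kv[0].trim(), kv[1].trim());
      }
    }
    return props;
  }
}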
kaldb/pom.xml (8 additions & 0 deletions)

@@ -687,6 +687,14 @@
         <skipSortingImports>false</skipSortingImports>
         <style>google</style>
       </configuration>
+      <dependencies>
+        <!-- Remove after https://github.com/spotify/fmt-maven-plugin/pull/185 is merged -->
+        <dependency>
+          <groupId>com.google.googlejavaformat</groupId>
+          <artifactId>google-java-format</artifactId>
+          <version>1.19.1</version>
+        </dependency>
+      </dependencies>
       <executions>
         <execution>
           <goals>
kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java (105 additions, new file)

@@ -0,0 +1,105 @@
package com.slack.kaldb.bulkIngestApi;

import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR;
import static com.linecorp.armeria.common.HttpStatus.TOO_MANY_REQUESTS;

import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.server.annotation.Post;
import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.service.murron.trace.Trace;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class defines the HTTP endpoint behavior for bulk ingest. It is expected to handle
 * rate limiting and error handling, and to submit the parsed messages to Kafka for ingestion.
 */
public class BulkIngestApi {
  private static final Logger LOG = LoggerFactory.getLogger(BulkIngestApi.class);
  private static final String BULK_INGEST_INCOMING_BYTE_TOTAL = "kaldb_preprocessor_incoming_byte";
  private static final String BULK_INGEST_TIMER = "kaldb_preprocessor_bulk_ingest";
  private final BulkIngestKafkaProducer bulkIngestKafkaProducer;
  private final DatasetRateLimitingService datasetRateLimitingService;
  private final MeterRegistry meterRegistry;
  private final Counter incomingByteTotal;
  private final Timer bulkIngestTimer;

  public BulkIngestApi(
      BulkIngestKafkaProducer bulkIngestKafkaProducer,
      DatasetRateLimitingService datasetRateLimitingService,
      MeterRegistry meterRegistry) {

    this.bulkIngestKafkaProducer = bulkIngestKafkaProducer;
    this.datasetRateLimitingService = datasetRateLimitingService;
    this.meterRegistry = meterRegistry;
    this.incomingByteTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_TOTAL);
    this.bulkIngestTimer = meterRegistry.timer(BULK_INGEST_TIMER);
  }

  @Post("/_bulk")
  public HttpResponse addDocument(String bulkRequest) {
    // 1. Kaldb does not support the concept of "updates". It's always an add.
    // 2. The "index" is used as the span name
    CompletableFuture<HttpResponse> future = new CompletableFuture<>();
    Timer.Sample sample = Timer.start(meterRegistry);
    future.thenRun(() -> sample.stop(bulkIngestTimer));
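    // The sample stops only when the future completes, so the timer records
    // end-to-end latency, including the wait for the Kafka batch to flush.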

    try {
      byte[] bulkRequestBytes = bulkRequest.getBytes(StandardCharsets.UTF_8);
      incomingByteTotal.increment(bulkRequestBytes.length);
      Map<String, List<Trace.Span>> docs = BulkApiRequestParser.parseRequest(bulkRequestBytes);

      // todo - our rate limiter doesn't have a way to acquire permits across multiple
      // datasets, so as a limitation we currently reject any request that has documents
      // against multiple indexes. We expect most indexing requests to target a single index.
      if (docs.keySet().size() > 1) {
        BulkIngestResponse response =
            new BulkIngestResponse(0, 0, "request must contain only 1 unique index");
        future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
        return HttpResponse.of(future);
      }

      for (Map.Entry<String, List<Trace.Span>> indexDocs : docs.entrySet()) {
        final String index = indexDocs.getKey();
        if (!datasetRateLimitingService.tryAcquire(index, indexDocs.getValue())) {
          BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded");
          future.complete(HttpResponse.ofJson(TOO_MANY_REQUESTS, response));
          return HttpResponse.of(future);
        }
      }

      // todo - explore the possibility of using the blocking task executor backed by virtual
      // threads to fulfill this
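      // For now, a per-request virtual thread lets the blocking getResponse() call below
      // wait for the batch flush without tying up an Armeria event-loop thread.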
      Thread.ofVirtual()
          .start(
              () -> {
                try {
                  BulkIngestResponse response =
                      bulkIngestKafkaProducer.submitRequest(docs).getResponse();
                  future.complete(HttpResponse.ofJson(response));
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  LOG.error("Request failed", e);
                  future.complete(
                      HttpResponse.ofJson(
                          INTERNAL_SERVER_ERROR, new BulkIngestResponse(0, 0, e.getMessage())));
                }
              });
    } catch (Exception e) {
      LOG.error("Request failed", e);
      BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage());
      future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
    }

    return HttpResponse.of(future);
  }
}
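For context, a hypothetical sketch of how an annotated service like this is mounted. The builder calls are standard Armeria API and the port default comes from the config.yaml diff above, but KalDb's real server bootstrap lives in files not shown on this page.

import com.linecorp.armeria.server.Server;
import io.micrometer.core.instrument.MeterRegistry;

class BulkIngestServerSketch {
  static Server start(
      BulkIngestKafkaProducer producer,
      DatasetRateLimitingService rateLimiter,
      MeterRegistry registry) {
    Server server =
        Server.builder()
            .http(8086) // KALDB_PREPROCESSOR_SERVER_PORT default, per config.yaml above
            .annotatedService(new BulkIngestApi(producer, rateLimiter, registry))
            .build();
    server.start().join(); // start() is async; join() blocks until the port is bound
    return server;
  }
}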
(The remaining 20 changed files are not shown.)