Add support for additional kafka props on bulk ingest
bryanlb committed Dec 21, 2023
1 parent e6b9e72 commit 3ad0a20
Showing 11 changed files with 124 additions and 57 deletions.
5 changes: 4 additions & 1 deletion config/config.yaml
@@ -125,6 +125,10 @@ preprocessorConfig:
numStreamThreads: ${KAFKA_STREAM_THREADS:-2}
processingGuarantee: ${KAFKA_STREAM_PROCESSING_GUARANTEE:-at_least_once}
additionalProps: ${KAFKA_ADDITIONAL_PROPS:-}
kafkaConfig:
kafkaTopic: ${KAFKA_TOPIC:-test-topic}
kafkaBootStrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}
additionalProps: ${KAFKA_ADDITIONAL_PROPS:-}

serverConfig:
serverPort: ${KALDB_PREPROCESSOR_SERVER_PORT:-8086}
@@ -136,5 +140,4 @@ preprocessorConfig:
dataTransformer: ${PREPROCESSOR_TRANSFORMER:-json}
rateLimiterMaxBurstSeconds: ${PREPROCESSOR_RATE_LIMITER_MAX_BURST_SECONDS:-1}
kafkaPartitionStickyTimeoutMs: ${KAFKA_PARTITION_STICKY_TIMEOUT_MS:-0}
bootstrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}
useBulkApi: ${KALDB_PREPROCESSOR_USE_BULK_API:-false}

@@ -11,6 +11,7 @@
import com.slack.kaldb.preprocessor.PreprocessorService;
import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.kaldb.util.RuntimeHalterImpl;
import com.slack.kaldb.writer.KafkaUtils;
import com.slack.service.murron.trace.Trace;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
@@ -21,12 +22,14 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
@@ -41,7 +44,7 @@ public class BulkIngestKafkaProducer extends AbstractExecutionThreadService {
private final KafkaProducer<String, byte[]> kafkaProducer;
private final KafkaClientMetrics kafkaMetrics;

private final KaldbConfigs.PreprocessorConfig preprocessorConfig;
private final KaldbConfigs.KafkaConfig kafkaConfig;

private final DatasetMetadataStore datasetMetadataStore;
private final KaldbMetadataStoreChangeListener<DatasetMetadata> datasetListener =
@@ -64,19 +67,28 @@ public class BulkIngestKafkaProducer extends AbstractExecutionThreadService {
public static final String BATCH_SIZE_GAUGE = "bulk_ingest_producer_batch_size";
private final AtomicInteger batchSizeGauge;

private static final Set<String> OVERRIDABLE_CONFIGS =
Set.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ProducerConfig.LINGER_MS_CONFIG,
ProducerConfig.BATCH_SIZE_CONFIG,
ProducerConfig.MAX_BLOCK_MS_CONFIG,
ProducerConfig.COMPRESSION_TYPE_CONFIG);

public BulkIngestKafkaProducer(
final DatasetMetadataStore datasetMetadataStore,
final KaldbConfigs.PreprocessorConfig preprocessorConfig,
final PrometheusMeterRegistry meterRegistry) {

this.kafkaConfig = preprocessorConfig.getKafkaConfig();

checkArgument(
!preprocessorConfig.getBootstrapServers().isEmpty(),
!kafkaConfig.getKafkaBootStrapServers().isEmpty(),
"Kafka bootstrapServers must be provided");

checkArgument(
!preprocessorConfig.getDownstreamTopic().isEmpty(),
"Kafka downstreamTopic must be provided");
checkArgument(!kafkaConfig.getKafkaTopic().isEmpty(), "Kafka topic must be provided");

this.preprocessorConfig = preprocessorConfig;
this.datasetMetadataStore = datasetMetadataStore;

// todo - consider making these a configurable value, or determine a way to derive a reasonable
@@ -239,8 +251,7 @@ private BulkIngestResponse produceDocuments(Map<String, List<Trace.Span>> indexD
// we will limit producing documents 1 thread at a time
for (Trace.Span doc : indexDoc.getValue()) {
ProducerRecord<String, byte[]> producerRecord =
new ProducerRecord<>(
preprocessorConfig.getDownstreamTopic(), partition, index, doc.toByteArray());
new ProducerRecord<>(kafkaConfig.getKafkaTopic(), partition, index, doc.toByteArray());

// we intentionally suppress FutureReturnValueIgnored here in errorprone - this is because
// we wrap this in a transaction, which is responsible for flushing all of the pending
@@ -254,14 +265,29 @@ private BulkIngestResponse produceDocuments(Map<String, List<Trace.Span>> indexD

private KafkaProducer<String, byte[]> createKafkaTransactionProducer(String transactionId) {
Properties props = new Properties();
props.put("bootstrap.servers", preprocessorConfig.getBootstrapServers());
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("transactional.id", transactionId);
props.put("linger.ms", 500);
props.put("batch.size", 128000);
props.put("max.block.ms", 10000);
props.put("compression.type", "snappy");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getKafkaBootStrapServers());
props.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
props.put(ProducerConfig.LINGER_MS_CONFIG, 500);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 128000);
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

// don't override the properties that we have already set explicitly using named properties
for (Map.Entry<String, String> additionalProp :
kafkaConfig.getAdditionalPropsMap().entrySet()) {
props =
KafkaUtils.maybeOverrideProps(
props,
additionalProp.getKey(),
additionalProp.getValue(),
OVERRIDABLE_CONFIGS.contains(additionalProp.getKey()));
}
return new KafkaProducer<>(props);
}

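The net effect of the loop above: keys from additionalProps that aren't already set are always applied, while keys colliding with the explicitly-set defaults are only replaced when they appear in OVERRIDABLE_CONFIGS. A minimal sketch of that behavior, assuming a hypothetical demo class (the Map literal stands in for kafkaConfig.getAdditionalPropsMap()):

import com.slack.kaldb.writer.KafkaUtils;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.producer.ProducerConfig;

public class AdditionalPropsDemo {
  // Same shape as the allowlist above: only these keys may replace explicit defaults.
  private static final Set<String> OVERRIDABLE_CONFIGS =
      Set.of(ProducerConfig.LINGER_MS_CONFIG, ProducerConfig.COMPRESSION_TYPE_CONFIG);

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

    Map<String, String> additionalProps =
        Map.of(
            ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd", // allowlisted: replaces "snappy"
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "other:9092", // not allowlisted: ignored
            ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // not set yet: always applied

    for (Map.Entry<String, String> e : additionalProps.entrySet()) {
      props =
          KafkaUtils.maybeOverrideProps(
              props, e.getKey(), e.getValue(), OVERRIDABLE_CONFIGS.contains(e.getKey()));
    }

    System.out.println(props.getProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG)); // zstd
    System.out.println(props.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); // localhost:9092
    System.out.println(props.getProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)); // true
  }
}

Keeping bootstrap.servers and transactional.id off the allowlist presumably lets operators tune batching and compression through additional props without redirecting the producer to another cluster or breaking its transactional setup.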

@@ -2,14 +2,14 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION;
import static com.slack.kaldb.writer.kafka.KaldbKafkaConsumer.maybeOverride;

import com.google.common.util.concurrent.AbstractService;
import com.slack.kaldb.metadata.core.KaldbMetadataStoreChangeListener;
import com.slack.kaldb.metadata.dataset.DatasetMetadata;
import com.slack.kaldb.metadata.dataset.DatasetMetadataStore;
import com.slack.kaldb.metadata.dataset.DatasetPartitionMetadata;
import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.kaldb.writer.KafkaUtils;
import com.slack.service.murron.trace.Trace;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
Expand Down Expand Up @@ -313,7 +313,9 @@ protected static Properties makeKafkaStreamsProps(
// don't override any property we already set
for (Map.Entry<String, String> additionalProp :
kafkaStreamConfig.getAdditionalPropsMap().entrySet()) {
maybeOverride(props, additionalProp.getKey(), additionalProp.getValue(), false);
props =
KafkaUtils.maybeOverrideProps(
props, additionalProp.getKey(), additionalProp.getValue(), false);
}

return props;
35 changes: 35 additions & 0 deletions kaldb/src/main/java/com/slack/kaldb/writer/KafkaUtils.java
@@ -0,0 +1,35 @@
package com.slack.kaldb.writer;

import com.google.common.annotations.VisibleForTesting;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Shared kafka functions for producers, consumers, and stream applications */
public class KafkaUtils {
private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);

@VisibleForTesting
public static Properties maybeOverrideProps(
Properties inputProps, String key, String value, boolean override) {
Properties changedProps = new Properties(inputProps);
String userValue = inputProps.getProperty(key);
if (userValue != null) {
if (override) {
LOG.warn(
String.format(
"Property %s is provided but will be overridden from %s to %s",
key, userValue, value));
inputProps.setProperty(key, value);
} else {
LOG.warn(
String.format(
"Property %s is provided but won't be overridden from %s to %s",
key, userValue, value));
}
} else {
inputProps.setProperty(key, value);
}
return changedProps;
}
}
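One subtlety in this helper: new Properties(inputProps) layers the input as the returned object's defaults, and every write lands on the input object itself, so the returned Properties' own table stays empty and reads must go through getProperty(), which walks the defaults chain. A short sketch of that behavior, assuming a hypothetical demo class:

import com.slack.kaldb.writer.KafkaUtils;
import java.util.Properties;

public class MaybeOverridePropsDemo {
  public static void main(String[] args) {
    Properties base = new Properties();
    base.setProperty("linger.ms", "500");

    // Key already present and override=true: the value is replaced (a warning is logged).
    Properties layered = KafkaUtils.maybeOverrideProps(base, "linger.ms", "250", true);
    System.out.println(layered.getProperty("linger.ms")); // 250, found via the defaults chain
    System.out.println(layered.size()); // 0 - the write actually landed on `base`
    System.out.println(base.getProperty("linger.ms")); // 250

    // Key already present and override=false: the existing value wins.
    layered = KafkaUtils.maybeOverrideProps(base, "linger.ms", "100", false);
    System.out.println(layered.getProperty("linger.ms")); // still 250

    // Key absent: the value is applied regardless of the override flag.
    layered = KafkaUtils.maybeOverrideProps(base, "acks", "all", false);
    System.out.println(layered.getProperty("acks")); // all
  }
}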

@@ -9,6 +9,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.kaldb.server.KaldbConfig;
import com.slack.kaldb.writer.KafkaUtils;
import com.slack.kaldb.writer.LogMessageWriterImpl;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
@@ -82,40 +83,16 @@ public static Properties makeKafkaConsumerProps(KaldbConfigs.KafkaConfig kafkaCo
// don't override the properties that we have already set explicitly using named properties
for (Map.Entry<String, String> additionalProp :
kafkaConfig.getAdditionalPropsMap().entrySet()) {
maybeOverride(
props,
additionalProp.getKey(),
additionalProp.getValue(),
OVERRIDABLE_CONFIGS.contains(additionalProp.getKey()));
props =
KafkaUtils.maybeOverrideProps(
props,
additionalProp.getKey(),
additionalProp.getValue(),
OVERRIDABLE_CONFIGS.contains(additionalProp.getKey()));
}
return props;
}

@VisibleForTesting
public static boolean maybeOverride(
Properties props, String key, String value, boolean override) {
boolean overridden = false;
String userValue = props.getProperty(key);
if (userValue != null) {
if (override) {
LOG.warn(
String.format(
"Property %s is provided but will be overridden from %s to %s",
key, userValue, value));
props.setProperty(key, value);
overridden = true;
} else {
LOG.warn(
String.format(
"Property %s is provided but won't be overridden from %s to %s",
key, userValue, value));
}
} else {
props.setProperty(key, value);
}
return overridden;
}

private KafkaConsumer<String, byte[]> kafkaConsumer;
private final TopicPartition topicPartition;

4 changes: 2 additions & 2 deletions kaldb/src/main/proto/kaldb_configs.proto
@@ -266,8 +266,8 @@ message PreprocessorConfig {
// more docs on PreprocessorPartitioner#getDatasetPartitionSuppliers
int32 kafka_partition_sticky_timeout_ms = 8;

// this value needs to be set if the bulk API is used to bootstrap the producer kafka
// Kafka config needs to be set if the bulk API is used to bootstrap the producer kafka
// we plan on moving everything to the bulk API and removing KafkaStreamConfig in the future
string bootstrap_servers = 9;
KafkaConfig kafka_config = 9;
bool use_bulk_api = 10;
}
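Because field 9 changes from a scalar string to the shared KafkaConfig message, the generated accessors change shape as well; the BulkIngestKafkaProducer changes above reflect exactly this migration. A hypothetical fragment for illustration (class and method names are not from the commit):

import com.slack.kaldb.proto.config.KaldbConfigs;

class PreprocessorKafkaConfigMigration {
  // Before this commit: preprocessorConfig.getBootstrapServers() returned the string directly.
  static String bootstrapServers(KaldbConfigs.PreprocessorConfig preprocessorConfig) {
    // After: bootstrap servers and the topic hang off the nested KafkaConfig message.
    KaldbConfigs.KafkaConfig kafkaConfig = preprocessorConfig.getKafkaConfig();
    return kafkaConfig.getKafkaBootStrapServers();
  }
}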

@@ -73,14 +73,18 @@ public void bootstrapCluster() throws Exception {
.setServerPort(8080)
.setServerAddress("localhost")
.build();
KaldbConfigs.KafkaConfig kafkaConfig =
KaldbConfigs.KafkaConfig.newBuilder()
.setKafkaBootStrapServers(kafkaServer.getBroker().getBrokerList().get())
.setKafkaTopic(DOWNSTREAM_TOPIC)
.build();
preprocessorConfig =
KaldbConfigs.PreprocessorConfig.newBuilder()
.setBootstrapServers(kafkaServer.getBroker().getBrokerList().get())
.setKafkaConfig(kafkaConfig)
.setUseBulkApi(true)
.setServerConfig(serverConfig)
.setPreprocessorInstanceCount(1)
.setRateLimiterMaxBurstSeconds(1)
.setDownstreamTopic(DOWNSTREAM_TOPIC)
.build();

datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true);

@@ -86,14 +86,18 @@ public void bootstrapCluster() throws Exception {
.setServerPort(8080)
.setServerAddress("localhost")
.build();
KaldbConfigs.KafkaConfig kafkaConfig =
KaldbConfigs.KafkaConfig.newBuilder()
.setKafkaBootStrapServers(kafkaServer.getBroker().getBrokerList().get())
.setKafkaTopic(DOWNSTREAM_TOPIC)
.build();
preprocessorConfig =
KaldbConfigs.PreprocessorConfig.newBuilder()
.setBootstrapServers(kafkaServer.getBroker().getBrokerList().get())
.setKafkaConfig(kafkaConfig)
.setUseBulkApi(true)
.setServerConfig(serverConfig)
.setPreprocessorInstanceCount(1)
.setRateLimiterMaxBurstSeconds(1)
.setDownstreamTopic(DOWNSTREAM_TOPIC)
.build();

datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true);
12 changes: 10 additions & 2 deletions kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java
@@ -295,7 +295,11 @@ public void testParseKaldbJsonConfigFile() throws IOException {
assertThat(preprocessorConfig.getDataTransformer()).isEqualTo("api_log");
assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isEqualTo(2);
assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(false);
assertThat(preprocessorConfig.getBootstrapServers()).isEqualTo("localhost:9092");

final KaldbConfigs.KafkaConfig preprocessorKafkaConfig =
config.getPreprocessorConfig().getKafkaConfig();
assertThat(preprocessorKafkaConfig.getKafkaBootStrapServers()).isEqualTo("localhost:9092");
assertThat(preprocessorKafkaConfig.getKafkaTopic()).isEqualTo("test-topic");

final KaldbConfigs.ServerConfig preprocessorServerConfig = preprocessorConfig.getServerConfig();
assertThat(preprocessorServerConfig.getServerPort()).isEqualTo(8085);
@@ -466,8 +470,12 @@ public void testParseKaldbYamlConfigFile() throws IOException {
assertThat(preprocessorConfig.getDataTransformer()).isEqualTo("api_log");
assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isEqualTo(2);

final KaldbConfigs.KafkaConfig preprocessorKafkaConfig =
config.getPreprocessorConfig().getKafkaConfig();
assertThat(preprocessorKafkaConfig.getKafkaBootStrapServers()).isEqualTo("localhost:9092");
assertThat(preprocessorKafkaConfig.getKafkaTopic()).isEqualTo("test-topic");

assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(true);
assertThat(preprocessorConfig.getBootstrapServers()).isEqualTo("localhost:9092");

final KaldbConfigs.ServerConfig preprocessorServerConfig = preprocessorConfig.getServerConfig();
assertThat(preprocessorServerConfig.getServerPort()).isEqualTo(8085);
4 changes: 4 additions & 0 deletions kaldb/src/test/resources/test_config.json
@@ -142,6 +142,10 @@
"numStreamThreads": 2,
"processingGuarantee": "at_least_once"
},
"kafkaConfig": {
"kafkaTopic": "test-topic",
"kafkaBootStrapServers": "localhost:9092"
},
"serverConfig": {
"serverPort": 8085,
"serverAddress": "localhost",
4 changes: 4 additions & 0 deletions kaldb/src/test/resources/test_config.yaml
@@ -114,6 +114,10 @@ preprocessorConfig:
applicationId: kaldb_preprocessor
numStreamThreads: 2
processingGuarantee: at_least_once
kafkaConfig:
kafkaTopic: test-topic
kafkaBootStrapServers: "localhost:9092"

serverConfig:
serverPort: 8085
serverAddress: localhost