From 3ad0a20f95251fd51e336dc2dd9931fcc3da3fa9 Mon Sep 17 00:00:00 2001
From: Bryan Burkholder
Date: Thu, 21 Dec 2023 11:28:54 -0700
Subject: [PATCH] Add support for additional kafka props on bulk ingest

---
 config/config.yaml                            |  5 +-
 .../BulkIngestKafkaProducer.java              | 58 ++++++++++++++-----
 .../preprocessor/PreprocessorService.java     |  6 +-
 .../com/slack/kaldb/writer/KafkaUtils.java    | 35 +++++++++++
 .../writer/kafka/KaldbKafkaConsumer.java      | 37 +++---------
 kaldb/src/main/proto/kaldb_configs.proto      |  4 +-
 .../BulkIngestKafkaProducerTest.java          |  8 ++-
 .../slack/kaldb/server/BulkIngestApiTest.java |  8 ++-
 .../slack/kaldb/server/KaldbConfigTest.java   | 12 +++-
 kaldb/src/test/resources/test_config.json     |  4 ++
 kaldb/src/test/resources/test_config.yaml     |  4 ++
 11 files changed, 124 insertions(+), 57 deletions(-)
 create mode 100644 kaldb/src/main/java/com/slack/kaldb/writer/KafkaUtils.java

diff --git a/config/config.yaml b/config/config.yaml
index 9835654318..6c95431fd1 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -125,6 +125,10 @@ preprocessorConfig:
     numStreamThreads: ${KAFKA_STREAM_THREADS:-2}
     processingGuarantee: ${KAFKA_STREAM_PROCESSING_GUARANTEE:-at_least_once}
     additionalProps: ${KAFKA_ADDITIONAL_PROPS:-}
+  kafkaConfig:
+    kafkaTopic: ${KAFKA_TOPIC:-test-topic}
+    kafkaBootStrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}
+    additionalProps: ${KAFKA_ADDITIONAL_PROPS:-}
 
   serverConfig:
     serverPort: ${KALDB_PREPROCESSOR_SERVER_PORT:-8086}
@@ -136,5 +140,4 @@ preprocessorConfig:
   dataTransformer: ${PREPROCESSOR_TRANSFORMER:-json}
   rateLimiterMaxBurstSeconds: ${PREPROCESSOR_RATE_LIMITER_MAX_BURST_SECONDS:-1}
   kafkaPartitionStickyTimeoutMs: ${KAFKA_PARTITION_STICKY_TIMEOUT_MS:-0}
-  bootstrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}
   useBulkApi: ${KALDB_PREPROCESSOR_USE_BULK_API:-false}
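The new `kafkaConfig` block reuses the shared `KafkaConfig` shape (see the proto change below), so the bulk-ingest producer is configured like every other Kafka client instead of through one-off preprocessor fields. As an illustration only, a deployment that wants larger producer batches could set `KAFKA_ADDITIONAL_PROPS` to a map such as `{batch.size: 512000, linger.ms: 250}`; the exact string format is whatever the existing `kafkaStreamConfig.additionalProps` loader already accepts, and these values are hypothetical.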
diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducer.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducer.java
index b6d1c48c59..ed74e474fc 100644
--- a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducer.java
+++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducer.java
@@ -11,6 +11,7 @@
 import com.slack.kaldb.preprocessor.PreprocessorService;
 import com.slack.kaldb.proto.config.KaldbConfigs;
 import com.slack.kaldb.util.RuntimeHalterImpl;
+import com.slack.kaldb.writer.KafkaUtils;
 import com.slack.service.murron.trace.Trace;
 import io.micrometer.core.instrument.Counter;
 import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
@@ -21,12 +22,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
@@ -41,7 +44,7 @@ public class BulkIngestKafkaProducer extends AbstractExecutionThreadService {
   private final KafkaProducer<String, byte[]> kafkaProducer;
   private final KafkaClientMetrics kafkaMetrics;
 
-  private final KaldbConfigs.PreprocessorConfig preprocessorConfig;
+  private final KaldbConfigs.KafkaConfig kafkaConfig;
 
   private final DatasetMetadataStore datasetMetadataStore;
   private final KaldbMetadataStoreChangeListener<DatasetMetadata> datasetListener =
@@ -64,19 +67,28 @@ public class BulkIngestKafkaProducer extends AbstractExecutionThreadService {
   public static final String BATCH_SIZE_GAUGE = "bulk_ingest_producer_batch_size";
   private final AtomicInteger batchSizeGauge;
 
+  private static final Set<String> OVERRIDABLE_CONFIGS =
+      Set.of(
+          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+          ProducerConfig.LINGER_MS_CONFIG,
+          ProducerConfig.BATCH_SIZE_CONFIG,
+          ProducerConfig.MAX_BLOCK_MS_CONFIG,
+          ProducerConfig.COMPRESSION_TYPE_CONFIG);
+
   public BulkIngestKafkaProducer(
       final DatasetMetadataStore datasetMetadataStore,
       final KaldbConfigs.PreprocessorConfig preprocessorConfig,
       final PrometheusMeterRegistry meterRegistry) {
+
+    this.kafkaConfig = preprocessorConfig.getKafkaConfig();
+
     checkArgument(
-        !preprocessorConfig.getBootstrapServers().isEmpty(),
+        !kafkaConfig.getKafkaBootStrapServers().isEmpty(),
         "Kafka bootstrapServers must be provided");
-    checkArgument(
-        !preprocessorConfig.getDownstreamTopic().isEmpty(),
-        "Kafka downstreamTopic must be provided");
+    checkArgument(!kafkaConfig.getKafkaTopic().isEmpty(), "Kafka topic must be provided");
 
-    this.preprocessorConfig = preprocessorConfig;
     this.datasetMetadataStore = datasetMetadataStore;
 
     // todo - consider making these a configurable value, or determine a way to derive a reasonable
@@ -239,8 +251,7 @@ private BulkIngestResponse produceDocuments(Map<String, List<Trace.Span>> indexDocs) {
       // we will limit producing documents 1 thread at a time
       for (Trace.Span doc : indexDoc.getValue()) {
         ProducerRecord<String, byte[]> producerRecord =
-            new ProducerRecord<>(
-                preprocessorConfig.getDownstreamTopic(), partition, index, doc.toByteArray());
+            new ProducerRecord<>(kafkaConfig.getKafkaTopic(), partition, index, doc.toByteArray());
 
         // we intentionally suppress FutureReturnValueIgnored here in errorprone - this is because
         // we wrap this in a transaction, which is responsible for flushing all of the pending
@@ -254,14 +265,29 @@
   private KafkaProducer<String, byte[]> createKafkaTransactionProducer(String transactionId) {
     Properties props = new Properties();
-    props.put("bootstrap.servers", preprocessorConfig.getBootstrapServers());
-    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-    props.put("transactional.id", transactionId);
-    props.put("linger.ms", 500);
-    props.put("batch.size", 128000);
-    props.put("max.block.ms", 10000);
-    props.put("compression.type", "snappy");
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getKafkaBootStrapServers());
+    props.put(
+        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.StringSerializer");
+    props.put(
+        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArraySerializer");
+    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
+    props.put(ProducerConfig.LINGER_MS_CONFIG, 500);
+    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 128000);
+    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
+    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
+
+    // don't override the properties that we have already set explicitly using named properties
+    for (Map.Entry<String, String> additionalProp :
+        kafkaConfig.getAdditionalPropsMap().entrySet()) {
+      props =
+          KafkaUtils.maybeOverrideProps(
+              props,
+              additionalProp.getKey(),
+              additionalProp.getValue(),
+              OVERRIDABLE_CONFIGS.contains(additionalProp.getKey()));
+    }
     return new KafkaProducer<>(props);
   }
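To make the override behavior concrete (property values hypothetical): if `additionalProps` carries `linger.ms=1000`, `acks=all`, and `transactional.id=rogue`, the producer ends up with `linger.ms=1000` (listed in `OVERRIDABLE_CONFIGS`, so the configured value wins over the hardcoded 500), gains `acks=all` (never set explicitly, so it passes straight through), and keeps its generated `transactional.id` (explicitly set and not in the allow-list; the attempted override is only logged as a warning).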
diff --git a/kaldb/src/main/java/com/slack/kaldb/preprocessor/PreprocessorService.java b/kaldb/src/main/java/com/slack/kaldb/preprocessor/PreprocessorService.java
index 1e87100fbd..5351ec9b01 100644
--- a/kaldb/src/main/java/com/slack/kaldb/preprocessor/PreprocessorService.java
+++ b/kaldb/src/main/java/com/slack/kaldb/preprocessor/PreprocessorService.java
@@ -2,7 +2,6 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION;
-import static com.slack.kaldb.writer.kafka.KaldbKafkaConsumer.maybeOverride;
 
 import com.google.common.util.concurrent.AbstractService;
 import com.slack.kaldb.metadata.core.KaldbMetadataStoreChangeListener;
@@ -10,6 +9,7 @@
 import com.slack.kaldb.metadata.dataset.DatasetMetadataStore;
 import com.slack.kaldb.metadata.dataset.DatasetPartitionMetadata;
 import com.slack.kaldb.proto.config.KaldbConfigs;
+import com.slack.kaldb.writer.KafkaUtils;
 import com.slack.service.murron.trace.Trace;
 import io.micrometer.core.instrument.MeterRegistry;
 import io.micrometer.core.instrument.Timer;
@@ -313,7 +313,9 @@ protected static Properties makeKafkaStreamsProps(
     // don't override any property we already set
     for (Map.Entry<String, String> additionalProp :
         kafkaStreamConfig.getAdditionalPropsMap().entrySet()) {
-      maybeOverride(props, additionalProp.getKey(), additionalProp.getValue(), false);
+      props =
+          KafkaUtils.maybeOverrideProps(
+              props, additionalProp.getKey(), additionalProp.getValue(), false);
     }
 
     return props;
diff --git a/kaldb/src/main/java/com/slack/kaldb/writer/KafkaUtils.java b/kaldb/src/main/java/com/slack/kaldb/writer/KafkaUtils.java
new file mode 100644
index 0000000000..e235f4a73f
--- /dev/null
+++ b/kaldb/src/main/java/com/slack/kaldb/writer/KafkaUtils.java
@@ -0,0 +1,35 @@
+package com.slack.kaldb.writer;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Shared kafka functions for producers, consumers, and stream applications */
+public class KafkaUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
+
+  @VisibleForTesting
+  public static Properties maybeOverrideProps(
+      Properties inputProps, String key, String value, boolean override) {
+    Properties changedProps = (Properties) inputProps.clone();
+    String userValue = changedProps.getProperty(key);
+    if (userValue != null) {
+      if (override) {
+        LOG.warn(
+            String.format(
+                "Property %s is provided but will be overridden from %s to %s",
+                key, userValue, value));
+        changedProps.setProperty(key, value);
+      } else {
+        LOG.warn(
+            String.format(
+                "Property %s is provided but won't be overridden from %s to %s",
+                key, userValue, value));
+      }
+    } else {
+      changedProps.setProperty(key, value);
+    }
+    return changedProps;
+  }
+}
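As a minimal standalone sketch of the contract (property names and values here are illustrative):

    Properties props = new Properties();
    props.setProperty("compression.type", "snappy");

    // key already present, override=false: the existing value is kept, a warning is logged
    props = KafkaUtils.maybeOverrideProps(props, "compression.type", "zstd", false);
    // props.getProperty("compression.type") -> "snappy"

    // key already present, override=true: the additional prop replaces the existing value
    props = KafkaUtils.maybeOverrideProps(props, "compression.type", "zstd", true);
    // props.getProperty("compression.type") -> "zstd"

    // key not present: the additional prop is applied regardless of the flag
    props = KafkaUtils.maybeOverrideProps(props, "acks", "all", false);
    // props.getProperty("acks") -> "all"

Call sites reassign the result because the method returns an updated copy rather than mutating its argument.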
diff --git a/kaldb/src/main/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumer.java b/kaldb/src/main/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumer.java
index 368cd158c7..7e151df9d5 100644
--- a/kaldb/src/main/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumer.java
+++ b/kaldb/src/main/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumer.java
@@ -9,6 +9,7 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.slack.kaldb.proto.config.KaldbConfigs;
 import com.slack.kaldb.server.KaldbConfig;
+import com.slack.kaldb.writer.KafkaUtils;
 import com.slack.kaldb.writer.LogMessageWriterImpl;
 import io.micrometer.core.instrument.Counter;
 import io.micrometer.core.instrument.MeterRegistry;
@@ -82,40 +83,16 @@ public static Properties makeKafkaConsumerProps(KaldbConfigs.KafkaConfig kafkaConfig) {
     // don't override the properties that we have already set explicitly using named properties
     for (Map.Entry<String, String> additionalProp :
         kafkaConfig.getAdditionalPropsMap().entrySet()) {
-      maybeOverride(
-          props,
-          additionalProp.getKey(),
-          additionalProp.getValue(),
-          OVERRIDABLE_CONFIGS.contains(additionalProp.getKey()));
+      props =
+          KafkaUtils.maybeOverrideProps(
+              props,
+              additionalProp.getKey(),
+              additionalProp.getValue(),
+              OVERRIDABLE_CONFIGS.contains(additionalProp.getKey()));
     }
 
     return props;
   }
 
-  @VisibleForTesting
-  public static boolean maybeOverride(
-      Properties props, String key, String value, boolean override) {
-    boolean overridden = false;
-    String userValue = props.getProperty(key);
-    if (userValue != null) {
-      if (override) {
-        LOG.warn(
-            String.format(
-                "Property %s is provided but will be overridden from %s to %s",
-                key, userValue, value));
-        props.setProperty(key, value);
-        overridden = true;
-      } else {
-        LOG.warn(
-            String.format(
-                "Property %s is provided but won't be overridden from %s to %s",
-                key, userValue, value));
-      }
-    } else {
-      props.setProperty(key, value);
-    }
-    return overridden;
-  }
-
   private KafkaConsumer<String, byte[]> kafkaConsumer;
   private final TopicPartition topicPartition;
diff --git a/kaldb/src/main/proto/kaldb_configs.proto b/kaldb/src/main/proto/kaldb_configs.proto
index 17b1656939..21ac233489 100644
--- a/kaldb/src/main/proto/kaldb_configs.proto
+++ b/kaldb/src/main/proto/kaldb_configs.proto
@@ -266,8 +266,8 @@ message PreprocessorConfig {
   // more docs on PreprocessorPartitioner#getDatasetPartitionSuppliers
   int32 kafka_partition_sticky_timeout_ms = 8;
 
-  // this value needs to be set if the bulk API is used to bootstrap the producer kafka
+  // Kafka config needs to be set if the bulk API is used to bootstrap the kafka producer
   // we plan on moving everything to the bulk API and removing KafkaStreamConfig in the future
-  string bootstrap_servers = 9;
+  KafkaConfig kafka_config = 9;
 
   bool use_bulk_api = 10;
 }
diff --git a/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducerTest.java b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducerTest.java
index bf9238af76..a1ba185a47 100644
--- a/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducerTest.java
+++ b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducerTest.java
@@ -73,14 +73,18 @@ public void bootstrapCluster() throws Exception {
             .setServerPort(8080)
             .setServerAddress("localhost")
             .build();
+    KaldbConfigs.KafkaConfig kafkaConfig =
+        KaldbConfigs.KafkaConfig.newBuilder()
+            .setKafkaBootStrapServers(kafkaServer.getBroker().getBrokerList().get())
+            .setKafkaTopic(DOWNSTREAM_TOPIC)
+            .build();
     preprocessorConfig =
         KaldbConfigs.PreprocessorConfig.newBuilder()
-            .setBootstrapServers(kafkaServer.getBroker().getBrokerList().get())
+            .setKafkaConfig(kafkaConfig)
             .setUseBulkApi(true)
             .setServerConfig(serverConfig)
             .setPreprocessorInstanceCount(1)
             .setRateLimiterMaxBurstSeconds(1)
-            .setDownstreamTopic(DOWNSTREAM_TOPIC)
             .build();
 
     datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true);
diff --git a/kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java b/kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java
index 2126569aa7..c112d35bc0 100644
--- a/kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java
+++ b/kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java
@@ -86,14 +86,18 @@ public void bootstrapCluster() throws Exception {
             .setServerPort(8080)
             .setServerAddress("localhost")
             .build();
+    KaldbConfigs.KafkaConfig kafkaConfig =
+        KaldbConfigs.KafkaConfig.newBuilder()
+            .setKafkaBootStrapServers(kafkaServer.getBroker().getBrokerList().get())
+            .setKafkaTopic(DOWNSTREAM_TOPIC)
+            .build();
     preprocessorConfig =
         KaldbConfigs.PreprocessorConfig.newBuilder()
-            .setBootstrapServers(kafkaServer.getBroker().getBrokerList().get())
+            .setKafkaConfig(kafkaConfig)
             .setUseBulkApi(true)
             .setServerConfig(serverConfig)
             .setPreprocessorInstanceCount(1)
             .setRateLimiterMaxBurstSeconds(1)
-            .setDownstreamTopic(DOWNSTREAM_TOPIC)
             .build();
 
     datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true);
diff --git a/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java b/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java
index 9f72902287..adb80f0fea 100644
--- a/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java
+++ b/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java
@@ -295,7 +295,11 @@ public void testParseKaldbJsonConfigFile() throws IOException {
     assertThat(preprocessorConfig.getDataTransformer()).isEqualTo("api_log");
     assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isEqualTo(2);
     assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(false);
-    assertThat(preprocessorConfig.getBootstrapServers()).isEqualTo("localhost:9092");
+
+    final KaldbConfigs.KafkaConfig preprocessorKafkaConfig =
+        config.getPreprocessorConfig().getKafkaConfig();
+    assertThat(preprocessorKafkaConfig.getKafkaBootStrapServers()).isEqualTo("localhost:9092");
+    assertThat(preprocessorKafkaConfig.getKafkaTopic()).isEqualTo("test-topic");
 
     final KaldbConfigs.ServerConfig preprocessorServerConfig = preprocessorConfig.getServerConfig();
     assertThat(preprocessorServerConfig.getServerPort()).isEqualTo(8085);
@@ -466,8 +470,12 @@ public void testParseKaldbYamlConfigFile() throws IOException {
     assertThat(preprocessorConfig.getDataTransformer()).isEqualTo("api_log");
     assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isEqualTo(2);
 
+    final KaldbConfigs.KafkaConfig preprocessorKafkaConfig =
+        config.getPreprocessorConfig().getKafkaConfig();
+    assertThat(preprocessorKafkaConfig.getKafkaBootStrapServers()).isEqualTo("localhost:9092");
+    assertThat(preprocessorKafkaConfig.getKafkaTopic()).isEqualTo("test-topic");
+
     assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(true);
-    assertThat(preprocessorConfig.getBootstrapServers()).isEqualTo("localhost:9092");
 
     final KaldbConfigs.ServerConfig preprocessorServerConfig = preprocessorConfig.getServerConfig();
     assertThat(preprocessorServerConfig.getServerPort()).isEqualTo(8085);
diff --git a/kaldb/src/test/resources/test_config.json b/kaldb/src/test/resources/test_config.json
index fa675fa2a8..1d1f4e6833 100644
--- a/kaldb/src/test/resources/test_config.json
+++ b/kaldb/src/test/resources/test_config.json
@@ -142,6 +142,10 @@
       "numStreamThreads": 2,
       "processingGuarantee": "at_least_once"
     },
+    "kafkaConfig": {
+      "kafkaTopic": "test-topic",
+      "kafkaBootStrapServers": "localhost:9092"
+    },
     "serverConfig": {
       "serverPort": 8085,
       "serverAddress": "localhost",
diff --git a/kaldb/src/test/resources/test_config.yaml b/kaldb/src/test/resources/test_config.yaml
index 00c96a30da..e5c62cd740 100644
--- a/kaldb/src/test/resources/test_config.yaml
+++ b/kaldb/src/test/resources/test_config.yaml
@@ -114,6 +114,10 @@ preprocessorConfig:
     applicationId: kaldb_preprocessor
     numStreamThreads: 2
     processingGuarantee: at_least_once
+  kafkaConfig:
+    kafkaTopic: test-topic
+    kafkaBootStrapServers: "localhost:9092"
+
   serverConfig:
     serverPort: 8085
     serverAddress: localhost
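For code that builds this configuration programmatically rather than loading YAML, the protobuf map field behind `additionalProps` generates the usual `putAdditionalProps` builder method. A minimal sketch (topic, servers, and property values are placeholders):

    KaldbConfigs.KafkaConfig kafkaConfig =
        KaldbConfigs.KafkaConfig.newBuilder()
            .setKafkaBootStrapServers("localhost:9092")
            .setKafkaTopic("test-topic")
            // tunes an overridable producer setting
            .putAdditionalProps("linger.ms", "1000")
            // ignored with a warning: the producer pins its transactional.id
            .putAdditionalProps("transactional.id", "rogue-id")
            .build();

    KaldbConfigs.PreprocessorConfig preprocessorConfig =
        KaldbConfigs.PreprocessorConfig.newBuilder()
            .setKafkaConfig(kafkaConfig)
            .setUseBulkApi(true)
            .build();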