Add support for additional kafka props on bulk ingest
bryanlb committed Dec 21, 2023
1 parent e6b9e72 commit 3111f30
Showing 13 changed files with 172 additions and 91 deletions.
5 changes: 4 additions & 1 deletion config/config.yaml
@@ -125,6 +125,10 @@ preprocessorConfig:
numStreamThreads: ${KAFKA_STREAM_THREADS:-2}
processingGuarantee: ${KAFKA_STREAM_PROCESSING_GUARANTEE:-at_least_once}
additionalProps: ${KAFKA_ADDITIONAL_PROPS:-}
kafkaConfig:
kafkaTopic: ${KAFKA_TOPIC:-test-topic}
kafkaBootStrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}
additionalProps: ${KAFKA_ADDITIONAL_PROPS:-}

serverConfig:
serverPort: ${KALDB_PREPROCESSOR_SERVER_PORT:-8086}
@@ -136,5 +140,4 @@ preprocessorConfig:
dataTransformer: ${PREPROCESSOR_TRANSFORMER:-json}
rateLimiterMaxBurstSeconds: ${PREPROCESSOR_RATE_LIMITER_MAX_BURST_SECONDS:-1}
kafkaPartitionStickyTimeoutMs: ${KAFKA_PARTITION_STICKY_TIMEOUT_MS:-0}
bootstrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}
useBulkApi: ${KALDB_PREPROCESSOR_USE_BULK_API:-false}
@@ -11,6 +11,7 @@
import com.slack.kaldb.preprocessor.PreprocessorService;
import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.kaldb.util.RuntimeHalterImpl;
import com.slack.kaldb.writer.KafkaUtils;
import com.slack.service.murron.trace.Trace;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
@@ -21,12 +22,14 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
@@ -41,7 +44,7 @@ public class BulkIngestKafkaProducer extends AbstractExecutionThreadService {
private final KafkaProducer<String, byte[]> kafkaProducer;
private final KafkaClientMetrics kafkaMetrics;

private final KaldbConfigs.PreprocessorConfig preprocessorConfig;
private final KaldbConfigs.KafkaConfig kafkaConfig;

private final DatasetMetadataStore datasetMetadataStore;
private final KaldbMetadataStoreChangeListener<DatasetMetadata> datasetListener =
@@ -64,19 +67,28 @@ public class BulkIngestKafkaProducer extends AbstractExecutionThreadService {
public static final String BATCH_SIZE_GAUGE = "bulk_ingest_producer_batch_size";
private final AtomicInteger batchSizeGauge;

private static final Set<String> OVERRIDABLE_CONFIGS =
Set.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ProducerConfig.LINGER_MS_CONFIG,
ProducerConfig.BATCH_SIZE_CONFIG,
ProducerConfig.MAX_BLOCK_MS_CONFIG,
ProducerConfig.COMPRESSION_TYPE_CONFIG);

public BulkIngestKafkaProducer(
final DatasetMetadataStore datasetMetadataStore,
final KaldbConfigs.PreprocessorConfig preprocessorConfig,
final PrometheusMeterRegistry meterRegistry) {

this.kafkaConfig = preprocessorConfig.getKafkaConfig();

checkArgument(
!preprocessorConfig.getBootstrapServers().isEmpty(),
!kafkaConfig.getKafkaBootStrapServers().isEmpty(),
"Kafka bootstrapServers must be provided");

checkArgument(
!preprocessorConfig.getDownstreamTopic().isEmpty(),
"Kafka downstreamTopic must be provided");
checkArgument(!kafkaConfig.getKafkaTopic().isEmpty(), "Kafka topic must be provided");

this.preprocessorConfig = preprocessorConfig;
this.datasetMetadataStore = datasetMetadataStore;

// todo - consider making these a configurable value, or determine a way to derive a reasonable
@@ -239,8 +251,7 @@ private BulkIngestResponse produceDocuments(Map<String, List<Trace.Span>> indexD
// we will limit producing documents 1 thread at a time
for (Trace.Span doc : indexDoc.getValue()) {
ProducerRecord<String, byte[]> producerRecord =
new ProducerRecord<>(
preprocessorConfig.getDownstreamTopic(), partition, index, doc.toByteArray());
new ProducerRecord<>(kafkaConfig.getKafkaTopic(), partition, index, doc.toByteArray());

// we intentionally suppress FutureReturnValueIgnored here in errorprone - this is because
// we wrap this in a transaction, which is responsible for flushing all of the pending
@@ -254,14 +265,29 @@ private BulkIngestResponse produceDocuments(Map<String, List<Trace.Span>> indexD

private KafkaProducer<String, byte[]> createKafkaTransactionProducer(String transactionId) {
Properties props = new Properties();
props.put("bootstrap.servers", preprocessorConfig.getBootstrapServers());
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("transactional.id", transactionId);
props.put("linger.ms", 500);
props.put("batch.size", 128000);
props.put("max.block.ms", 10000);
props.put("compression.type", "snappy");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getKafkaBootStrapServers());
props.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
props.put(ProducerConfig.LINGER_MS_CONFIG, 500);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 128000);
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

// don't override the properties that we have already set explicitly using named properties
for (Map.Entry<String, String> additionalProp :
kafkaConfig.getAdditionalPropsMap().entrySet()) {
props =
KafkaUtils.maybeOverrideProps(
props,
additionalProp.getKey(),
additionalProp.getValue(),
OVERRIDABLE_CONFIGS.contains(additionalProp.getKey()));
}
return new KafkaProducer<>(props);
}
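As a rough sketch of how the new additionalProps map interacts with OVERRIDABLE_CONFIGS (the values below are invented for illustration, not part of this commit): a key in the overridable set replaces the hard-coded default set above, any other key that collides with an explicitly set property only triggers a warning, and a key that was not set at all is simply added.

// Hypothetical config, only to show which additional props take effect.
KaldbConfigs.KafkaConfig kafkaConfig =
    KaldbConfigs.KafkaConfig.newBuilder()
        .setKafkaBootStrapServers("localhost:9092")
        .setKafkaTopic("test-topic")
        // linger.ms is in OVERRIDABLE_CONFIGS, so this replaces the default of 500
        .putAdditionalProps(ProducerConfig.LINGER_MS_CONFIG, "1000")
        // transactional.id is not overridable: a warning is logged and the generated id is kept
        .putAdditionalProps(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-id")
        // not set explicitly above, so it is added as-is
        .putAdditionalProps("security.protocol", "SSL")
        .build();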

@@ -2,14 +2,14 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION;
import static com.slack.kaldb.writer.kafka.KaldbKafkaConsumer.maybeOverride;

import com.google.common.util.concurrent.AbstractService;
import com.slack.kaldb.metadata.core.KaldbMetadataStoreChangeListener;
import com.slack.kaldb.metadata.dataset.DatasetMetadata;
import com.slack.kaldb.metadata.dataset.DatasetMetadataStore;
import com.slack.kaldb.metadata.dataset.DatasetPartitionMetadata;
import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.kaldb.writer.KafkaUtils;
import com.slack.service.murron.trace.Trace;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
@@ -313,7 +313,9 @@ protected static Properties makeKafkaStreamsProps(
// don't override any property we already set
for (Map.Entry<String, String> additionalProp :
kafkaStreamConfig.getAdditionalPropsMap().entrySet()) {
maybeOverride(props, additionalProp.getKey(), additionalProp.getValue(), false);
props =
KafkaUtils.maybeOverrideProps(
props, additionalProp.getKey(), additionalProp.getValue(), false);
}

return props;
35 changes: 35 additions & 0 deletions kaldb/src/main/java/com/slack/kaldb/writer/KafkaUtils.java
@@ -0,0 +1,35 @@
package com.slack.kaldb.writer;

import com.google.common.annotations.VisibleForTesting;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Shared kafka functions for producers, consumers, and stream applications */
public class KafkaUtils {
private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);

@VisibleForTesting
public static Properties maybeOverrideProps(
Properties inputProps, String key, String value, boolean override) {
Properties changedProps = (Properties) inputProps.clone();
String userValue = changedProps.getProperty(key);
if (userValue != null) {
if (override) {
LOG.warn(
String.format(
"Property %s is provided but will be overridden from %s to %s",
key, userValue, value));
changedProps.setProperty(key, value);
} else {
LOG.warn(
String.format(
"Property %s is provided but won't be overridden from %s to %s",
key, userValue, value));
}
} else {
changedProps.setProperty(key, value);
}
return changedProps;
}
}
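A quick illustration of the helper's contract (values invented): an existing key is replaced only when override is true, a missing key is always added, and since the method clones the input Properties the caller must use the returned object rather than the argument it passed in.

Properties base = new Properties();
base.setProperty("linger.ms", "500");

// key present, override = false -> warning logged, value stays "500"
Properties kept = KafkaUtils.maybeOverrideProps(base, "linger.ms", "1000", false);

// key present, override = true -> warning logged, value becomes "1000"
Properties replaced = KafkaUtils.maybeOverrideProps(base, "linger.ms", "1000", true);

// key absent -> added regardless of the override flag
Properties added = KafkaUtils.maybeOverrideProps(base, "security.protocol", "SSL", false);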
@@ -9,6 +9,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.kaldb.server.KaldbConfig;
import com.slack.kaldb.writer.KafkaUtils;
import com.slack.kaldb.writer.LogMessageWriterImpl;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
@@ -82,40 +83,16 @@ public static Properties makeKafkaConsumerProps(KaldbConfigs.KafkaConfig kafkaCo
// don't override the properties that we have already set explicitly using named properties
for (Map.Entry<String, String> additionalProp :
kafkaConfig.getAdditionalPropsMap().entrySet()) {
maybeOverride(
props,
additionalProp.getKey(),
additionalProp.getValue(),
OVERRIDABLE_CONFIGS.contains(additionalProp.getKey()));
props =
KafkaUtils.maybeOverrideProps(
props,
additionalProp.getKey(),
additionalProp.getValue(),
OVERRIDABLE_CONFIGS.contains(additionalProp.getKey()));
}
return props;
}

@VisibleForTesting
public static boolean maybeOverride(
Properties props, String key, String value, boolean override) {
boolean overridden = false;
String userValue = props.getProperty(key);
if (userValue != null) {
if (override) {
LOG.warn(
String.format(
"Property %s is provided but will be overridden from %s to %s",
key, userValue, value));
props.setProperty(key, value);
overridden = true;
} else {
LOG.warn(
String.format(
"Property %s is provided but won't be overridden from %s to %s",
key, userValue, value));
}
} else {
props.setProperty(key, value);
}
return overridden;
}

private KafkaConsumer<String, byte[]> kafkaConsumer;
private final TopicPartition topicPartition;

4 changes: 2 additions & 2 deletions kaldb/src/main/proto/kaldb_configs.proto
@@ -266,8 +266,8 @@ message PreprocessorConfig {
// more docs on PreprocessorPartitioner#getDatasetPartitionSuppliers
int32 kafka_partition_sticky_timeout_ms = 8;

// this value needs to be set if the bulk API is used to bootstrap the producer kafka
// Kafka config needs to be set if the bulk API is used to bootstrap the producer kafka
// we plan on moving everything to the bulk API and removing KafkaStreamConfig in the future
string bootstrap_servers = 9;
KafkaConfig kafka_config = 9;
bool use_bulk_api = 10;
}
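With the flat bootstrap_servers field replaced by the nested kafka_config message, the bulk-ingest call sites move from the old flat getters to the nested ones, roughly as follows (a sketch based only on the getters visible in this diff):

// before
String servers = preprocessorConfig.getBootstrapServers();
String topic = preprocessorConfig.getDownstreamTopic();

// after
KaldbConfigs.KafkaConfig kafkaConfig = preprocessorConfig.getKafkaConfig();
String servers = kafkaConfig.getKafkaBootStrapServers();
String topic = kafkaConfig.getKafkaTopic();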
@@ -73,14 +73,18 @@ public void bootstrapCluster() throws Exception {
.setServerPort(8080)
.setServerAddress("localhost")
.build();
KaldbConfigs.KafkaConfig kafkaConfig =
KaldbConfigs.KafkaConfig.newBuilder()
.setKafkaBootStrapServers(kafkaServer.getBroker().getBrokerList().get())
.setKafkaTopic(DOWNSTREAM_TOPIC)
.build();
preprocessorConfig =
KaldbConfigs.PreprocessorConfig.newBuilder()
.setBootstrapServers(kafkaServer.getBroker().getBrokerList().get())
.setKafkaConfig(kafkaConfig)
.setUseBulkApi(true)
.setServerConfig(serverConfig)
.setPreprocessorInstanceCount(1)
.setRateLimiterMaxBurstSeconds(1)
.setDownstreamTopic(DOWNSTREAM_TOPIC)
.build();

datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true);
@@ -86,14 +86,18 @@ public void bootstrapCluster() throws Exception {
.setServerPort(8080)
.setServerAddress("localhost")
.build();
KaldbConfigs.KafkaConfig kafkaConfig =
KaldbConfigs.KafkaConfig.newBuilder()
.setKafkaBootStrapServers(kafkaServer.getBroker().getBrokerList().get())
.setKafkaTopic(DOWNSTREAM_TOPIC)
.build();
preprocessorConfig =
KaldbConfigs.PreprocessorConfig.newBuilder()
.setBootstrapServers(kafkaServer.getBroker().getBrokerList().get())
.setKafkaConfig(kafkaConfig)
.setUseBulkApi(true)
.setServerConfig(serverConfig)
.setPreprocessorInstanceCount(1)
.setRateLimiterMaxBurstSeconds(1)
.setDownstreamTopic(DOWNSTREAM_TOPIC)
.build();

datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true);
12 changes: 10 additions & 2 deletions kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java
@@ -295,7 +295,11 @@ public void testParseKaldbJsonConfigFile() throws IOException {
assertThat(preprocessorConfig.getDataTransformer()).isEqualTo("api_log");
assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isEqualTo(2);
assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(false);
assertThat(preprocessorConfig.getBootstrapServers()).isEqualTo("localhost:9092");

final KaldbConfigs.KafkaConfig preprocessorKafkaConfig =
config.getPreprocessorConfig().getKafkaConfig();
assertThat(preprocessorKafkaConfig.getKafkaBootStrapServers()).isEqualTo("localhost:9092");
assertThat(preprocessorKafkaConfig.getKafkaTopic()).isEqualTo("test-topic");

final KaldbConfigs.ServerConfig preprocessorServerConfig = preprocessorConfig.getServerConfig();
assertThat(preprocessorServerConfig.getServerPort()).isEqualTo(8085);
@@ -466,8 +470,12 @@ public void testParseKaldbYamlConfigFile() throws IOException {
assertThat(preprocessorConfig.getDataTransformer()).isEqualTo("api_log");
assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isEqualTo(2);

final KaldbConfigs.KafkaConfig preprocessorKafkaConfig =
config.getPreprocessorConfig().getKafkaConfig();
assertThat(preprocessorKafkaConfig.getKafkaBootStrapServers()).isEqualTo("localhost:9092");
assertThat(preprocessorKafkaConfig.getKafkaTopic()).isEqualTo("test-topic");

assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(true);
assertThat(preprocessorConfig.getBootstrapServers()).isEqualTo("localhost:9092");

final KaldbConfigs.ServerConfig preprocessorServerConfig = preprocessorConfig.getServerConfig();
assertThat(preprocessorServerConfig.getServerPort()).isEqualTo(8085);
48 changes: 48 additions & 0 deletions kaldb/src/test/java/com/slack/kaldb/writer/KafkaUtilsTest.java
@@ -0,0 +1,48 @@
package com.slack.kaldb.writer;

import static org.assertj.core.api.Assertions.assertThat;

import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.kaldb.testlib.TestKafkaServer;
import com.slack.kaldb.writer.kafka.KaldbKafkaConsumer;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.jupiter.api.Test;

class KafkaUtilsTest {
public static final String TEST_KAFKA_CLIENT_GROUP = "test_kaldb_consumer";

@Test
public void testOverridingProperties() {
KaldbConfigs.KafkaConfig kafkaConfig =
KaldbConfigs.KafkaConfig.newBuilder()
.setKafkaTopic(TestKafkaServer.TEST_KAFKA_TOPIC)
.setKafkaTopicPartition("0")
.setKafkaBootStrapServers("bootstrap_server")
.setKafkaClientGroup(TEST_KAFKA_CLIENT_GROUP)
.setEnableKafkaAutoCommit("true")
.setKafkaAutoCommitInterval("5000")
.setKafkaSessionTimeout("5000")
.build();

Properties properties = KaldbKafkaConsumer.makeKafkaConsumerProps(kafkaConfig);
assertThat(properties.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
.isEqualTo("org.apache.kafka.common.serialization.StringDeserializer");

kafkaConfig =
KaldbConfigs.KafkaConfig.newBuilder()
.setKafkaTopic(TestKafkaServer.TEST_KAFKA_TOPIC)
.setKafkaTopicPartition("0")
.setKafkaBootStrapServers("bootstrap_server")
.setKafkaClientGroup(TEST_KAFKA_CLIENT_GROUP)
.setEnableKafkaAutoCommit("true")
.setKafkaAutoCommitInterval("5000")
.setKafkaSessionTimeout("5000")
.putAdditionalProps(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "test_serializer")
.build();

properties = KaldbKafkaConsumer.makeKafkaConsumerProps(kafkaConfig);
assertThat(properties.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
.isEqualTo("test_serializer");
}
}