diff --git a/smallrye-reactive-messaging-kafka/revapi.json b/smallrye-reactive-messaging-kafka/revapi.json index d1a26bd769..32ecd19fc2 100644 --- a/smallrye-reactive-messaging-kafka/revapi.json +++ b/smallrye-reactive-messaging-kafka/revapi.json @@ -24,7 +24,17 @@ "criticality" : "highlight", "minSeverity" : "POTENTIALLY_BREAKING", "minCriticality" : "documented", - "differences" : [ ] + "differences" : [ + { + "ignore": true, + "code": "java.annotation.attributeValueChanged", + "old": "class io.smallrye.reactive.messaging.kafka.KafkaConnector", + "new": "class io.smallrye.reactive.messaging.kafka.KafkaConnector", + "annotationType": "io.smallrye.reactive.messaging.annotations.ConnectorAttributes", + "attribute": "value", + "justification": "Added client-id-prefix to connector attributes" + } + ] } }, { "extension" : "revapi.reporter.json", @@ -43,4 +53,4 @@ "minCriticality" : "documented", "output" : "out" } -} ] \ No newline at end of file +} ] diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java index 5f919b242a..39637b0bc9 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java @@ -57,6 +57,7 @@ @ConnectorAttribute(name = "tracing-enabled", type = "boolean", direction = Direction.INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", defaultValue = "true") @ConnectorAttribute(name = "cloud-events", type = "boolean", direction = Direction.INCOMING_AND_OUTGOING, description = "Enables (default) or disables the Cloud Event support. If enabled on an _incoming_ channel, the connector analyzes the incoming records and try to create Cloud Event metadata. If enabled on an _outgoing_, the connector sends the outgoing messages as Cloud Event if the message includes Cloud Event Metadata.", defaultValue = "true") @ConnectorAttribute(name = "kafka-configuration", type = "string", direction = Direction.INCOMING_AND_OUTGOING, description = "Identifier of a CDI bean that provides the default Kafka consumer/producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier.") +@ConnectorAttribute(name = "client-id-prefix", type = "string", direction = Direction.INCOMING_AND_OUTGOING, description = "Prefix for Kafka client `client.id` attribute. If defined configured or generated `client.id` will be prefixed with the given value.") @ConnectorAttribute(name = "topics", type = "string", direction = Direction.INCOMING, description = "A comma-separating list of topics to be consumed. Cannot be used with the `topic` or `pattern` properties") @ConnectorAttribute(name = "pattern", type = "boolean", direction = Direction.INCOMING, description = "Indicate that the `topic` property is a regular expression. Must be used with the `topic` property. Cannot be used with the `topics` property", defaultValue = "false") diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaConsumer.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaConsumer.java index f22e8a6431..34f64d83f4 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaConsumer.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaConsumer.java @@ -257,7 +257,7 @@ public Uni resetToLastCommittedPositions() { }); } - private Map getKafkaConsumerConfiguration(KafkaConnectorIncomingConfiguration configuration, + private static Map getKafkaConsumerConfiguration(KafkaConnectorIncomingConfiguration configuration, String consumerGroup, int index) { Map map = new HashMap<>(); JsonHelper.asJsonObject(configuration.config()) @@ -286,28 +286,22 @@ private Map getKafkaConsumerConfiguration(KafkaConnectorIncoming } // Consumer id generation: - // 1. If we don't have an index and no client id set in the config, add one - // 2. If we don't have an index and a client id set in the config, use it - // 3. If we have an index and no client id set in the config, add one suffixed with the index - // 4. If we have an index and a client id set in the config, suffix the index - - if (index == -1) { - map.computeIfAbsent(ConsumerConfig.CLIENT_ID_CONFIG, k -> { + // 1. If no client id set in the config, set it to channel name, the prefix default value is "kafka-consumer-", + // 1. If a client id set in the config, prefix with the default value "", + // In any case if consumer index is -1, suffix is "", otherwise, suffix the index. + + String suffix = index == -1 ? ("") : ("-" + index); + map.compute(ConsumerConfig.CLIENT_ID_CONFIG, (k, configured) -> { + if (configured == null) { + String prefix = configuration.getClientIdPrefix().orElse("kafka-consumer-"); // Case 1 - return "kafka-consumer-" + configuration.getChannel(); - }); - // Case 2 - nothing to do - } else { - String configuredClientId = (String) map.get(ConsumerConfig.CLIENT_ID_CONFIG); - if (configuredClientId == null) { - // Case 3 - configuredClientId = "kafka-consumer-" + configuration.getChannel() + "-" + index; + return prefix + configuration.getChannel() + suffix; } else { - // Case 4 - configuredClientId = configuredClientId + "-" + index; + String prefix = configuration.getClientIdPrefix().orElse(""); + // Case 2 + return prefix + configured + suffix; } - map.put(ConsumerConfig.CLIENT_ID_CONFIG, configuredClientId); - } + }); ConfigurationCleaner.cleanupConsumerConfiguration(map); diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaProducer.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaProducer.java index 06c7af2a41..276d912400 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaProducer.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaProducer.java @@ -226,7 +226,17 @@ private static Map getKafkaProducerConfiguration(KafkaConnectorO map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, configuration.getKeySerializer()); } - map.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "kafka-producer-" + configuration.getChannel()); + // Producer id generation: + // 1. If no client id set in the config, the default prefix is "kafka-producer-" + // 2. If a client id set in the config, the default prefix is "" + + map.compute(ProducerConfig.CLIENT_ID_CONFIG, (k, configured) -> { + if (configured == null) { + return configuration.getClientIdPrefix().orElse("kafka-producer-") + configuration.getChannel(); + } else { + return configuration.getClientIdPrefix().orElse("") + configured; + } + }); if (!map.containsKey(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG)) { // If no backoff is set, use 10s, it avoids high load on disconnection. diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java index 2fafb66011..d1eb000bbc 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java @@ -11,8 +11,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -30,20 +28,6 @@ // TODO should not extend ClientTestBase, it uses KafkaBrokerExtension which creates a broker for tests public class BrokerRestartTest extends ClientTestBase { - @BeforeEach - public void init() { - String newTopic = "test-" + UUID.randomUUID(); - companion.topics().createAndWait(newTopic, partitions); - this.topic = newTopic; - resetMessages(); - } - - @AfterEach - public void tearDown() { - cancelSubscriptions(); - source.closeQuietly(); - } - @Test public void testAcknowledgementUsingThrottledStrategyEvenAfterBrokerRestart() throws Exception { try (StrimziKafkaContainer kafka = KafkaBrokerExtension.createKafkaContainer()) { diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ClientTestBase.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ClientTestBase.java index 6f5ce3ecd2..d10d41a541 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ClientTestBase.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ClientTestBase.java @@ -10,7 +10,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; @@ -31,7 +33,9 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.subscription.Cancellable; @@ -45,6 +49,7 @@ import io.smallrye.reactive.messaging.kafka.base.SingletonInstance; import io.smallrye.reactive.messaging.kafka.base.UnsatisfiedInstance; import io.smallrye.reactive.messaging.kafka.impl.ConfigurationCleaner; +import io.smallrye.reactive.messaging.kafka.impl.KafkaSink; import io.smallrye.reactive.messaging.kafka.impl.KafkaSource; import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; @@ -66,6 +71,32 @@ public class ClientTestBase extends KafkaCompanionTestBase { final List subscriptions = new ArrayList<>(); KafkaSource source; + protected Queue sinks; + + protected Queue> sources; + + @AfterEach + public void tearDown() { + cancelSubscriptions(); + for (KafkaSource source : sources) { + source.closeQuietly(); + } + for (KafkaSink sink : sinks) { + sink.closeQuietly(); + } + } + + @BeforeEach + public void init() { + sources = new ConcurrentLinkedQueue<>(); + sinks = new ConcurrentLinkedQueue<>(); + + String newTopic = "test-" + UUID.randomUUID(); + companion.topics().createAndWait(newTopic, partitions); + this.topic = newTopic; + resetMessages(); + } + private long committedCount(ReactiveKafkaConsumer client) { TopicPartition[] tps = IntStream.range(0, partitions) .mapToObj(i -> new TopicPartition(topic, i)).distinct().toArray(TopicPartition[]::new); @@ -102,6 +133,7 @@ public KafkaSource createSource(MapBasedConfig config, String g source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, listeners, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 0); + sources.add(source); return source; } @@ -116,6 +148,7 @@ public KafkaSource createSourceSeekToBeginning() { source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, listeners, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 0); + sources.add(source); return source; } @@ -130,6 +163,7 @@ public KafkaSource createSourceSeekToEnd() { source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, listeners, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 0); + sources.add(source); return source; } @@ -144,6 +178,15 @@ public KafkaSource createSourceSeekToOffset() { source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, listeners, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 0); + sources.add(source); + return source; + } + + public KafkaSource createSource(MapBasedConfig config, int index) { + source = new KafkaSource<>(vertx, "groupId", new KafkaConnectorIncomingConfiguration(config), + commitHandlerFactories, failureHandlerFactories, + UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), index); + sources.add(source); return source; } diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/PauseResumeBatchTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/PauseResumeBatchTest.java index 4a8a0f1a3b..cd0e79e0cf 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/PauseResumeBatchTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/PauseResumeBatchTest.java @@ -60,10 +60,10 @@ private KafkaMapBasedConfig commonConfig() { @Test public void testWithAutoCommitMultiplePartitions() { MyConsumerUsingNoAck application = runApplication(commonConfig() - .put("enable.auto.commit", true) - .put("max.poll.records", 100) - .put("requests", partitions) - .put("auto.commit.interval.ms", 200), + .put("enable.auto.commit", true) + .put("max.poll.records", 100) + .put("requests", partitions) + .put("auto.commit.interval.ms", 200), MyConsumerUsingNoAck.class); long start = System.currentTimeMillis(); await() diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaBatchConsumerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaBatchConsumerTest.java index ccfbc949c5..a64e29a404 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaBatchConsumerTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaBatchConsumerTest.java @@ -3,8 +3,6 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import io.smallrye.mutiny.Multi; @@ -17,20 +15,6 @@ public class ReactiveKafkaBatchConsumerTest extends ClientTestBase { - @AfterEach - public void tearDown() { - cancelSubscriptions(); - source.closeQuietly(); - } - - @BeforeEach - public void init() { - String newTopic = "test-" + UUID.randomUUID(); - companion.topics().createAndWait(newTopic, partitions); - this.topic = newTopic; - resetMessages(); - } - @Test public void testReception() throws Exception { Multi> stream = createSource().getBatchStream(); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaConsumerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaConsumerTest.java index 5e0f35e3d2..5a6016c31c 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaConsumerTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaConsumerTest.java @@ -14,6 +14,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.jupiter.api.*; import io.smallrye.mutiny.Multi; @@ -25,6 +26,7 @@ import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration; import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener; import io.smallrye.reactive.messaging.kafka.TestTags; +import io.smallrye.reactive.messaging.kafka.base.KafkaMapBasedConfig; import io.smallrye.reactive.messaging.kafka.base.SingletonInstance; import io.smallrye.reactive.messaging.kafka.base.UnsatisfiedInstance; import io.smallrye.reactive.messaging.kafka.impl.KafkaSource; @@ -32,18 +34,35 @@ public class ReactiveKafkaConsumerTest extends ClientTestBase { - @AfterEach - public void tearDown() { - cancelSubscriptions(); - source.closeQuietly(); - } - - @BeforeEach - public void init() { - String newTopic = "test-" + UUID.randomUUID(); - companion.topics().createAndWait(newTopic, partitions); - this.topic = newTopic; - resetMessages(); + @Test + void testConfigClientIdPrefix() { + KafkaMapBasedConfig config = kafkaConfig() + .put("value.deserializer", StringDeserializer.class.getName()) + .put("channel-name", "test"); + source = createSource(config, -1); + String clientId = source.getConsumer().get(ConsumerConfig.CLIENT_ID_CONFIG); + assertThat(clientId).isEqualTo("kafka-consumer-test"); + + source = createSource(config, 1); + clientId = source.getConsumer().get(ConsumerConfig.CLIENT_ID_CONFIG); + assertThat(clientId).isEqualTo("kafka-consumer-test-1"); + + config.put("client.id", "custom-client-id"); + source = createSource(config, 2); + clientId = source.getConsumer().get(ConsumerConfig.CLIENT_ID_CONFIG); + assertThat(clientId).isEqualTo("custom-client-id-2"); + + config.put("client-id-prefix", "my-client-"); + config.remove("client.id"); + source = createSource(config, -1); + clientId = source.getConsumer().get(ConsumerConfig.CLIENT_ID_CONFIG); + assertThat(clientId).isEqualTo("my-client-test"); + + config.put("client.id", "custom-client-id"); + config.put("client-id-prefix", "my-client-"); + source = createSource(config, 2); + clientId = source.getConsumer().get(ConsumerConfig.CLIENT_ID_CONFIG); + assertThat(clientId).isEqualTo("my-client-custom-client-id-2"); } @Test diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java index baf414cf6c..c1532010aa 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java @@ -6,15 +6,12 @@ import java.util.ArrayList; import java.util.List; import java.util.Queue; -import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ThreadLocalRandom; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; import org.eclipse.microprofile.reactive.messaging.Message; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.reactivestreams.Subscriber; @@ -29,22 +26,31 @@ import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; public class ReactiveKafkaProducerTest extends ClientTestBase { - private Queue sinks; - - @BeforeEach - public void init() { - sinks = new ConcurrentLinkedQueue<>(); - String newTopic = "test-" + UUID.randomUUID(); - companion.topics().createAndWait(newTopic, partitions); - this.topic = newTopic; - resetMessages(); - } - @AfterEach - public void tearDown() { - for (KafkaSink sink : sinks) { - sink.closeQuietly(); - } + @Test + void testConfigClientIdPrefix() { + MapBasedConfig producerConfig = createProducerConfig() + .put("channel-name", "test"); + KafkaSink sink = createSink(producerConfig); + String clientId = (String) sink.getProducer().configuration().get(ProducerConfig.CLIENT_ID_CONFIG); + assertThat(clientId).isEqualTo("kafka-producer-test"); + + producerConfig.put("client.id", "my-producer"); + sink = createSink(producerConfig); + clientId = (String) sink.getProducer().configuration().get(ProducerConfig.CLIENT_ID_CONFIG); + assertThat(clientId).isEqualTo("my-producer"); + + producerConfig.put("client-id-prefix", "my-custom-"); + producerConfig.remove("client.id"); + sink = createSink(producerConfig); + clientId = (String) sink.getProducer().configuration().get(ProducerConfig.CLIENT_ID_CONFIG); + assertThat(clientId).isEqualTo("my-custom-test"); + + producerConfig.put("client.id", "my-producer"); + producerConfig.put("client-id-prefix", "my-custom-"); + sink = createSink(producerConfig); + clientId = (String) sink.getProducer().configuration().get(ProducerConfig.CLIENT_ID_CONFIG); + assertThat(clientId).isEqualTo("my-custom-my-producer"); } @Test @@ -164,6 +170,13 @@ public KafkaSink createSink() { return sink; } + public KafkaSink createSink(MapBasedConfig config) { + KafkaSink sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), + CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + this.sinks.add(sink); + return sink; + } + public KafkaSink createTransactionalSink() { String channelName = "test-" + ThreadLocalRandom.current().nextInt(); MapBasedConfig config = createProducerConfig()