Skip to content

Commit

Permalink
Merge aa27211 into 7cfe427
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp authored Sep 21, 2022
2 parents 7cfe427 + aa27211 commit 661d39f
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 89 deletions.
14 changes: 12 additions & 2 deletions smallrye-reactive-messaging-kafka/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,17 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [
{
"ignore": true,
"code": "java.annotation.attributeValueChanged",
"old": "class io.smallrye.reactive.messaging.kafka.KafkaConnector",
"new": "class io.smallrye.reactive.messaging.kafka.KafkaConnector",
"annotationType": "io.smallrye.reactive.messaging.annotations.ConnectorAttributes",
"attribute": "value",
"justification": "Added client-id-prefix to connector attributes"
}
]
}
}, {
"extension" : "revapi.reporter.json",
Expand All @@ -43,4 +53,4 @@
"minCriticality" : "documented",
"output" : "out"
}
} ]
} ]
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
@ConnectorAttribute(name = "tracing-enabled", type = "boolean", direction = Direction.INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", defaultValue = "true")
@ConnectorAttribute(name = "cloud-events", type = "boolean", direction = Direction.INCOMING_AND_OUTGOING, description = "Enables (default) or disables the Cloud Event support. If enabled on an _incoming_ channel, the connector analyzes the incoming records and try to create Cloud Event metadata. If enabled on an _outgoing_, the connector sends the outgoing messages as Cloud Event if the message includes Cloud Event Metadata.", defaultValue = "true")
@ConnectorAttribute(name = "kafka-configuration", type = "string", direction = Direction.INCOMING_AND_OUTGOING, description = "Identifier of a CDI bean that provides the default Kafka consumer/producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier.")
@ConnectorAttribute(name = "client-id-prefix", type = "string", direction = Direction.INCOMING_AND_OUTGOING, description = "Prefix for Kafka client `client.id` attribute. If defined configured or generated `client.id` will be prefixed with the given value.")

@ConnectorAttribute(name = "topics", type = "string", direction = Direction.INCOMING, description = "A comma-separating list of topics to be consumed. Cannot be used with the `topic` or `pattern` properties")
@ConnectorAttribute(name = "pattern", type = "boolean", direction = Direction.INCOMING, description = "Indicate that the `topic` property is a regular expression. Must be used with the `topic` property. Cannot be used with the `topics` property", defaultValue = "false")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public Uni<Void> resetToLastCommittedPositions() {
});
}

private Map<String, Object> getKafkaConsumerConfiguration(KafkaConnectorIncomingConfiguration configuration,
private static Map<String, Object> getKafkaConsumerConfiguration(KafkaConnectorIncomingConfiguration configuration,
String consumerGroup, int index) {
Map<String, Object> map = new HashMap<>();
JsonHelper.asJsonObject(configuration.config())
Expand Down Expand Up @@ -286,28 +286,22 @@ private Map<String, Object> getKafkaConsumerConfiguration(KafkaConnectorIncoming
}

// Consumer id generation:
// 1. If we don't have an index and no client id set in the config, add one
// 2. If we don't have an index and a client id set in the config, use it
// 3. If we have an index and no client id set in the config, add one suffixed with the index
// 4. If we have an index and a client id set in the config, suffix the index

if (index == -1) {
map.computeIfAbsent(ConsumerConfig.CLIENT_ID_CONFIG, k -> {
// 1. If no client id set in the config, set it to channel name, the prefix default value is "kafka-consumer-",
// 1. If a client id set in the config, prefix with the default value "",
// In any case if consumer index is -1, suffix is "", otherwise, suffix the index.

String suffix = index == -1 ? ("") : ("-" + index);
map.compute(ConsumerConfig.CLIENT_ID_CONFIG, (k, configured) -> {
if (configured == null) {
String prefix = configuration.getClientIdPrefix().orElse("kafka-consumer-");
// Case 1
return "kafka-consumer-" + configuration.getChannel();
});
// Case 2 - nothing to do
} else {
String configuredClientId = (String) map.get(ConsumerConfig.CLIENT_ID_CONFIG);
if (configuredClientId == null) {
// Case 3
configuredClientId = "kafka-consumer-" + configuration.getChannel() + "-" + index;
return prefix + configuration.getChannel() + suffix;
} else {
// Case 4
configuredClientId = configuredClientId + "-" + index;
String prefix = configuration.getClientIdPrefix().orElse("");
// Case 2
return prefix + configured + suffix;
}
map.put(ConsumerConfig.CLIENT_ID_CONFIG, configuredClientId);
}
});

ConfigurationCleaner.cleanupConsumerConfiguration(map);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,17 @@ private static Map<String, Object> getKafkaProducerConfiguration(KafkaConnectorO
map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, configuration.getKeySerializer());
}

map.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "kafka-producer-" + configuration.getChannel());
// Producer id generation:
// 1. If no client id set in the config, the default prefix is "kafka-producer-"
// 2. If a client id set in the config, the default prefix is ""

map.compute(ProducerConfig.CLIENT_ID_CONFIG, (k, configured) -> {
if (configured == null) {
return configuration.getClientIdPrefix().orElse("kafka-producer-") + configuration.getChannel();
} else {
return configuration.getClientIdPrefix().orElse("") + configured;
}
});

if (!map.containsKey(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG)) {
// If no backoff is set, use 10s, it avoids high load on disconnection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

Expand All @@ -30,20 +28,6 @@
// TODO should not extend ClientTestBase, it uses KafkaBrokerExtension which creates a broker for tests
public class BrokerRestartTest extends ClientTestBase {

@BeforeEach
public void init() {
String newTopic = "test-" + UUID.randomUUID();
companion.topics().createAndWait(newTopic, partitions);
this.topic = newTopic;
resetMessages();
}

@AfterEach
public void tearDown() {
cancelSubscriptions();
source.closeQuietly();
}

@Test
public void testAcknowledgementUsingThrottledStrategyEvenAfterBrokerRestart() throws Exception {
try (StrimziKafkaContainer kafka = KafkaBrokerExtension.createKafkaContainer()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
Expand All @@ -31,7 +33,9 @@
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.Cancellable;
Expand All @@ -45,6 +49,7 @@
import io.smallrye.reactive.messaging.kafka.base.SingletonInstance;
import io.smallrye.reactive.messaging.kafka.base.UnsatisfiedInstance;
import io.smallrye.reactive.messaging.kafka.impl.ConfigurationCleaner;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSink;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
Expand All @@ -66,6 +71,32 @@ public class ClientTestBase extends KafkaCompanionTestBase {
final List<Cancellable> subscriptions = new ArrayList<>();
KafkaSource<Integer, String> source;

protected Queue<KafkaSink> sinks;

protected Queue<KafkaSource<Integer, String>> sources;

@AfterEach
public void tearDown() {
cancelSubscriptions();
for (KafkaSource<Integer, String> source : sources) {
source.closeQuietly();
}
for (KafkaSink sink : sinks) {
sink.closeQuietly();
}
}

@BeforeEach
public void init() {
sources = new ConcurrentLinkedQueue<>();
sinks = new ConcurrentLinkedQueue<>();

String newTopic = "test-" + UUID.randomUUID();
companion.topics().createAndWait(newTopic, partitions);
this.topic = newTopic;
resetMessages();
}

private long committedCount(ReactiveKafkaConsumer<Integer, String> client) {
TopicPartition[] tps = IntStream.range(0, partitions)
.mapToObj(i -> new TopicPartition(topic, i)).distinct().toArray(TopicPartition[]::new);
Expand Down Expand Up @@ -102,6 +133,7 @@ public KafkaSource<Integer, String> createSource(MapBasedConfig config, String g
source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories,
failureHandlerFactories,
listeners, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 0);
sources.add(source);
return source;
}

Expand All @@ -116,6 +148,7 @@ public KafkaSource<Integer, String> createSourceSeekToBeginning() {
source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories,
failureHandlerFactories,
listeners, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 0);
sources.add(source);
return source;
}

Expand All @@ -130,6 +163,7 @@ public KafkaSource<Integer, String> createSourceSeekToEnd() {
source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config),
commitHandlerFactories, failureHandlerFactories,
listeners, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 0);
sources.add(source);
return source;
}

Expand All @@ -144,6 +178,15 @@ public KafkaSource<Integer, String> createSourceSeekToOffset() {
source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config),
commitHandlerFactories, failureHandlerFactories,
listeners, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 0);
sources.add(source);
return source;
}

public KafkaSource<Integer, String> createSource(MapBasedConfig config, int index) {
source = new KafkaSource<>(vertx, "groupId", new KafkaConnectorIncomingConfiguration(config),
commitHandlerFactories, failureHandlerFactories,
UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), index);
sources.add(source);
return source;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ private KafkaMapBasedConfig commonConfig() {
@Test
public void testWithAutoCommitMultiplePartitions() {
MyConsumerUsingNoAck application = runApplication(commonConfig()
.put("enable.auto.commit", true)
.put("max.poll.records", 100)
.put("requests", partitions)
.put("auto.commit.interval.ms", 200),
.put("enable.auto.commit", true)
.put("max.poll.records", 100)
.put("requests", partitions)
.put("auto.commit.interval.ms", 200),
MyConsumerUsingNoAck.class);
long start = System.currentTimeMillis();
await()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import java.util.UUID;
import java.util.concurrent.CountDownLatch;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.smallrye.mutiny.Multi;
Expand All @@ -17,20 +15,6 @@

public class ReactiveKafkaBatchConsumerTest extends ClientTestBase {

@AfterEach
public void tearDown() {
cancelSubscriptions();
source.closeQuietly();
}

@BeforeEach
public void init() {
String newTopic = "test-" + UUID.randomUUID();
companion.topics().createAndWait(newTopic, partitions);
this.topic = newTopic;
resetMessages();
}

@Test
public void testReception() throws Exception {
Multi<IncomingKafkaRecordBatch<Integer, String>> stream = createSource().getBatchStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.*;

import io.smallrye.mutiny.Multi;
Expand All @@ -25,25 +26,43 @@
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import io.smallrye.reactive.messaging.kafka.TestTags;
import io.smallrye.reactive.messaging.kafka.base.KafkaMapBasedConfig;
import io.smallrye.reactive.messaging.kafka.base.SingletonInstance;
import io.smallrye.reactive.messaging.kafka.base.UnsatisfiedInstance;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

public class ReactiveKafkaConsumerTest extends ClientTestBase {

@AfterEach
public void tearDown() {
cancelSubscriptions();
source.closeQuietly();
}

@BeforeEach
public void init() {
String newTopic = "test-" + UUID.randomUUID();
companion.topics().createAndWait(newTopic, partitions);
this.topic = newTopic;
resetMessages();
@Test
void testConfigClientIdPrefix() {
KafkaMapBasedConfig config = kafkaConfig()
.put("value.deserializer", StringDeserializer.class.getName())
.put("channel-name", "test");
source = createSource(config, -1);
String clientId = source.getConsumer().get(ConsumerConfig.CLIENT_ID_CONFIG);
assertThat(clientId).isEqualTo("kafka-consumer-test");

source = createSource(config, 1);
clientId = source.getConsumer().get(ConsumerConfig.CLIENT_ID_CONFIG);
assertThat(clientId).isEqualTo("kafka-consumer-test-1");

config.put("client.id", "custom-client-id");
source = createSource(config, 2);
clientId = source.getConsumer().get(ConsumerConfig.CLIENT_ID_CONFIG);
assertThat(clientId).isEqualTo("custom-client-id-2");

config.put("client-id-prefix", "my-client-");
config.remove("client.id");
source = createSource(config, -1);
clientId = source.getConsumer().get(ConsumerConfig.CLIENT_ID_CONFIG);
assertThat(clientId).isEqualTo("my-client-test");

config.put("client.id", "custom-client-id");
config.put("client-id-prefix", "my-client-");
source = createSource(config, 2);
clientId = source.getConsumer().get(ConsumerConfig.CLIENT_ID_CONFIG);
assertThat(clientId).isEqualTo("my-client-custom-client-id-2");
}

@Test
Expand Down
Loading

0 comments on commit 661d39f

Please sign in to comment.