diff --git a/pom.xml b/pom.xml
index 6a5b014707..a8f8edce35 100644
--- a/pom.xml
+++ b/pom.xml
@@ -106,7 +106,7 @@
1.1.2
1.7.33
5.6
- 0.100.0
+ 0.101.0
diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/ConsumerBuilder.java b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/ConsumerBuilder.java
index a26805d5b9..579cf2fb4b 100644
--- a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/ConsumerBuilder.java
+++ b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/ConsumerBuilder.java
@@ -207,7 +207,6 @@ public synchronized void close() {
Uni.createFrom().voidItem().invoke(() -> {
LOGGER.infof("Closing consumer %s", clientId());
if (kafkaConsumer != null) {
- polling.compareAndSet(true, false);
kafkaConsumer.close(kafkaApiTimeout);
kafkaConsumer = null;
}
@@ -215,6 +214,7 @@ public synchronized void close() {
consumerExecutor.shutdown();
consumerExecutor = null;
}
+ polling.compareAndSet(true, false);
}).runSubscriptionOn(getOrCreateExecutor()).subscribeAsCompletionStage();
}
}
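
Reviewer note: moving the polling reset below the consumer/executor cleanup means the flag is cleared even when close() runs after the consumer has already been released. A minimal standalone sketch of the resulting ordering (field names mirror the builder, but this is not the companion's actual class):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: shows why the flag reset sits outside the null checks.
public class CloseOrderingSketch {
    private final AtomicBoolean polling = new AtomicBoolean(true);
    private AutoCloseable kafkaConsumer = () -> System.out.println("consumer closed");
    private ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();

    public synchronized void close() throws Exception {
        if (kafkaConsumer != null) {
            kafkaConsumer.close();
            kafkaConsumer = null;
        }
        if (consumerExecutor != null) {
            consumerExecutor.shutdown();
            consumerExecutor = null;
        }
        // Reset the polling flag last, even if there was nothing left to close.
        polling.compareAndSet(true, false);
    }

    public static void main(String[] args) throws Exception {
        CloseOrderingSketch sketch = new CloseOrderingSketch();
        sketch.close();
        sketch.close(); // second call is a no-op apart from the (already false) flag
    }
}
```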
diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/KafkaCompanion.java b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/KafkaCompanion.java
index 39e04f4fcb..391fd0efb9 100644
--- a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/KafkaCompanion.java
+++ b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/KafkaCompanion.java
@@ -8,9 +8,8 @@
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import java.time.Duration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
@@ -43,6 +42,7 @@ public class KafkaCompanion implements AutoCloseable {
private final String bootstrapServers;
private final Duration kafkaApiTimeout;
private AdminClient adminClient;
+ private final List<Runnable> onClose = new CopyOnWriteArrayList<>();
public KafkaCompanion(String bootstrapServers) {
this(bootstrapServers, Duration.ofSeconds(10));
@@ -73,15 +73,17 @@ public synchronized AdminClient getOrCreateAdminClient() {
if (adminClient == null) {
Map<String, Object> configMap = new HashMap<>(getCommonClientConfig());
configMap.put(BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
+ configMap.put(CLIENT_ID_CONFIG, "companion-admin-for-" + getBootstrapServers());
adminClient = AdminClient.create(configMap);
+ registerOnClose(() -> adminClient.close(kafkaApiTimeout));
}
return adminClient;
}
@Override
public synchronized void close() {
- if (adminClient != null) {
- adminClient.close(kafkaApiTimeout);
+ for (Runnable runnable : new ArrayList<>(onClose)) {
+ runnable.run();
}
}
@@ -195,13 +197,19 @@ public Map<String, Object> getConsumerProperties() {
public <K, V> ConsumerBuilder<K, V> consumeWithDeserializers(
Class<? extends Deserializer<K>> keyDeserType,
Class<? extends Deserializer<V>> valueDeserType) {
- return new ConsumerBuilder<>(getConsumerProperties(), kafkaApiTimeout, keyDeserType, valueDeserType);
+ ConsumerBuilder<K, V> builder = new ConsumerBuilder<>(getConsumerProperties(), kafkaApiTimeout, keyDeserType,
+ valueDeserType);
+ registerOnClose(builder::close);
+ return builder;
}
public <K, V> ConsumerBuilder<K, V> consumeWithDeserializers(
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
- return new ConsumerBuilder<>(getConsumerProperties(), kafkaApiTimeout, keyDeserializer, valueDeserializer);
+ ConsumerBuilder<K, V> builder = new ConsumerBuilder<>(getConsumerProperties(), kafkaApiTimeout, keyDeserializer,
+ valueDeserializer);
+ registerOnClose(builder::close);
+ return builder;
}
public <K, V> ConsumerBuilder<K, V> consume(Serde<K> keySerde, Serde<V> valueSerde) {
@@ -243,17 +251,25 @@ public Map<String, Object> getProducerProperties() {
public <K, V> ProducerBuilder<K, V> produceWithSerializers(
Class<? extends Serializer<K>> keySerializerType,
Class<? extends Serializer<V>> valueSerializerType) {
- return new ProducerBuilder<>(getProducerProperties(), kafkaApiTimeout, keySerializerType, valueSerializerType);
+ ProducerBuilder<K, V> builder = new ProducerBuilder<>(getProducerProperties(), kafkaApiTimeout, keySerializerType,
+ valueSerializerType);
+ registerOnClose(builder::close);
+ return builder;
}
public <K, V> ProducerBuilder<K, V> produceWithSerializers(
Serializer<K> keySerializer,
Serializer<V> valueSerializer) {
- return new ProducerBuilder<>(getProducerProperties(), kafkaApiTimeout, keySerializer, valueSerializer);
+ ProducerBuilder<K, V> builder = new ProducerBuilder<>(getProducerProperties(), kafkaApiTimeout, keySerializer,
+ valueSerializer);
+ registerOnClose(builder::close);
+ return builder;
}
public <K, V> ProducerBuilder<K, V> produce(Serde<K> keySerde, Serde<V> valueSerde) {
- return new ProducerBuilder<>(getProducerProperties(), kafkaApiTimeout, keySerde, valueSerde);
+ ProducerBuilder<K, V> builder = new ProducerBuilder<>(getProducerProperties(), kafkaApiTimeout, keySerde, valueSerde);
+ registerOnClose(builder::close);
+ return builder;
}
public <K, V> ProducerBuilder<K, V> produce(Class<K> keyType, Class<V> valueType) {
@@ -276,4 +292,13 @@ public ProducerBuilder<String, Double> produceDoubles() {
return produce(Serdes.String(), Serdes.Double());
}
+ private void registerOnClose(Runnable action) {
+ onClose.add(() -> {
+ try {
+ action.run();
+ } catch (Exception ignored) {
+ // Ignored
+ }
+ });
+ }
}
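
Reviewer note: with registerOnClose, every admin client, producer builder and consumer builder handed out by the companion is now released by a single companion.close(). A hedged usage sketch (it assumes a broker at localhost:9092 and a "demo-topic"; the generic parameters are my reading of the builders' types):

```java
import java.time.Duration;

import org.apache.kafka.clients.producer.ProducerRecord;

import io.smallrye.reactive.messaging.kafka.companion.ConsumerTask;
import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;

public class CompanionCleanupSketch {
    public static void main(String[] args) {
        // KafkaCompanion is AutoCloseable; closing it now runs the registered
        // onClose actions (admin client, producer and consumer builders).
        try (KafkaCompanion companion = new KafkaCompanion("localhost:9092", Duration.ofSeconds(10))) {
            companion.produceIntegers()
                    .usingGenerator(i -> new ProducerRecord<>("demo-topic", i), 100)
                    .awaitCompletion();

            ConsumerTask<String, Integer> consumed = companion.consumeIntegers()
                    .fromTopics("demo-topic", 100)
                    .awaitCompletion();
            System.out.println("consumed " + consumed.count() + " records");
        }
    }
}
```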
diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/KafkaTask.java b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/KafkaTask.java
index 1efa033ad9..d4f1d65bcb 100644
--- a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/KafkaTask.java
+++ b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/KafkaTask.java
@@ -20,7 +20,7 @@
* @param <T> the type of items
* @param <SELF> the reference to self type
*/
-public abstract class KafkaTask<T, SELF extends KafkaTask<T, SELF>> implements Iterable<T> {
+public abstract class KafkaTask<T, SELF extends KafkaTask<T, SELF>> implements Iterable<T>, AutoCloseable {
/**
* The {@link Multi} to subscribe
@@ -202,6 +202,11 @@ public SELF stop() {
return self();
}
+ @Override
+ public void close() {
+ stop();
+ }
+
public long firstOffset() {
T firstRecord = subscriber.getFirstRecord();
if (firstRecord == null) {
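
Reviewer note: making KafkaTask AutoCloseable (close() delegating to stop()) is what enables the try-with-resources rewrites in the tests below. A short sketch of the pattern, with an invented topic name and a localhost broker:

```java
import org.apache.kafka.clients.producer.ProducerRecord;

import io.smallrye.reactive.messaging.kafka.companion.ConsumerTask;
import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;

public class TaskTryWithResourcesSketch {
    public static void main(String[] args) {
        try (KafkaCompanion companion = new KafkaCompanion("localhost:9092")) {
            companion.produceStrings()
                    .usingGenerator(i -> new ProducerRecord<>("demo-topic", "v" + i), 50)
                    .awaitCompletion();

            // The consuming task is stopped automatically when the block exits,
            // even if an assertion or await inside the block throws.
            try (ConsumerTask<String, String> records = companion.consumeStrings().fromTopics("demo-topic")) {
                records.awaitRecords(50);
                System.out.println("first offset: " + records.firstOffset());
            }
        }
    }
}
```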
diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/ProducerBuilder.java b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/ProducerBuilder.java
index 304d9719f4..0367f4fc2d 100644
--- a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/ProducerBuilder.java
+++ b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/ProducerBuilder.java
@@ -171,6 +171,7 @@ public synchronized void close() {
if (kafkaProducer != null) {
LOGGER.infof("Closing producer %s", clientId());
// Kafka producer is thread-safe, we can call close on the caller thread
+ kafkaProducer.flush();
kafkaProducer.close(kafkaApiTimeout);
kafkaProducer = null;
executorService.shutdown();
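
Reviewer note: the explicit flush() guarantees buffered records are delivered before the timed close(). The sketch below shows the plain Kafka-client behaviour this relies on (broker address and topic are placeholders):

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class FlushBeforeCloseSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        // flush() blocks until everything buffered so far has been sent,
        // so the timed close() below cannot drop in-flight records.
        producer.flush();
        producer.close(Duration.ofSeconds(10));
    }
}
```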
diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/RecordsSubscriber.java b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/RecordsSubscriber.java
index 173d8b47d8..16b4c8fdba 100644
--- a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/RecordsSubscriber.java
+++ b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/RecordsSubscriber.java
@@ -469,11 +469,11 @@ private SELF request(long req) {
@Override
public void onSubscribe(Subscription s) {
- subscription.set(s);
- if (requested.get() > 0) {
- s.request(requested.get());
+ if (subscription.compareAndSet(null, s)) {
+ if (requested.get() > 0) {
+ s.request(requested.get());
+ }
}
-
}
@Override
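
Reviewer note: the compareAndSet keeps only the first Subscription and silently ignores a duplicate onSubscribe, instead of overwriting the stored reference and requesting again. A standalone sketch of that idempotent pattern (not the companion's actual subscriber):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class FirstSubscriptionWinsSketch<T> implements Subscriber<T> {
    private final AtomicReference<Subscription> subscription = new AtomicReference<>();
    private final AtomicLong requested = new AtomicLong(Long.MAX_VALUE);

    @Override
    public void onSubscribe(Subscription s) {
        // Only the first subscription is stored and requested from;
        // a later call leaves the existing one untouched.
        if (subscription.compareAndSet(null, s)) {
            if (requested.get() > 0) {
                s.request(requested.get());
            }
        }
    }

    @Override
    public void onNext(T item) {
        System.out.println("received " + item);
    }

    @Override
    public void onError(Throwable failure) {
        failure.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("done");
    }
}
```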
diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/TopicsCompanion.java b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/TopicsCompanion.java
index cc3cf1ed58..79abf1fbb1 100644
--- a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/TopicsCompanion.java
+++ b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/TopicsCompanion.java
@@ -98,6 +98,7 @@ public Uni<TopicDescription> waitForTopic(String topic) {
}
return !checkIfTheTopicIsCreated(topic, topics);
})
+ .select().where(Objects::nonNull)
.toUni()
.map(topics -> topics.get(topic));
}
@@ -119,14 +120,6 @@ boolean checkIfTheTopicIsCreated(String topic, Map<String, TopicDescription> des
}
return true;
}
- //
- // boolean checkIfTheTopicIsCreated(String topic, Map<String, TopicDescription> description) {
- // return Optional.ofNullable(description)
- // .map(topics -> topics.get(topic))
- // .map(td -> td.partitions().stream()
- // .allMatch(partition -> partition.leader() != null && partition.leader().id() >= 0))
- // .orElse(false);
- // }
/**
* @return the set of topic names
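
Reviewer note: toUni() completes with the first item the Multi emits, so the placeholder results produced while polling must be filtered out before it; that is what the added select().where(Objects::nonNull) does. A tiny Mutiny sketch of the same idea (empty strings stand in for the not-yet-available topic description):

```java
import io.smallrye.mutiny.Multi;

public class FilterBeforeToUniSketch {
    public static void main(String[] args) {
        String first = Multi.createFrom().items("", "", "ready")
                .select().where(s -> !s.isEmpty()) // drop placeholders before taking the first item
                .toUni()
                .await().indefinitely();
        System.out.println(first); // prints "ready"
    }
}
```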
diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaBrokerExtension.java b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaBrokerExtension.java
index 69aeb4b43b..743d1cc3de 100644
--- a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaBrokerExtension.java
+++ b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaBrokerExtension.java
@@ -7,22 +7,26 @@
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import java.lang.reflect.Method;
import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.common.Node;
import org.jboss.logging.Logger;
-import org.junit.jupiter.api.extension.BeforeAllCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.*;
import org.junit.jupiter.api.extension.ExtensionContext.Store.CloseableResource;
-import org.junit.jupiter.api.extension.ParameterContext;
-import org.junit.jupiter.api.extension.ParameterResolutionException;
-import org.junit.jupiter.api.extension.ParameterResolver;
+import org.testcontainers.shaded.org.awaitility.core.ConditionTimeoutException;
import io.strimzi.test.container.StrimziKafkaContainer;
/**
* Junit extension for creating Strimzi Kafka broker
*/
-public class KafkaBrokerExtension implements BeforeAllCallback, ParameterResolver, CloseableResource {
+public class KafkaBrokerExtension implements BeforeAllCallback, BeforeEachCallback, ParameterResolver, CloseableResource {
public static final Logger LOGGER = Logger.getLogger(KafkaBrokerExtension.class.getName());
public static final String KAFKA_VERSION = "3.1.0";
@@ -53,6 +57,9 @@ public static StrimziKafkaContainer createKafkaContainer() {
public static <T extends StrimziKafkaContainer> T configureKafkaContainer(T container) {
String kafkaVersion = System.getProperty("kafka-container-version", KAFKA_VERSION);
container.withKafkaVersion(kafkaVersion);
+ Map<String, String> config = new HashMap<>();
+ config.put("log.cleaner.enable", "false");
+ container.withKafkaConfigurationMap(config);
return container;
}
@@ -131,6 +138,38 @@ public Object resolveParameter(ParameterContext parameterContext, ExtensionConte
return null;
}
+ @Override
+ public void beforeEach(ExtensionContext context) throws Exception {
+ LOGGER.infof("Running test %s (%s#%s)", context.getDisplayName(),
+ context.getTestClass().map(Class::getName).orElse(""),
+ context.getTestMethod().map(Method::getName).orElse(""));
+ if (kafka != null) {
+ for (int i = 0; i < 3; i++) {
+ try {
+ isBrokerHealthy();
+ return;
+ } catch (ConditionTimeoutException e) {
+ LOGGER.warn("The Kafka broker is not healthy, restarting it");
+ restart(kafka, 0);
+ }
+ }
+ throw new IllegalStateException("The Kafka broker is still not healthy, despite 3 restarts");
+ }
+ }
+
+ private void isBrokerHealthy() {
+ await().until(() -> kafka.isRunning());
+ await().catchUncaughtExceptions().until(() -> {
+ Map<String, Object> config = new HashMap<>();
+ config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+ config.put(CommonClientConfigs.CLIENT_ID_CONFIG, "broker-healthy-admin");
+ try (AdminClient admin = AdminClient.create(config)) {
+ Collection<Node> nodes = admin.describeCluster().nodes().get();
+ return nodes.size() == 1 && nodes.iterator().next().id() >= 0;
+ }
+ });
+ }
+
@Target({ ElementType.FIELD, ElementType.PARAMETER })
@Retention(RetentionPolicy.RUNTIME)
public @interface KafkaBootstrapServers {
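
Reviewer note: with the new beforeEach callback, every test starts against a broker that has just been health-checked (and restarted up to three times if needed). A hedged sketch of how a test class consumes the extension; the class and assertion are invented:

```java
import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;
import io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension;
import io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension.KafkaBootstrapServers;

@ExtendWith(KafkaBrokerExtension.class)
class BrokerUsageSketchTest {

    @Test
    void canReachTheSharedBroker(@KafkaBootstrapServers String bootstrapServers) {
        // beforeEach has already verified (or restarted) the broker at this point.
        try (KafkaCompanion companion = new KafkaCompanion(bootstrapServers)) {
            assertThat(companion.topics().list()).isNotNull();
        }
    }
}
```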
diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaToxiproxyExtension.java b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaToxiproxyExtension.java
index fa8d4946cc..ce669aee49 100644
--- a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaToxiproxyExtension.java
+++ b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaToxiproxyExtension.java
@@ -5,19 +5,15 @@
import java.util.logging.Logger;
-import org.junit.jupiter.api.extension.BeforeAllCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.*;
import org.junit.jupiter.api.extension.ExtensionContext.Store.CloseableResource;
-import org.junit.jupiter.api.extension.ParameterContext;
-import org.junit.jupiter.api.extension.ParameterResolutionException;
-import org.junit.jupiter.api.extension.ParameterResolver;
import org.testcontainers.containers.Network;
/**
* Junit extension for creating Strimzi Kafka broker behind a Toxiproxy
*/
public class KafkaToxiproxyExtension extends KafkaBrokerExtension
- implements BeforeAllCallback, ParameterResolver, CloseableResource {
+ implements BeforeAllCallback, BeforeEachCallback, ParameterResolver, CloseableResource {
public static final Logger LOGGER = Logger.getLogger(KafkaToxiproxyExtension.class.getName());
@Override
@@ -34,6 +30,12 @@ public void beforeAll(ExtensionContext context) {
}
}
+ @Override
+ public void beforeEach(ExtensionContext context) throws Exception {
+ // Do nothing for the Toxiproxy extension:
+ // in this case we do not restart unhealthy brokers.
+ }
+
@Override
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
throws ParameterResolutionException {
diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/ConsumerTest.java b/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/ConsumerTest.java
index a81fa78473..1d3f0c259a 100644
--- a/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/ConsumerTest.java
+++ b/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/ConsumerTest.java
@@ -53,16 +53,12 @@ void testConsumeParallel() {
.awaitCompletion(Duration.ofSeconds(10));
AtomicInteger records = new AtomicInteger();
- ConsumerTask<String, Integer> task = companion.consumeIntegers().fromTopics(topic, withCallback(cr -> {
- records.incrementAndGet();
- }, 10));
-
- await().until(() -> task.count() == 1000);
-
- task.stop();
+ try (ConsumerTask<String, Integer> task = companion.consumeIntegers()
+ .fromTopics(topic, withCallback(cr -> records.incrementAndGet(), 10))) {
+ await().until(() -> task.count() == 1000);
+ }
assertThat(records.get()).isEqualTo(1000);
- assertThat(task.count()).isEqualTo(1000L);
}
@Test
@@ -153,14 +149,15 @@ void testConsumerWithCommitSync() {
@Test
void testWaitForAssignments() {
- companion.produceStrings().usingGenerator(i -> new ProducerRecord<>(topic, "t" + i));
-
- ConsumerBuilder<String, String> consumer = companion.consumeStrings();
- ConsumerTask<String, String> records = consumer.fromTopics(topic);
-
- assertThat(consumer.waitForAssignment().await().indefinitely()).hasSize(1);
-
- assertThat(records.awaitNextRecords(100, Duration.ofSeconds(3)).count()).isGreaterThanOrEqualTo(100);
+ try (ProducerTask producer = companion.produceStrings()
+ .usingGenerator(i -> new ProducerRecord<>(topic, "t" + i))) {
+
+ ConsumerBuilder<String, String> consumer = companion.consumeStrings();
+ try (ConsumerTask<String, String> records = consumer.fromTopics(topic)) {
+ assertThat(consumer.waitForAssignment().await().indefinitely()).hasSize(1);
+ assertThat(records.awaitNextRecords(100, Duration.ofMinutes(1)).count()).isGreaterThanOrEqualTo(100);
+ }
+ }
}
@Test
@@ -174,18 +171,15 @@ void testAssignment() {
record(topic3, 3)).awaitCompletion();
ConsumerBuilder<String, String> consumer = companion.consumeStrings();
-
- ConsumerTask<String, String> records = consumer.fromTopics(topic1, topic2, topic3);
-
- consumer.waitForAssignment().await().indefinitely();
- assertThat(consumer.assignment()).hasSize(3);
- assertThat(consumer.currentAssignment()).hasSize(3);
-
- records.awaitRecords(3);
- assertThat(consumer.assignment()).hasSize(3);
- assertThat(consumer.currentAssignment()).hasSize(3);
-
- records.stop();
+ try (ConsumerTask<String, String> records = consumer.fromTopics(topic1, topic2, topic3)) {
+ consumer.waitForAssignment().await().indefinitely();
+ assertThat(consumer.assignment()).hasSize(3);
+ assertThat(consumer.currentAssignment()).hasSize(3);
+
+ records.awaitRecords(3);
+ assertThat(consumer.assignment()).hasSize(3);
+ assertThat(consumer.currentAssignment()).hasSize(3);
+ }
assertThat(consumer.assignment()).hasSize(0);
assertThat(consumer.currentAssignment()).hasSize(0);
}
@@ -194,15 +188,13 @@ void testAssignment() {
void testPosition() {
ConsumerBuilder<String, String> consumer = companion.consumeStrings();
- ConsumerTask<String, String> records = consumer.fromTopics(topic);
-
- companion.produceStrings().usingGenerator(i -> record(topic, "v" + i), Duration.ofSeconds(4));
-
- records.awaitRecords(100);
+ try (ConsumerTask<String, String> records = consumer.fromTopics(topic)) {
+ companion.produceStrings().usingGenerator(i -> record(topic, "v" + i), 200);
- assertThat(consumer.position(tp(topic, 0))).isGreaterThanOrEqualTo(100L);
+ records.awaitRecords(100, Duration.ofMinutes(1));
- records.stop();
+ assertThat(consumer.position(tp(topic, 0))).isGreaterThanOrEqualTo(100L);
+ }
// position after closing consumer
assertThatThrownBy(() -> consumer.position(tp(topic, 0)))
.isInstanceOf(IllegalStateException.class);
@@ -213,19 +205,16 @@ void testCommitted() {
ConsumerBuilder<String, String> consumer = companion.consumeStrings()
.withCommitSyncWhen(cr -> true);
- ConsumerTask<String, String> records = consumer.fromTopics(topic);
-
- companion.produceStrings().usingGenerator(i -> record(topic, "v" + i), 200);
-
- records.awaitRecords(100);
- await().untilAsserted(() -> assertThat(consumer.committed(tp(topic, 0)).offset())
- .isGreaterThanOrEqualTo(100L));
+ try (ConsumerTask<String, String> records = consumer.fromTopics(topic)) {
+ companion.produceStrings().usingGenerator(i -> record(topic, "v" + i), 200);
- records.awaitRecords(200);
- await().untilAsserted(() -> assertThat(consumer.committed(tp(topic, 0)).offset()).isEqualTo(199L));
+ records.awaitRecords(100);
+ await().untilAsserted(() -> assertThat(consumer.committed(tp(topic, 0)).offset())
+ .isGreaterThanOrEqualTo(100L));
- records.stop();
- await().untilAsserted(() -> assertThat(consumer.currentAssignment()).hasSize(0));
+ records.awaitRecords(200);
+ await().untilAsserted(() -> assertThat(consumer.committed(tp(topic, 0)).offset()).isEqualTo(199L));
+ }
}
@Test
@@ -248,13 +237,10 @@ void testConsumerReuse() {
@Test
void testConsumerReuseFail() {
ConsumerBuilder<String, String> consumer = companion.consumeStrings();
-
- ConsumerTask<String, String> records = consumer.fromTopics(topic);
-
- consumer.fromTopics(topic)
- .awaitCompletion((failure, cancelled) -> assertThat(failure).isInstanceOf(IllegalStateException.class));
-
- records.stop();
+ try (ConsumerTask<String, String> records = consumer.fromTopics(topic)) {
+ consumer.fromTopics(topic)
+ .awaitCompletion((failure, cancelled) -> assertThat(failure).isInstanceOf(IllegalStateException.class));
+ }
}
@Test
@@ -276,24 +262,22 @@ void testPauseResume() throws InterruptedException {
ConsumerBuilder<String, Integer> consumer = companion.consumeIntegers();
- ConsumerTask<String, Integer> records = consumer.fromTopics(topic);
- records.awaitRecords(100);
-
- consumer.pause();
+ try (ConsumerTask<String, Integer> records = consumer.fromTopics(topic)) {
+ records.awaitRecords(100);
- Thread.sleep(1000);
- // Consumption does not advance
- await().untilAsserted(() -> {
- long count = records.count();
- Thread.sleep(100);
- assertThat(records.count()).isEqualTo(count);
- });
+ consumer.pause();
- consumer.resume();
- records.awaitRecords(400);
+ Thread.sleep(1000);
+ // Consumption does not advance
+ await().untilAsserted(() -> {
+ long count = records.count();
+ Thread.sleep(100);
+ assertThat(records.count()).isEqualTo(count);
+ });
- records.stop();
- assertThat(records.count()).isGreaterThanOrEqualTo(400L);
+ consumer.resume();
+ records.awaitRecords(400, Duration.ofMinutes(1));
+ }
}
@Test
diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/OffsetsTest.java b/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/OffsetsTest.java
index 313d62ee8c..aa0bf5a926 100644
--- a/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/OffsetsTest.java
+++ b/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/OffsetsTest.java
@@ -33,16 +33,14 @@ void testOffsetReset() {
.extracting(OffsetAndMetadata::offset).isEqualTo(500L));
ConsumerBuilder<String, String> consumer = companion.consumeStrings();
- ConsumerTask<String, String> records = consumer
+ try (ConsumerTask<String, String> records = consumer
.withAutoCommit()
.withGroupId(groupId)
.withOffsetReset(OffsetResetStrategy.LATEST)
- .fromTopics(topic);
-
- consumer.waitForAssignment().await().atMost(Duration.ofSeconds(10));
-
- assertThat(records.count()).isEqualTo(0L);
- records.stop();
+ .fromTopics(topic)) {
+ consumer.waitForAssignment().await().atMost(Duration.ofSeconds(10));
+ assertThat(records.count()).isEqualTo(0L);
+ }
assertThat(companion.consumerGroups().offsets(groupId, tp(topic, 0)).offset()).isEqualTo(500L);
@@ -53,7 +51,5 @@ void testOffsetReset() {
await().untilAsserted(
() -> assertThat(companion.consumerGroups().offsets(groupId, tp(topic, 0)).offset()).isEqualTo(0L));
-
- consumer.fromTopics(topic).awaitRecords(500).stop();
}
}
diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/ProducerTest.java b/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/ProducerTest.java
index 76bc6393c0..a9f1430037 100644
--- a/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/ProducerTest.java
+++ b/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/ProducerTest.java
@@ -9,6 +9,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -165,12 +166,11 @@ void testProducerStop() {
ProducerTask produced = companion.produceIntegers()
.usingGenerator(i -> record(topic, "key", i));
- await().until(() -> produced.count() >= 1000);
+ await().atMost(1, TimeUnit.MINUTES).until(() -> produced.count() >= 1000);
produced.stop();
long finalProducedCount = produced.awaitCompletion((throwable, cancelled) -> assertThat(cancelled).isTrue())
.count();
- System.out.println("Completed " + finalProducedCount);
// Consume records until produced record
ConsumerTask<String, Integer> records = companion.consumeIntegers()
diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/TestTags.java b/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/TestTags.java
new file mode 100644
index 0000000000..c432bb2ed4
--- /dev/null
+++ b/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/TestTags.java
@@ -0,0 +1,5 @@
+package io.smallrye.reactive.messaging.kafka.companion;
+
+public class TestTags {
+ public static final String FLAKY = "flaky";
+}
diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/test/EmbeddedKafkaTest.java b/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/test/EmbeddedKafkaTest.java
index 4930b1a5e7..1fed34cd2d 100644
--- a/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/test/EmbeddedKafkaTest.java
+++ b/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/test/EmbeddedKafkaTest.java
@@ -15,12 +15,15 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.config.SslConfigs;
+import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;
+import io.smallrye.reactive.messaging.kafka.companion.TestTags;
import kafka.server.KafkaConfig;
+@Tag(TestTags.FLAKY)
public class EmbeddedKafkaTest {
@Test
@@ -31,6 +34,7 @@ void test() {
KafkaCompanion companion = new KafkaCompanion(advertisedListeners);
assertThat(companion.topics().list()).isEmpty();
assertFunctionalBroker(companion);
+ companion.close();
}
}
@@ -90,10 +94,6 @@ void assertFunctionalBroker(KafkaCompanion companion) {
// produce messages
companion.produceStrings().usingGenerator(i -> new ProducerRecord<>("messages", i % 3, "k", "" + i), 100);
// consume messages
- companion.consumeStrings().fromTopics("messages", 100)
- .awaitCompletion()
- .byTopicPartition().entrySet().forEach(e -> {
- System.out.println(e.getKey() + " " + e.getValue().size());
- });
+ companion.consumeStrings().fromTopics("messages", 100).awaitCompletion();
}
}
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/BatchConsumerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/BatchConsumerTest.java
index 54dac09ab7..0f935bdce0 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/BatchConsumerTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/BatchConsumerTest.java
@@ -66,7 +66,7 @@ void testIncomingConsumingKafkaBatchRecords() {
@Test
void testIncomingConsumingMessageWithMetadata() {
String newTopic = UUID.randomUUID().toString();
- companion.topics().create(newTopic, 3);
+ companion.topics().createAndWait(newTopic, 3);
KafkaMapBasedConfig config = kafkaConfig("mp.messaging.incoming.kafka")
.put("value.deserializer", StringDeserializer.class.getName())
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java
index 056f1a2eeb..66fec33c18 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java
@@ -89,7 +89,7 @@ public void testSourceWithPartitions() {
.with("value.deserializer", IntegerDeserializer.class.getName())
.with("partitions", 4);
- companion.topics().create(topic, 3);
+ companion.topics().createAndWait(topic, 3, Duration.ofMinutes(1));
KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config);
source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic,
UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1);
@@ -168,7 +168,7 @@ public void testBroadcast() {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testBroadcastWithPartitions() {
- companion.topics().create(topic, 2);
+ companion.topics().createAndWait(topic, 2, Duration.ofMinutes(1));
MapBasedConfig config = newCommonConfigForSource()
.with("value.deserializer", IntegerDeserializer.class.getName())
.with("broadcast", true)
@@ -277,7 +277,7 @@ public void testRetry() {
// assertThat(messages1.size()).isGreaterThanOrEqualTo(20);
// }
- private KafkaMapBasedConfig myKafkaSourceConfig(int partitions, String withConsumerRebalanceListener,
+ private KafkaMapBasedConfig myKafkaSourceConfig(String topic, int partitions, String withConsumerRebalanceListener,
String group) {
KafkaMapBasedConfig config = kafkaConfig("mp.messaging.incoming.data");
if (group != null) {
@@ -286,10 +286,9 @@ private KafkaMapBasedConfig myKafkaSourceConfig(int partitions, String withConsu
config.put("value.deserializer", IntegerDeserializer.class.getName());
config.put("enable.auto.commit", "false");
config.put("auto.offset.reset", "earliest");
- config.put("topic", "data");
+ config.put("topic", topic);
if (partitions > 0) {
config.put("partitions", Integer.toString(partitions));
- config.put("topic", "data-" + partitions);
}
if (withConsumerRebalanceListener != null) {
config.put("consumer-rebalance-listener.name", withConsumerRebalanceListener);
@@ -361,8 +360,8 @@ public void testABeanConsumingTheKafkaMessages() {
@Test
public void testABeanConsumingTheKafkaMessagesMultiThread() {
String group = UUID.randomUUID().toString();
- MultiThreadConsumer bean = runApplication(myKafkaSourceConfig(0, null, group)
- .with("topic", topic), MultiThreadConsumer.class);
+ MultiThreadConsumer bean = runApplication(myKafkaSourceConfig(topic, 0, null, group),
+ MultiThreadConsumer.class);
List<Integer> list = bean.getItems();
assertThat(list).isEmpty();
bean.run();
@@ -373,19 +372,24 @@ public void testABeanConsumingTheKafkaMessagesMultiThread() {
@Test
public void testABeanConsumingTheKafkaMessagesWithPartitions() {
- companion.topics().create("data-2", 2);
+ companion.topics().createAndWait(topic, 2, Duration.ofMinutes(1));
+ // Verify the creation of the topic
+ assertThat(companion.topics().describe(topic).get(topic).partitions()).hasSize(2)
+ .allSatisfy(tpi -> assertThat(tpi.leader()).isNotNull()
+ .satisfies(node -> assertThat(node.id()).isGreaterThanOrEqualTo(0)));
+
ConsumptionBean bean = run(
- myKafkaSourceConfig(2, ConsumptionConsumerRebalanceListener.class.getSimpleName(), null));
+ myKafkaSourceConfig(topic, 2, ConsumptionConsumerRebalanceListener.class.getSimpleName(), null));
List<Integer> list = bean.getResults();
assertThat(list).isEmpty();
- companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>("data-2", i), 10);
+ companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10);
await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() >= 10);
assertThat(list).containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<KafkaRecord<String, Integer>> messages = bean.getKafkaMessages();
messages.forEach(m -> {
- assertThat(m.getTopic()).isEqualTo("data-2");
+ assertThat(m.getTopic()).isEqualTo(topic);
assertThat(m.getTimestamp()).isAfter(Instant.EPOCH);
assertThat(m.getPartition()).isGreaterThan(-1);
});
@@ -395,14 +399,14 @@ public void testABeanConsumingTheKafkaMessagesWithPartitions() {
for (int i = 0; i < 2; i++) {
TopicPartition partition = consumptionConsumerRebalanceListener.getAssigned().get(i);
assertThat(partition).isNotNull();
- assertThat(partition.topic()).isEqualTo("data-2");
+ assertThat(partition.topic()).isEqualTo(topic);
}
}
@Test
public void testABeanConsumingWithMissingRebalanceListenerConfiguredByName() {
String group = UUID.randomUUID().toString();
- assertThatThrownBy(() -> run(myKafkaSourceConfig(0, "not exists", group)))
+ assertThatThrownBy(() -> run(myKafkaSourceConfig(topic, 0, "not exists", group)))
.isInstanceOf(DeploymentException.class)
.hasCauseInstanceOf(UnsatisfiedResolutionException.class);
}
@@ -464,7 +468,7 @@ public void testABeanConsumingTheKafkaMessagesStartingOnFifthOffsetFromLatestTha
await()
.atMost(2, TimeUnit.MINUTES)
- .until(() -> list.size() >= 5);
+ .until(() -> list.size() >= 10);
// The rebalance listener failed, no retry
assertThat(list).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
@@ -505,11 +509,11 @@ public void testInvalidIncomingType() {
@Test
public void testABeanConsumingTheKafkaMessagesWithRawMessage() {
String group = UUID.randomUUID().toString();
- ConsumptionBeanUsingRawMessage bean = runApplication(myKafkaSourceConfig(0, null, group),
+ ConsumptionBeanUsingRawMessage bean = runApplication(myKafkaSourceConfig(topic, 0, null, group),
ConsumptionBeanUsingRawMessage.class);
List<Integer> list = bean.getResults();
assertThat(list).isEmpty();
- companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>("data", i), 10);
+ companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10);
await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() >= 10);
assertThat(list).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
@@ -520,7 +524,7 @@ public void testABeanConsumingTheKafkaMessagesWithRawMessage() {
io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata metadata = m
.getMetadata(io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata.class)
.orElseThrow(() -> new AssertionError("Metadata expected"));
- assertThat(metadata.getTopic()).isEqualTo("data");
+ assertThat(metadata.getTopic()).isEqualTo(topic);
assertThat(metadata.getTimestamp()).isAfter(Instant.EPOCH);
assertThat(metadata.getPartition()).isGreaterThan(-1);
assertThat(metadata.getOffset()).isGreaterThan(-1);
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java
index 1cacd3acc1..2fcf083b7c 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java
@@ -94,7 +94,7 @@ public void testSourceWithPartitions() {
.with("value.deserializer", IntegerDeserializer.class.getName())
.with("partitions", 4);
- companion.topics().create(topic, 3);
+ companion.topics().createAndWait(topic, 3);
KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config);
source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic,
UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1);
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/MultiTopicsTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/MultiTopicsTest.java
index 3819a617f0..915026d016 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/MultiTopicsTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/MultiTopicsTest.java
@@ -23,6 +23,7 @@
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
+import io.smallrye.reactive.messaging.kafka.companion.ProducerTask;
/**
* Test the Incoming connector when multiple topics are used either using a pattern or a list of topics.
@@ -103,12 +104,13 @@ public void testWithOnlyTwoTopicsReceiving() {
assertThat(bean.getMessages()).isEmpty();
- companion.produceStrings()
+ ProducerTask pt1 = companion.produceStrings()
.usingGenerator(i -> new ProducerRecord<>(topic1, Integer.toString(i), "hello"), 3);
- companion.produceStrings()
+ ProducerTask pt2 = companion.produceStrings()
.usingGenerator(i -> new ProducerRecord<>(topic3, Integer.toString(i), "bonjour"), 3);
+ await().until(() -> pt1.count() == 3 && pt2.count() == 3);
await().until(() -> bean.getMessages().size() >= 6);
AtomicInteger top1 = new AtomicInteger();
@@ -142,9 +144,9 @@ public void testWithPattern() {
String topic2 = "greetings-" + UUID.randomUUID().toString();
String topic3 = "greetings-" + UUID.randomUUID().toString();
- companion.topics().create(topic1, 1);
- companion.topics().create(topic2, 1);
- companion.topics().create(topic3, 1);
+ companion.topics().createAndWait(topic1, 1);
+ companion.topics().createAndWait(topic2, 1);
+ companion.topics().createAndWait(topic3, 1);
KafkaConsumer bean = runApplication(kafkaConfig("mp.messaging.incoming.kafka")
.with("value.deserializer", StringDeserializer.class.getName())
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ProducerRecordTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ProducerRecordTest.java
index 39d9696d38..e74eff5d04 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ProducerRecordTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ProducerRecordTest.java
@@ -36,7 +36,7 @@ public class ProducerRecordTest extends KafkaCompanionTestBase {
@Test
public void test() {
for (int i = 0; i < 10; i++) {
- companion.topics().create(TOPIC_NAME_BASE + i, 1);
+ companion.topics().createAndWait(TOPIC_NAME_BASE + i, 1);
}
addBeans(ConsumerRecordConverter.class);
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ProducerRecordWithLegacyMetadataTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ProducerRecordWithLegacyMetadataTest.java
index 1fbdd80288..68414362c6 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ProducerRecordWithLegacyMetadataTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ProducerRecordWithLegacyMetadataTest.java
@@ -39,7 +39,7 @@ public class ProducerRecordWithLegacyMetadataTest extends KafkaCompanionTestBase
@Test
public void test() {
for (int i = 0; i < 10; i++) {
- companion.topics().create(TOPIC_NAME_BASE + i, 1);
+ companion.topics().createAndWait(TOPIC_NAME_BASE + i, 1);
}
addBeans(ConsumerRecordConverter.class);
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java
index 2547afa93d..2fafb66011 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java
@@ -33,7 +33,7 @@ public class BrokerRestartTest extends ClientTestBase {
@BeforeEach
public void init() {
String newTopic = "test-" + UUID.randomUUID();
- companion.topics().create(newTopic, partitions);
+ companion.topics().createAndWait(newTopic, partitions);
this.topic = newTopic;
resetMessages();
}
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ClientTestBase.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ClientTestBase.java
index c87d3f5689..c21eb06574 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ClientTestBase.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ClientTestBase.java
@@ -11,6 +11,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
@@ -238,6 +239,7 @@ void sendMessages(Stream<? extends ProducerRecord<Integer, String>> records) thr
for (Future> future : futures) {
future.get(5, TimeUnit.SECONDS);
}
+ producer.flush();
}
}
@@ -250,6 +252,7 @@ void sendMessages(Stream<? extends ProducerRecord<Integer, String>> records, Str
for (Future> future : futures) {
future.get(5, TimeUnit.SECONDS);
}
+ producer.flush();
}
}
@@ -330,10 +333,10 @@ protected void resetMessages() {
expectedMessages.clear();
receivedMessages.clear();
for (int i = 0; i < partitions; i++) {
- expectedMessages.add(new ArrayList<>());
+ expectedMessages.add(new CopyOnWriteArrayList<>());
}
for (int i = 0; i < partitions; i++) {
- receivedMessages.add(new ArrayList<>());
+ receivedMessages.add(new CopyOnWriteArrayList<>());
}
}
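
Reviewer note: the per-partition buffers switch to CopyOnWriteArrayList because the assertion thread iterates them while receive callbacks keep appending; an ArrayList iterator would fail with ConcurrentModificationException. A standalone sketch of the concurrent access being made safe (sizes and threading are arbitrary):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ConcurrentAppendSketch {
    public static void main(String[] args) throws InterruptedException {
        // With a plain ArrayList, iterating while the writer appends can throw
        // ConcurrentModificationException; the copy-on-write list iterates a snapshot.
        List<Integer> received = new CopyOnWriteArrayList<>();

        Thread writer = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) {
                received.add(i);
            }
        });
        writer.start();

        long observed = 0;
        while (writer.isAlive()) {
            for (Integer value : received) {
                observed = value; // safe concurrent read
            }
        }
        writer.join();
        System.out.println("last observed=" + observed + ", final size=" + received.size());
    }
}
```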
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/KafkaClientReactiveStreamsPublisherTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/KafkaClientReactiveStreamsPublisherTest.java
index 46008df331..0308e14d26 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/KafkaClientReactiveStreamsPublisherTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/KafkaClientReactiveStreamsPublisherTest.java
@@ -51,7 +51,7 @@ public KafkaClientReactiveStreamsPublisherTest() {
public static void init(@KafkaBootstrapServers String bootstrapServers) {
companion = new KafkaCompanion(bootstrapServers);
String newTopic = "tck-" + UUID.randomUUID();
- companion.topics().create(newTopic, partitions);
+ companion.topics().createAndWait(newTopic, partitions);
topic = newTopic;
vertx = Vertx.vertx();
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaBatchConsumerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaBatchConsumerTest.java
index 5d08e9aa50..1d6fcded5a 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaBatchConsumerTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaBatchConsumerTest.java
@@ -26,7 +26,7 @@ public void tearDown() {
@BeforeEach
public void init() {
String newTopic = "test-" + UUID.randomUUID();
- companion.topics().create(newTopic, partitions);
+ companion.topics().createAndWait(newTopic, partitions);
this.topic = newTopic;
resetMessages();
}
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java
index f4580b4a54..633b44ba73 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java
@@ -34,7 +34,7 @@ public class ReactiveKafkaProducerTest extends ClientTestBase {
public void init() {
sinks = new ConcurrentLinkedQueue<>();
String newTopic = "test-" + UUID.randomUUID();
- companion.topics().create(newTopic, partitions);
+ companion.topics().createAndWait(newTopic, partitions);
this.topic = newTopic;
resetMessages();
}
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/BatchCommitStrategiesTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/BatchCommitStrategiesTest.java
index 936912df8f..0d8ca47d15 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/BatchCommitStrategiesTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/BatchCommitStrategiesTest.java
@@ -337,7 +337,7 @@ void testThrottledStrategyWithTooManyUnackedMessages() {
HealthReport.HealthReportBuilder builder = HealthReport.builder();
source.isAlive(builder);
String message = builder.build().getChannels().get(0).getMessage();
- assertThat(message).contains("my-topic-0", "my-topic-1");
+ assertThat(message).containsAnyOf("my-topic-0", "my-topic-1");
}
@Test
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/KafkaCommitHandlerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/KafkaCommitHandlerTest.java
index 011a0735a2..09399235b6 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/KafkaCommitHandlerTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/KafkaCommitHandlerTest.java
@@ -226,7 +226,7 @@ public void testSourceWithThrottledLatestProcessedCommitEnabledWithoutAck() {
@Test
public void testSourceWithThrottledAndRebalance() {
- companion.topics().create(topic, 2);
+ companion.topics().createAndWait(topic, 2);
MapBasedConfig config1 = newCommonConfigForSource()
.with("client.id", UUID.randomUUID().toString())
.with("group.id", "test-source-with-throttled-latest-processed-commit")
@@ -310,7 +310,7 @@ public void testSourceWithThrottledAndRebalance() {
@Test
void testSourceWithThrottledAndRebalanceWithPartitionsConfig() {
- companion.topics().create(topic, 4);
+ companion.topics().createAndWait(topic, 4);
companion.produceIntegers()
.usingGenerator(i -> new ProducerRecord<>(topic, Integer.toString(i % 2), i), 10000);
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/PartitionTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/PartitionTest.java
index a8f17abd8b..a1988069ed 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/PartitionTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/PartitionTest.java
@@ -27,7 +27,7 @@ public class PartitionTest extends KafkaCompanionTestBase {
@Test
public void testWithPartitions() {
- companion.topics().create(topic, 3);
+ companion.topics().createAndWait(topic, 3);
String groupId = UUID.randomUUID().toString();
MapBasedConfig config = kafkaConfig("mp.messaging.incoming.kafka")
@@ -65,7 +65,7 @@ public void testWithPartitions() {
@Test
public void testWithMoreConsumersThanPartitions() throws InterruptedException {
- companion.topics().create(topic, 3);
+ companion.topics().createAndWait(topic, 3);
String groupId = UUID.randomUUID().toString();
MapBasedConfig config = kafkaConfig("mp.messaging.incoming.kafka")
.with("group.id", groupId)
@@ -102,7 +102,7 @@ public void testWithMoreConsumersThanPartitions() throws InterruptedException {
@Test
public void testWithMorePartitionsThanConsumers() throws InterruptedException {
- companion.topics().create(topic, 3);
+ companion.topics().createAndWait(topic, 3);
String groupId = UUID.randomUUID().toString();
MapBasedConfig config = kafkaConfig("mp.messaging.incoming.kafka")
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/health/HealthCheckTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/health/HealthCheckTest.java
index aeb0c7190a..68cc2864a7 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/health/HealthCheckTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/health/HealthCheckTest.java
@@ -58,7 +58,7 @@ public void testHealthOfApplicationWithoutOutgoingTopic() {
@Test
void testHealthOfApplicationWithOutgoingTopicUsingTopicVerification() {
String outputTopic = UUID.randomUUID().toString();
- companion.topics().create(outputTopic, 1);
+ companion.topics().createAndWait(outputTopic, 1, Duration.ofMinutes(1));
MapBasedConfig config = new MapBasedConfig(getKafkaSinkConfigForProducingBean()
.put("health-topic-verification-enabled", true)
.put("topic", outputTopic));
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/perf/PerformanceProducerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/perf/PerformanceProducerTest.java
index ae55492843..d2566acf79 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/perf/PerformanceProducerTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/perf/PerformanceProducerTest.java
@@ -29,7 +29,7 @@ public class PerformanceProducerTest extends KafkaCompanionTestBase {
@Test
public void testDefault() {
String topic = UUID.randomUUID().toString();
- companion.topics().create(topic, 10);
+ companion.topics().createAndWait(topic, 10);
ConsumerTask<String, Integer> records = companion.consumeIntegers().fromTopics(topic, COUNT, Duration.ofMinutes(1));
KafkaMapBasedConfig config = kafkaConfig("mp.messaging.outgoing.kafka")
@@ -58,7 +58,7 @@ public void testDefault() {
@Test
public void testWithoutBackPressure() {
String topic = UUID.randomUUID().toString();
- companion.topics().create(topic, 10);
+ companion.topics().createAndWait(topic, 10);
ConsumerTask<String, Integer> records = companion.consumeIntegers().fromTopics(topic, COUNT,
Duration.ofMinutes(TIMEOUT_IN_MINUTES));
@@ -89,7 +89,7 @@ public void testWithoutBackPressure() {
@Test
public void testWithoutBackPressureAndNoWait() {
String topic = UUID.randomUUID().toString();
- companion.topics().create(topic, 10);
+ companion.topics().createAndWait(topic, 10);
ConsumerTask<String, Integer> consumed = companion.consumeIntegers().fromTopics(topic, COUNT,
Duration.ofMinutes(TIMEOUT_IN_MINUTES));
@@ -121,7 +121,7 @@ public void testWithoutBackPressureAndNoWait() {
@Test
public void testWithoutBackPressureAndIdempotence() throws InterruptedException {
String topic = UUID.randomUUID().toString();
- companion.topics().create(topic, 10);
+ companion.topics().createAndWait(topic, 10);
ConsumerTask<String, Integer> consumed = companion.consumeIntegers().fromTopics(topic, COUNT,
Duration.ofMinutes(TIMEOUT_IN_MINUTES));
@@ -154,7 +154,7 @@ public void testWithoutBackPressureAndIdempotence() throws InterruptedException
@Test
public void testWithoutBackPressureAndIncreaseKafkaRequests() {
String topic = UUID.randomUUID().toString();
- companion.topics().create(topic, 10);
+ companion.topics().createAndWait(topic, 10);
ConsumerTask<String, Integer> consumed = companion.consumeIntegers().fromTopics(topic, COUNT,
Duration.ofMinutes(TIMEOUT_IN_MINUTES));