From dc745a62695d167f5c2f2db72a5d7588d212dae7 Mon Sep 17 00:00:00 2001 From: see-quick Date: Mon, 14 Oct 2024 09:03:34 +0200 Subject: [PATCH 1/3] Add ability to enable shared network Signed-off-by: see-quick --- .../test/container/StrimziKafkaCluster.java | 25 +++- .../io/strimzi/test/container/AbstractIT.java | 6 + .../test/container/StrimziKafkaClusterIT.java | 117 ++++++++++++------ .../container/StrimziKafkaClusterTest.java | 14 +-- .../container/StrimziKafkaContainerIT.java | 28 ----- .../StrimziKafkaKraftContainerIT.java | 6 - .../StrimziZookeeperContainerIT.java | 2 - 7 files changed, 114 insertions(+), 84 deletions(-) diff --git a/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java b/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java index 201b721..c401b60 100644 --- a/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java +++ b/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java @@ -35,10 +35,11 @@ public class StrimziKafkaCluster implements KafkaContainer { // instance attributes private final int brokersNum; - private final Network network; private final StrimziZookeeperContainer zookeeper; private final Collection brokers; + private Network network; + /** * Constructor for @StrimziKafkaCluster class, which allows you to specify number of brokers @see{brokersNum}, * replication factor of internal topics @see{internalTopicReplicationFactor}, a map of additional Kafka @@ -51,11 +52,13 @@ public class StrimziKafkaCluster implements KafkaContainer { * @param internalTopicReplicationFactor internal topics * @param additionalKafkaConfiguration additional Kafka configuration * @param proxyContainer Proxy container + * @param enableSharedNetwork enable Kafka cluster to use a shared Docker network. */ public StrimziKafkaCluster(final int brokersNum, final int internalTopicReplicationFactor, final Map additionalKafkaConfiguration, - final ToxiproxyContainer proxyContainer) { + final ToxiproxyContainer proxyContainer, + final boolean enableSharedNetwork) { if (brokersNum <= 0) { throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0"); } @@ -64,7 +67,7 @@ public StrimziKafkaCluster(final int brokersNum, } this.brokersNum = brokersNum; - this.network = Network.newNetwork(); + this.network = enableSharedNetwork ? Network.SHARED : Network.newNetwork(); this.zookeeper = new StrimziZookeeperContainer() .withNetwork(this.network); @@ -117,7 +120,7 @@ public StrimziKafkaCluster(final int brokersNum, public StrimziKafkaCluster(final int brokersNum, final int internalTopicReplicationFactor, final Map additionalKafkaConfiguration) { - this(brokersNum, internalTopicReplicationFactor, additionalKafkaConfiguration, null); + this(brokersNum, internalTopicReplicationFactor, additionalKafkaConfiguration, null, false); } /** @@ -126,7 +129,7 @@ public StrimziKafkaCluster(final int brokersNum, * @param brokersNum number of brokers */ public StrimziKafkaCluster(final int brokersNum) { - this(brokersNum, brokersNum, null, null); + this(brokersNum, brokersNum, null, null, false); } /** @@ -136,7 +139,17 @@ public StrimziKafkaCluster(final int brokersNum) { * @param proxyContainer Proxy container */ public StrimziKafkaCluster(final int brokersNum, final ToxiproxyContainer proxyContainer) { - this(brokersNum, brokersNum, null, proxyContainer); + this(brokersNum, brokersNum, null, proxyContainer, false); + } + + /** + * Constructor of StrimziKafkaCluster with proxy container + * + * @param brokersNum number of brokers to be deployed + * @param enableSharedNetwork enable Kafka cluster to use a shared Docker network. + */ + public StrimziKafkaCluster(final int brokersNum, final boolean enableSharedNetwork) { + this(brokersNum, brokersNum, null, null, enableSharedNetwork); } /** diff --git a/src/test/java/io/strimzi/test/container/AbstractIT.java b/src/test/java/io/strimzi/test/container/AbstractIT.java index 11ec0f9..cb205cc 100644 --- a/src/test/java/io/strimzi/test/container/AbstractIT.java +++ b/src/test/java/io/strimzi/test/container/AbstractIT.java @@ -12,6 +12,7 @@ import java.util.stream.Stream; import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.provider.Arguments; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,4 +51,9 @@ protected void supportsKraftMode(final String imageName) { protected boolean isLessThanKafka350(final String kafkaVersion) { return KafkaVersionService.KafkaVersion.compareVersions(kafkaVersion, "3.5.0") == -1; } + + @BeforeEach + void setUpEach() { + assumeDocker(); + } } diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaClusterIT.java b/src/test/java/io/strimzi/test/container/StrimziKafkaClusterIT.java index 686a76d..0bf95e5 100644 --- a/src/test/java/io/strimzi/test/container/StrimziKafkaClusterIT.java +++ b/src/test/java/io/strimzi/test/container/StrimziKafkaClusterIT.java @@ -51,12 +51,91 @@ public class StrimziKafkaClusterIT extends AbstractIT { private static final Logger LOGGER = LoggerFactory.getLogger(StrimziKafkaContainerIT.class); + private static final int NUMBER_OF_REPLICAS = 3; private StrimziKafkaCluster systemUnderTest; - private int numberOfReplicas; @Test void testKafkaClusterStartup() throws IOException, InterruptedException { + try { + setUpKafkaCluster(); + + verifyReadinessOfKafkaCluster(); + } finally { + systemUnderTest.stop(); + } + } + + @Test + void testKafkaClusterStartupWithSharedNetwork() throws IOException, InterruptedException { + try { + systemUnderTest = new StrimziKafkaCluster(3, true); + systemUnderTest.start(); + + verifyReadinessOfKafkaCluster(); + } finally { + systemUnderTest.stop(); + } + } + + @Test + void testKafkaClusterFunctionality() throws ExecutionException, InterruptedException, TimeoutException { + setUpKafkaCluster(); + + try { + verifyFunctionalityOfKafkaCluster(); + } finally { + systemUnderTest.stop(); + } + } + + @Test + void testKafkaClusterWithSharedNetworkFunctionality() throws ExecutionException, InterruptedException, TimeoutException { + try { + systemUnderTest = new StrimziKafkaCluster(NUMBER_OF_REPLICAS, true); + systemUnderTest.start(); + + verifyFunctionalityOfKafkaCluster(); + } finally { + systemUnderTest.stop(); + } + } + + @Test + void testStartClusterWithProxyContainer() { + setUpKafkaCluster(); + + ToxiproxyContainer proxyContainer = new ToxiproxyContainer( + DockerImageName.parse("ghcr.io/shopify/toxiproxy:2.6.0") + .asCompatibleSubstituteFor("shopify/toxiproxy")); + + StrimziKafkaCluster kafkaCluster = null; + + try { + kafkaCluster = new StrimziKafkaCluster(3, proxyContainer); + + kafkaCluster.start(); + List bootstrapUrls = new ArrayList<>(); + for (KafkaContainer kafkaContainer : kafkaCluster.getBrokers()) { + Proxy proxy = ((StrimziKafkaContainer) kafkaContainer).getProxy(); + assertThat(proxy, notNullValue()); + bootstrapUrls.add(kafkaContainer.getBootstrapServers()); + } + + assertThat(kafkaCluster.getBootstrapServers(), + is(bootstrapUrls.stream().collect(Collectors.joining(",")))); + } finally { + kafkaCluster.stop(); + systemUnderTest.stop(); + } + } + + private void setUpKafkaCluster() { + systemUnderTest = new StrimziKafkaCluster(NUMBER_OF_REPLICAS); + systemUnderTest.start(); + } + + private void verifyReadinessOfKafkaCluster() throws IOException, InterruptedException { // exercise (fetch the data) final Container.ExecResult result = this.systemUnderTest.getZookeeper().execInContainer( "sh", "-c", @@ -72,8 +151,7 @@ void testKafkaClusterStartup() throws IOException, InterruptedException { LOGGER.info("Brokers are {}", systemUnderTest.getBootstrapServers()); } - @Test - void testKafkaClusterFunctionality() throws InterruptedException, ExecutionException, TimeoutException { + private void verifyFunctionalityOfKafkaCluster() throws ExecutionException, InterruptedException, TimeoutException { // using try-with-resources for AdminClient, KafkaProducer and KafkaConsumer (implicit closing connection) try (final AdminClient adminClient = AdminClient.create(ImmutableMap.of( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, systemUnderTest.getBootstrapServers())); @@ -99,7 +177,7 @@ void testKafkaClusterFunctionality() throws InterruptedException, ExecutionExcep final String recordKey = "strimzi"; final String recordValue = "the-best-project-in-the-world"; - final Collection topics = Collections.singletonList(new NewTopic(topicName, numberOfReplicas, (short) numberOfReplicas)); + final Collection topics = Collections.singletonList(new NewTopic(topicName, NUMBER_OF_REPLICAS, (short) NUMBER_OF_REPLICAS)); adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS); consumer.subscribe(Collections.singletonList(topicName)); @@ -129,37 +207,6 @@ void testKafkaClusterFunctionality() throws InterruptedException, ExecutionExcep } } - @Test - void testStartClusterWithProxyContainer() { - ToxiproxyContainer proxyContainer = new ToxiproxyContainer( - DockerImageName.parse("ghcr.io/shopify/toxiproxy:2.6.0") - .asCompatibleSubstituteFor("shopify/toxiproxy")); - - StrimziKafkaCluster kafkaCluster = new StrimziKafkaCluster(3, proxyContainer); - kafkaCluster.start(); - - List bootstrapUrls = new ArrayList<>(); - for (KafkaContainer kafkaContainer : kafkaCluster.getBrokers()) { - Proxy proxy = ((StrimziKafkaContainer) kafkaContainer).getProxy(); - assertThat(proxy, notNullValue()); - bootstrapUrls.add(kafkaContainer.getBootstrapServers()); - } - - assertThat(kafkaCluster.getBootstrapServers(), - is(bootstrapUrls.stream().collect(Collectors.joining(",")))); - - kafkaCluster.stop(); - } - - @BeforeEach - void setUp() { - final int numberOfBrokers = 3; - numberOfReplicas = numberOfBrokers; - - systemUnderTest = new StrimziKafkaCluster(numberOfBrokers); - systemUnderTest.start(); - } - @AfterEach void tearDown() { systemUnderTest.stop(); diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaClusterTest.java b/src/test/java/io/strimzi/test/container/StrimziKafkaClusterTest.java index 5ac8564..50201e9 100644 --- a/src/test/java/io/strimzi/test/container/StrimziKafkaClusterTest.java +++ b/src/test/java/io/strimzi/test/container/StrimziKafkaClusterTest.java @@ -14,26 +14,26 @@ public class StrimziKafkaClusterTest { @Test void testKafkaClusterNegativeOrZeroNumberOfNodes() { assertThrows(IllegalArgumentException.class, () -> new StrimziKafkaCluster( - 0, 1, null, null)); + 0, 1, null, null, false)); assertThrows(IllegalArgumentException.class, () -> new StrimziKafkaCluster( - -1, 1, null, null)); + -1, 1, null, null, false)); } @Test void testKafkaClusterPossibleNumberOfNodes() { assertDoesNotThrow(() -> new StrimziKafkaCluster( - 1, 1, null, null)); + 1, 1, null, null, false)); assertDoesNotThrow(() -> new StrimziKafkaCluster( - 3, 3, null, null)); + 3, 3, null, null, false)); assertDoesNotThrow(() -> new StrimziKafkaCluster( - 10, 3, null, null)); + 10, 3, null, null, false)); } @Test void testNegativeOrMoreReplicasThanAvailableOfKafkaBrokersInternalReplicationError() { assertThrows(IllegalArgumentException.class, () -> new StrimziKafkaCluster( - 0, 0, null, null)); + 0, 0, null, null, false)); assertThrows(IllegalArgumentException.class, () -> new StrimziKafkaCluster( - 3, 5, null, null)); + 3, 5, null, null, false)); } } diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java b/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java index 5f15753..061167c 100644 --- a/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java +++ b/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java @@ -63,8 +63,6 @@ public class StrimziKafkaContainerIT extends AbstractIT { @ParameterizedTest(name = "testStartContainerWithEmptyConfiguration-{0}") @MethodSource("retrieveKafkaVersionsFile") void testStartContainerWithEmptyConfiguration(final String imageName) { - assumeDocker(); - try (StrimziKafkaContainer systemUnderTest = new StrimziKafkaContainer(imageName) .withBrokerId(1) .waitForRunning()) { @@ -79,8 +77,6 @@ void testStartContainerWithEmptyConfiguration(final String imageName) { @ParameterizedTest(name = "testStartContainerWithSomeConfiguration-{0}") @MethodSource("retrieveKafkaVersionsFile") void testStartContainerWithSomeConfiguration(final String imageName) { - assumeDocker(); - Map kafkaConfiguration = new HashMap<>(); kafkaConfiguration.put("log.cleaner.enable", "false"); @@ -109,8 +105,6 @@ void testStartContainerWithSomeConfiguration(final String imageName) { @ParameterizedTest(name = "testStartContainerWithFixedExposedPort-{0}") @MethodSource("retrieveKafkaVersionsFile") void testStartContainerWithFixedExposedPort(final String imageName) { - assumeDocker(); - systemUnderTest = new StrimziKafkaContainer(imageName) .withPort(9092) .waitForRunning(); @@ -125,8 +119,6 @@ void testStartContainerWithFixedExposedPort(final String imageName) { @ParameterizedTest(name = "testStartContainerWithSSLBootstrapServers-{0}") @MethodSource("retrieveKafkaVersionsFile") void testStartContainerWithSSLBootstrapServers(final String imageName) { - assumeDocker(); - systemUnderTest = new StrimziKafkaContainer(imageName) .waitForRunning() .withBootstrapServers(c -> String.format("SSL://%s:%s", c.getHost(), c.getMappedPort(9092))); @@ -141,8 +133,6 @@ void testStartContainerWithSSLBootstrapServers(final String imageName) { @ParameterizedTest(name = "testStartContainerWithServerProperties-{0}") @MethodSource("retrieveKafkaVersionsFile") void testStartContainerWithServerProperties(final String imageName) { - assumeDocker(); - systemUnderTest = new StrimziKafkaContainer(imageName) .waitForRunning() .withServerProperties(MountableFile.forClasspathResource("server.properties")); @@ -161,8 +151,6 @@ void testStartContainerWithServerProperties(final String imageName) { @Test void testStartContainerWithStrimziKafkaImage() { - assumeDocker(); - // explicitly set strimzi.test-container.kafka.custom.image String imageName = "quay.io/strimzi/kafka:0.27.1-kafka-3.0.0"; System.setProperty("strimzi.test-container.kafka.custom.image", imageName); @@ -184,8 +172,6 @@ void testStartContainerWithStrimziKafkaImage() { @Test void testStartContainerWithCustomImage() { - assumeDocker(); - String imageName = "quay.io/strimzi/kafka:0.27.1-kafka-3.0.0"; systemUnderTest = new StrimziKafkaContainer(imageName) .waitForRunning(); @@ -201,8 +187,6 @@ void testStartContainerWithCustomImage() { @Test void testStartContainerWithCustomNetwork() { - assumeDocker(); - Network network = Network.newNetwork(); systemUnderTest = new StrimziKafkaContainer() @@ -220,8 +204,6 @@ void testStartContainerWithCustomNetwork() { @Test void testUnsupportedKafkaVersion() { - assumeDocker(); - try { systemUnderTest = new StrimziKafkaContainer() .withKafkaVersion("2.4.0") @@ -236,8 +218,6 @@ void testUnsupportedKafkaVersion() { @ParameterizedTest(name = "testKafkaContainerConnectFromOutsideToInternalZooKeeper-{0}") @MethodSource("retrieveKafkaVersionsFile") void testKafkaContainerConnectFromOutsideToInternalZooKeeper() { - assumeDocker(); - try { systemUnderTest = new StrimziKafkaContainer() .waitForRunning(); @@ -267,8 +247,6 @@ void testKafkaContainerConnectFromOutsideToInternalZooKeeper() { @ParameterizedTest(name = "testKafkaContainerInternalCommunicationWithInternalZooKeeper-{0}") @MethodSource("retrieveKafkaVersionsFile") void testKafkaContainerInternalCommunicationWithInternalZooKeeper() throws IOException, InterruptedException { - assumeDocker(); - try { systemUnderTest = new StrimziKafkaContainer() .waitForRunning(); @@ -291,8 +269,6 @@ void testKafkaContainerInternalCommunicationWithInternalZooKeeper() throws IOExc @ParameterizedTest(name = "testIllegalStateUsingInternalZooKeeperWithKraft-{0}") @MethodSource("retrieveKafkaVersionsFile") void testIllegalStateUsingInternalZooKeeperWithKraft() { - assumeDocker(); - systemUnderTest = new StrimziKafkaContainer() .withKraft() .waitForRunning(); @@ -303,8 +279,6 @@ void testIllegalStateUsingInternalZooKeeperWithKraft() { @ParameterizedTest(name = "testIllegalStateUsingInternalZooKeeperWithExternalZooKeeper-{0}") @MethodSource("retrieveKafkaVersionsFile") void testIllegalStateUsingInternalZooKeeperWithExternalZooKeeper() { - assumeDocker(); - systemUnderTest = new StrimziKafkaContainer() // we do not need to spin-up instance of StrimziZooKeeperContainer .withExternalZookeeperConnect("zookeeper:2181") @@ -316,8 +290,6 @@ void testIllegalStateUsingInternalZooKeeperWithExternalZooKeeper() { @ParameterizedTest(name = "testStartBrokerWithProxyContainer-{0}") @MethodSource("retrieveKafkaVersionsFile") void testStartBrokerWithProxyContainer(final String imageName) { - assumeDocker(); - ToxiproxyContainer proxyContainer = new ToxiproxyContainer( DockerImageName.parse("ghcr.io/shopify/toxiproxy:2.6.0") .asCompatibleSubstituteFor("shopify/toxiproxy")); diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaKraftContainerIT.java b/src/test/java/io/strimzi/test/container/StrimziKafkaKraftContainerIT.java index e8acb47..b278d91 100644 --- a/src/test/java/io/strimzi/test/container/StrimziKafkaKraftContainerIT.java +++ b/src/test/java/io/strimzi/test/container/StrimziKafkaKraftContainerIT.java @@ -44,7 +44,6 @@ public class StrimziKafkaKraftContainerIT extends AbstractIT { @ParameterizedTest(name = "testStartContainerWithEmptyConfiguration-{0}") @MethodSource("retrieveKafkaVersionsFile") void testStartContainerWithEmptyConfiguration(final String imageName, final String kafkaVersion) throws ExecutionException, InterruptedException, TimeoutException { - assumeDocker(); supportsKraftMode(imageName); try { @@ -76,7 +75,6 @@ void testStartContainerWithEmptyConfiguration(final String imageName, final Stri @ParameterizedTest(name = "testStartContainerWithSomeConfiguration-{0}") @MethodSource("retrieveKafkaVersionsFile") void testStartContainerWithSomeConfiguration(final String imageName, final String kafkaVersion) throws ExecutionException, InterruptedException, TimeoutException { - assumeDocker(); supportsKraftMode(imageName); try { Map kafkaConfiguration = new HashMap<>(); @@ -115,8 +113,6 @@ void testStartContainerWithSomeConfiguration(final String imageName, final Strin @Test void testUnsupportedKRaftUsingKafkaVersion() { - assumeDocker(); - try { systemUnderTest = new StrimziKafkaContainer() .withKafkaVersion("2.8.2") @@ -133,8 +129,6 @@ void testUnsupportedKRaftUsingKafkaVersion() { @Test void testUnsupportedKRaftUsingImageName() { - assumeDocker(); - try { systemUnderTest = new StrimziKafkaContainer("quay.io/strimzi-test-container/test-container:latest-kafka-2.8.2") .withBrokerId(1) diff --git a/src/test/java/io/strimzi/test/container/StrimziZookeeperContainerIT.java b/src/test/java/io/strimzi/test/container/StrimziZookeeperContainerIT.java index fe41cba..28411c2 100644 --- a/src/test/java/io/strimzi/test/container/StrimziZookeeperContainerIT.java +++ b/src/test/java/io/strimzi/test/container/StrimziZookeeperContainerIT.java @@ -72,8 +72,6 @@ void testZookeeperWithKafkaContainer(final String imageName) { @ParameterizedTest(name = "testStartContainerWithZooKeeperProperties-{0}") @MethodSource("retrieveKafkaVersionsFile") void testStartContainerWithZooKeeperProperties(final String imageName) { - assumeDocker(); - try { systemUnderTest = new StrimziZookeeperContainer(imageName) .withZooKeeperPropertiesFile(MountableFile.forClasspathResource("zookeeper.properties")); From 6363f528ebe3d283e85bec815bba743cb34b5b22 Mon Sep 17 00:00:00 2001 From: see-quick Date: Mon, 14 Oct 2024 09:06:36 +0200 Subject: [PATCH 2/3] back network to final Signed-off-by: see-quick --- .../java/io/strimzi/test/container/StrimziKafkaCluster.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java b/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java index c401b60..5a35cfa 100644 --- a/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java +++ b/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java @@ -35,11 +35,10 @@ public class StrimziKafkaCluster implements KafkaContainer { // instance attributes private final int brokersNum; + private final Network network; private final StrimziZookeeperContainer zookeeper; private final Collection brokers; - private Network network; - /** * Constructor for @StrimziKafkaCluster class, which allows you to specify number of brokers @see{brokersNum}, * replication factor of internal topics @see{internalTopicReplicationFactor}, a map of additional Kafka From f1b8293aded40896c2bb3955365bb73978e66009 Mon Sep 17 00:00:00 2001 From: see-quick Date: Mon, 14 Oct 2024 09:07:32 +0200 Subject: [PATCH 3/3] checkstyle Signed-off-by: see-quick --- .../java/io/strimzi/test/container/StrimziKafkaClusterIT.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaClusterIT.java b/src/test/java/io/strimzi/test/container/StrimziKafkaClusterIT.java index 0bf95e5..adecbad 100644 --- a/src/test/java/io/strimzi/test/container/StrimziKafkaClusterIT.java +++ b/src/test/java/io/strimzi/test/container/StrimziKafkaClusterIT.java @@ -26,7 +26,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.testcontainers.containers.Container; import org.testcontainers.containers.ToxiproxyContainer; @@ -73,7 +72,7 @@ void testKafkaClusterStartupWithSharedNetwork() throws IOException, InterruptedE systemUnderTest.start(); verifyReadinessOfKafkaCluster(); - } finally { + } finally { systemUnderTest.stop(); } }