Add ability to enable shared network in StrimziKafkaCluster #85

Merged 3 commits on Oct 14, 2024
22 changes: 17 additions & 5 deletions src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java
@@ -51,11 +51,13 @@ public class StrimziKafkaCluster implements KafkaContainer {
* @param internalTopicReplicationFactor internal topics
* @param additionalKafkaConfiguration additional Kafka configuration
* @param proxyContainer Proxy container
* @param enableSharedNetwork enable Kafka cluster to use a shared Docker network.
*/
public StrimziKafkaCluster(final int brokersNum,
final int internalTopicReplicationFactor,
final Map<String, String> additionalKafkaConfiguration,
final ToxiproxyContainer proxyContainer) {
final ToxiproxyContainer proxyContainer,
final boolean enableSharedNetwork) {
if (brokersNum <= 0) {
throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
}
@@ -64,7 +66,7 @@ public StrimziKafkaCluster(final int brokersNum,
}

this.brokersNum = brokersNum;
this.network = Network.newNetwork();
this.network = enableSharedNetwork ? Network.SHARED : Network.newNetwork();

this.zookeeper = new StrimziZookeeperContainer()
.withNetwork(this.network);
@@ -117,7 +119,7 @@ public StrimziKafkaCluster(final int brokersNum,
public StrimziKafkaCluster(final int brokersNum,
final int internalTopicReplicationFactor,
final Map<String, String> additionalKafkaConfiguration) {
this(brokersNum, internalTopicReplicationFactor, additionalKafkaConfiguration, null);
this(brokersNum, internalTopicReplicationFactor, additionalKafkaConfiguration, null, false);
}

/**
@@ -126,7 +128,7 @@ public StrimziKafkaCluster(final int brokersNum,
* @param brokersNum number of brokers
*/
public StrimziKafkaCluster(final int brokersNum) {
this(brokersNum, brokersNum, null, null);
this(brokersNum, brokersNum, null, null, false);
}

/**
@@ -136,7 +138,17 @@ public StrimziKafkaCluster(final int brokersNum) {
* @param proxyContainer Proxy container
*/
public StrimziKafkaCluster(final int brokersNum, final ToxiproxyContainer proxyContainer) {
this(brokersNum, brokersNum, null, proxyContainer);
this(brokersNum, brokersNum, null, proxyContainer, false);
}

/**
* Constructor of StrimziKafkaCluster with proxy container
*
* @param brokersNum number of brokers to be deployed
* @param enableSharedNetwork enable Kafka cluster to use a shared Docker network.
*/
public StrimziKafkaCluster(final int brokersNum, final boolean enableSharedNetwork) {
this(brokersNum, brokersNum, null, null, enableSharedNetwork);
}

/**
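For context, a minimal usage sketch of the new flag (not part of this diff): when enableSharedNetwork is true the cluster joins Testcontainers' Network.SHARED, so other containers attached to that same network run alongside the brokers. The companion container image and command below are illustrative assumptions, not anything the PR prescribes.

```java
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;

import io.strimzi.test.container.StrimziKafkaCluster;

public class SharedNetworkSketch {
    public static void main(String[] args) {
        // New convenience constructor from this PR: 3 brokers, enableSharedNetwork = true.
        StrimziKafkaCluster cluster = new StrimziKafkaCluster(3, true);
        cluster.start();
        try {
            // Hypothetical companion container on the same shared Docker network as the brokers.
            try (GenericContainer<?> companion = new GenericContainer<>("alpine:3.19")
                    .withNetwork(Network.SHARED)
                    .withCommand("sleep", "infinity")) {
                companion.start();
                System.out.println("Bootstrap servers: " + cluster.getBootstrapServers());
            }
        } finally {
            cluster.stop();
        }
    }
}
```

With the flag left at false (the default used by the existing constructors), each cluster keeps its own isolated Network.newNetwork(), as before.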
6 changes: 6 additions & 0 deletions src/test/java/io/strimzi/test/container/AbstractIT.java
@@ -12,6 +12,7 @@
import java.util.stream.Stream;

import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.provider.Arguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,4 +51,9 @@ protected void supportsKraftMode(final String imageName) {
protected boolean isLessThanKafka350(final String kafkaVersion) {
return KafkaVersionService.KafkaVersion.compareVersions(kafkaVersion, "3.5.0") == -1;
}

@BeforeEach
void setUpEach() {
assumeDocker();
}
}
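The body of assumeDocker() is not shown in this hunk; it already exists in AbstractIT and is now invoked once per test via @BeforeEach instead of at the top of every test method in the subclasses. A rough sketch of what such a guard typically looks like, assuming a Testcontainers-based availability check (not necessarily this project's exact implementation):

```java
import org.junit.jupiter.api.Assumptions;
import org.testcontainers.DockerClientFactory;

abstract class DockerAssumptionSketch {
    // Hypothetical stand-in for AbstractIT#assumeDocker(): marks the test as
    // skipped (rather than failed) when no Docker daemon is reachable.
    protected void assumeDocker() {
        boolean dockerAvailable = DockerClientFactory.instance().isDockerAvailable();
        Assumptions.assumeTrue(dockerAvailable, "Docker is not available, skipping test");
    }
}
```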
118 changes: 82 additions & 36 deletions src/test/java/io/strimzi/test/container/StrimziKafkaClusterIT.java
@@ -26,7 +26,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.ToxiproxyContainer;
@@ -51,12 +50,91 @@
public class StrimziKafkaClusterIT extends AbstractIT {

private static final Logger LOGGER = LoggerFactory.getLogger(StrimziKafkaContainerIT.class);
private static final int NUMBER_OF_REPLICAS = 3;

private StrimziKafkaCluster systemUnderTest;
private int numberOfReplicas;

@Test
void testKafkaClusterStartup() throws IOException, InterruptedException {
try {
setUpKafkaCluster();

verifyReadinessOfKafkaCluster();
} finally {
systemUnderTest.stop();
}
}

@Test
void testKafkaClusterStartupWithSharedNetwork() throws IOException, InterruptedException {
try {
systemUnderTest = new StrimziKafkaCluster(3, true);
systemUnderTest.start();

verifyReadinessOfKafkaCluster();
} finally {
systemUnderTest.stop();
}
}

@Test
void testKafkaClusterFunctionality() throws ExecutionException, InterruptedException, TimeoutException {
setUpKafkaCluster();

try {
verifyFunctionalityOfKafkaCluster();
} finally {
systemUnderTest.stop();
}
}

@Test
void testKafkaClusterWithSharedNetworkFunctionality() throws ExecutionException, InterruptedException, TimeoutException {
try {
systemUnderTest = new StrimziKafkaCluster(NUMBER_OF_REPLICAS, true);
systemUnderTest.start();

verifyFunctionalityOfKafkaCluster();
} finally {
systemUnderTest.stop();
}
}

@Test
void testStartClusterWithProxyContainer() {
setUpKafkaCluster();

ToxiproxyContainer proxyContainer = new ToxiproxyContainer(
DockerImageName.parse("ghcr.io/shopify/toxiproxy:2.6.0")
.asCompatibleSubstituteFor("shopify/toxiproxy"));

StrimziKafkaCluster kafkaCluster = null;

try {
kafkaCluster = new StrimziKafkaCluster(3, proxyContainer);

kafkaCluster.start();
List<String> bootstrapUrls = new ArrayList<>();
for (KafkaContainer kafkaContainer : kafkaCluster.getBrokers()) {
Proxy proxy = ((StrimziKafkaContainer) kafkaContainer).getProxy();
assertThat(proxy, notNullValue());
bootstrapUrls.add(kafkaContainer.getBootstrapServers());
}

assertThat(kafkaCluster.getBootstrapServers(),
is(bootstrapUrls.stream().collect(Collectors.joining(","))));
} finally {
kafkaCluster.stop();
systemUnderTest.stop();
}
}

private void setUpKafkaCluster() {
systemUnderTest = new StrimziKafkaCluster(NUMBER_OF_REPLICAS);
systemUnderTest.start();
}

private void verifyReadinessOfKafkaCluster() throws IOException, InterruptedException {
// exercise (fetch the data)
final Container.ExecResult result = this.systemUnderTest.getZookeeper().execInContainer(
"sh", "-c",
@@ -72,8 +150,7 @@ void testKafkaClusterStartup() throws IOException, InterruptedException {
LOGGER.info("Brokers are {}", systemUnderTest.getBootstrapServers());
}

@Test
void testKafkaClusterFunctionality() throws InterruptedException, ExecutionException, TimeoutException {
private void verifyFunctionalityOfKafkaCluster() throws ExecutionException, InterruptedException, TimeoutException {
// using try-with-resources for AdminClient, KafkaProducer and KafkaConsumer (implicit closing connection)
try (final AdminClient adminClient = AdminClient.create(ImmutableMap.of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, systemUnderTest.getBootstrapServers()));
@@ -99,7 +176,7 @@ void testKafkaClusterFunctionality() throws InterruptedExcep
final String recordKey = "strimzi";
final String recordValue = "the-best-project-in-the-world";

final Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, numberOfReplicas, (short) numberOfReplicas));
final Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, NUMBER_OF_REPLICAS, (short) NUMBER_OF_REPLICAS));
adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);

consumer.subscribe(Collections.singletonList(topicName));
@@ -129,37 +206,6 @@ void testKafkaClusterFunctionality() throws InterruptedExcep
}
}

@Test
void testStartClusterWithProxyContainer() {
ToxiproxyContainer proxyContainer = new ToxiproxyContainer(
DockerImageName.parse("ghcr.io/shopify/toxiproxy:2.6.0")
.asCompatibleSubstituteFor("shopify/toxiproxy"));

StrimziKafkaCluster kafkaCluster = new StrimziKafkaCluster(3, proxyContainer);
kafkaCluster.start();

List<String> bootstrapUrls = new ArrayList<>();
for (KafkaContainer kafkaContainer : kafkaCluster.getBrokers()) {
Proxy proxy = ((StrimziKafkaContainer) kafkaContainer).getProxy();
assertThat(proxy, notNullValue());
bootstrapUrls.add(kafkaContainer.getBootstrapServers());
}

assertThat(kafkaCluster.getBootstrapServers(),
is(bootstrapUrls.stream().collect(Collectors.joining(","))));

kafkaCluster.stop();
}

@BeforeEach
void setUp() {
final int numberOfBrokers = 3;
numberOfReplicas = numberOfBrokers;

systemUnderTest = new StrimziKafkaCluster(numberOfBrokers);
systemUnderTest.start();
}

@AfterEach
void tearDown() {
systemUnderTest.stop();
src/test/java/io/strimzi/test/container/StrimziKafkaClusterTest.java
@@ -14,26 +14,26 @@ public class StrimziKafkaClusterTest {
@Test
void testKafkaClusterNegativeOrZeroNumberOfNodes() {
assertThrows(IllegalArgumentException.class, () -> new StrimziKafkaCluster(
0, 1, null, null));
0, 1, null, null, false));
assertThrows(IllegalArgumentException.class, () -> new StrimziKafkaCluster(
-1, 1, null, null));
-1, 1, null, null, false));
}

@Test
void testKafkaClusterPossibleNumberOfNodes() {
assertDoesNotThrow(() -> new StrimziKafkaCluster(
1, 1, null, null));
1, 1, null, null, false));
assertDoesNotThrow(() -> new StrimziKafkaCluster(
3, 3, null, null));
3, 3, null, null, false));
assertDoesNotThrow(() -> new StrimziKafkaCluster(
10, 3, null, null));
10, 3, null, null, false));
}

@Test
void testNegativeOrMoreReplicasThanAvailableOfKafkaBrokersInternalReplicationError() {
assertThrows(IllegalArgumentException.class, () -> new StrimziKafkaCluster(
0, 0, null, null));
0, 0, null, null, false));
assertThrows(IllegalArgumentException.class, () -> new StrimziKafkaCluster(
3, 5, null, null));
3, 5, null, null, false));
}
}
src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java
@@ -63,8 +63,6 @@ public class StrimziKafkaContainerIT extends AbstractIT {
@ParameterizedTest(name = "testStartContainerWithEmptyConfiguration-{0}")
@MethodSource("retrieveKafkaVersionsFile")
void testStartContainerWithEmptyConfiguration(final String imageName) {
assumeDocker();

try (StrimziKafkaContainer systemUnderTest = new StrimziKafkaContainer(imageName)
.withBrokerId(1)
.waitForRunning()) {
@@ -79,8 +77,6 @@ void testStartContainerWithEmptyConfiguration(final String imageName) {
@ParameterizedTest(name = "testStartContainerWithSomeConfiguration-{0}")
@MethodSource("retrieveKafkaVersionsFile")
void testStartContainerWithSomeConfiguration(final String imageName) {
assumeDocker();

Map<String, String> kafkaConfiguration = new HashMap<>();

kafkaConfiguration.put("log.cleaner.enable", "false");
@@ -109,8 +105,6 @@ void testStartContainerWithSomeConfiguration(final String imageName) {
@ParameterizedTest(name = "testStartContainerWithFixedExposedPort-{0}")
@MethodSource("retrieveKafkaVersionsFile")
void testStartContainerWithFixedExposedPort(final String imageName) {
assumeDocker();

systemUnderTest = new StrimziKafkaContainer(imageName)
.withPort(9092)
.waitForRunning();
@@ -125,8 +119,6 @@ void testStartContainerWithFixedExposedPort(final String imageName) {
@ParameterizedTest(name = "testStartContainerWithSSLBootstrapServers-{0}")
@MethodSource("retrieveKafkaVersionsFile")
void testStartContainerWithSSLBootstrapServers(final String imageName) {
assumeDocker();

systemUnderTest = new StrimziKafkaContainer(imageName)
.waitForRunning()
.withBootstrapServers(c -> String.format("SSL://%s:%s", c.getHost(), c.getMappedPort(9092)));
@@ -141,8 +133,6 @@ void testStartContainerWithSSLBootstrapServers(final String imageName) {
@ParameterizedTest(name = "testStartContainerWithServerProperties-{0}")
@MethodSource("retrieveKafkaVersionsFile")
void testStartContainerWithServerProperties(final String imageName) {
assumeDocker();

systemUnderTest = new StrimziKafkaContainer(imageName)
.waitForRunning()
.withServerProperties(MountableFile.forClasspathResource("server.properties"));
@@ -161,8 +151,6 @@ void testStartContainerWithServerProperties(final String imageName) {

@Test
void testStartContainerWithStrimziKafkaImage() {
assumeDocker();

// explicitly set strimzi.test-container.kafka.custom.image
String imageName = "quay.io/strimzi/kafka:0.27.1-kafka-3.0.0";
System.setProperty("strimzi.test-container.kafka.custom.image", imageName);
@@ -184,8 +172,6 @@ void testStartContainerWithStrimziKafkaImage() {

@Test
void testStartContainerWithCustomImage() {
assumeDocker();

String imageName = "quay.io/strimzi/kafka:0.27.1-kafka-3.0.0";
systemUnderTest = new StrimziKafkaContainer(imageName)
.waitForRunning();
@@ -201,8 +187,6 @@ void testStartContainerWithCustomImage() {

@Test
void testStartContainerWithCustomNetwork() {
assumeDocker();

Network network = Network.newNetwork();

systemUnderTest = new StrimziKafkaContainer()
@@ -220,8 +204,6 @@ void testStartContainerWithCustomNetwork() {

@Test
void testUnsupportedKafkaVersion() {
assumeDocker();

try {
systemUnderTest = new StrimziKafkaContainer()
.withKafkaVersion("2.4.0")
@@ -236,8 +218,6 @@ void testUnsupportedKafkaVersion() {
@ParameterizedTest(name = "testKafkaContainerConnectFromOutsideToInternalZooKeeper-{0}")
@MethodSource("retrieveKafkaVersionsFile")
void testKafkaContainerConnectFromOutsideToInternalZooKeeper() {
assumeDocker();

try {
systemUnderTest = new StrimziKafkaContainer()
.waitForRunning();
@@ -267,8 +247,6 @@ void testKafkaContainerConnectFromOutsideToInternalZooKeeper() {
@ParameterizedTest(name = "testKafkaContainerInternalCommunicationWithInternalZooKeeper-{0}")
@MethodSource("retrieveKafkaVersionsFile")
void testKafkaContainerInternalCommunicationWithInternalZooKeeper() throws IOException, InterruptedException {
assumeDocker();

try {
systemUnderTest = new StrimziKafkaContainer()
.waitForRunning();
@@ -291,8 +269,6 @@ void testKafkaContainerInternalCommunicationWithInternalZooKeeper() throws IOExc
@ParameterizedTest(name = "testIllegalStateUsingInternalZooKeeperWithKraft-{0}")
@MethodSource("retrieveKafkaVersionsFile")
void testIllegalStateUsingInternalZooKeeperWithKraft() {
assumeDocker();

systemUnderTest = new StrimziKafkaContainer()
.withKraft()
.waitForRunning();
@@ -303,8 +279,6 @@ void testIllegalStateUsingInternalZooKeeperWithKraft() {
@ParameterizedTest(name = "testIllegalStateUsingInternalZooKeeperWithExternalZooKeeper-{0}")
@MethodSource("retrieveKafkaVersionsFile")
void testIllegalStateUsingInternalZooKeeperWithExternalZooKeeper() {
assumeDocker();

systemUnderTest = new StrimziKafkaContainer()
// we do not need to spin-up instance of StrimziZooKeeperContainer
.withExternalZookeeperConnect("zookeeper:2181")
@@ -316,8 +290,6 @@ void testIllegalStateUsingInternalZooKeeperWithExternalZooKeeper() {
@ParameterizedTest(name = "testStartBrokerWithProxyContainer-{0}")
@MethodSource("retrieveKafkaVersionsFile")
void testStartBrokerWithProxyContainer(final String imageName) {
assumeDocker();

ToxiproxyContainer proxyContainer = new ToxiproxyContainer(
DockerImageName.parse("ghcr.io/shopify/toxiproxy:2.6.0")
.asCompatibleSubstituteFor("shopify/toxiproxy"));