Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StrimziKafkaCluster builder #86

Merged
merged 5 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 95 additions & 70 deletions src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,54 +35,46 @@ public class StrimziKafkaCluster implements KafkaContainer {

// instance attributes
private final int brokersNum;
private final int internalTopicReplicationFactor;
private final Map<String, String> additionalKafkaConfiguration;
private final ToxiproxyContainer proxyContainer;
private final boolean enableSharedNetwork;

// not editable
private final Network network;
private final StrimziZookeeperContainer zookeeper;
private final Collection<KafkaContainer> brokers;

/**
* Constructor for @StrimziKafkaCluster class, which allows you to specify number of brokers @see{brokersNum},
* replication factor of internal topics @see{internalTopicReplicationFactor}, a map of additional Kafka
* configuration @see{additionalKafkaConfiguration} and a {@code proxyContainer}.
* <br><br>
* The {@code proxyContainer} allows to simulate network conditions (i.e. connection cut, latency).
* For example, you can simulate a network partition by cutting the connection of one or more brokers.
*
* @param brokersNum number of brokers
* @param internalTopicReplicationFactor internal topics
* @param additionalKafkaConfiguration additional Kafka configuration
* @param proxyContainer Proxy container
* @param enableSharedNetwork enable Kafka cluster to use a shared Docker network.
*/
public StrimziKafkaCluster(final int brokersNum,
final int internalTopicReplicationFactor,
final Map<String, String> additionalKafkaConfiguration,
final ToxiproxyContainer proxyContainer,
final boolean enableSharedNetwork) {
if (brokersNum <= 0) {
throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
private StrimziKafkaCluster(StrimziKafkaClusterBuilder builder) {
this.brokersNum = builder.brokersNum;
this.enableSharedNetwork = builder.enableSharedNetwork;
this.network = this.enableSharedNetwork ? Network.SHARED : Network.newNetwork();
this.internalTopicReplicationFactor = builder.internalTopicReplicationFactor == 0 ? this.brokersNum : builder.internalTopicReplicationFactor;
this.additionalKafkaConfiguration = builder.additionalKafkaConfiguration;
this.proxyContainer = builder.proxyContainer;

if (this.brokersNum <= 0) {
throw new IllegalArgumentException("brokersNum '" + this.brokersNum + "' must be greater than 0");
}
if (internalTopicReplicationFactor <= 0 || internalTopicReplicationFactor > brokersNum) {
throw new IllegalArgumentException("internalTopicReplicationFactor '" + internalTopicReplicationFactor + "' must be less than brokersNum and greater than 0");
if (this.internalTopicReplicationFactor <= 0 || this.internalTopicReplicationFactor > this.brokersNum) {
throw new IllegalArgumentException("internalTopicReplicationFactor '" + this.internalTopicReplicationFactor + "' must be less than brokersNum and greater than 0");
}

this.brokersNum = brokersNum;
this.network = enableSharedNetwork ? Network.SHARED : Network.newNetwork();

this.zookeeper = new StrimziZookeeperContainer()
.withNetwork(this.network);

Map<String, String> defaultKafkaConfigurationForMultiNode = new HashMap<>();
final Map<String, String> defaultKafkaConfigurationForMultiNode = new HashMap<>();
defaultKafkaConfigurationForMultiNode.put("offsets.topic.replication.factor", String.valueOf(internalTopicReplicationFactor));
defaultKafkaConfigurationForMultiNode.put("num.partitions", String.valueOf(internalTopicReplicationFactor));
defaultKafkaConfigurationForMultiNode.put("transaction.state.log.replication.factor", String.valueOf(internalTopicReplicationFactor));
defaultKafkaConfigurationForMultiNode.put("transaction.state.log.min.isr", String.valueOf(internalTopicReplicationFactor));

if (additionalKafkaConfiguration != null) {
defaultKafkaConfigurationForMultiNode.putAll(additionalKafkaConfiguration);
if (this.additionalKafkaConfiguration != null) {
defaultKafkaConfigurationForMultiNode.putAll(this.additionalKafkaConfiguration);
}

if (proxyContainer != null) {
proxyContainer.setNetwork(this.network);
if (this.proxyContainer != null) {
this.proxyContainer.setNetwork(this.network);
}

// multi-node set up
Expand All @@ -107,48 +99,81 @@ public StrimziKafkaCluster(final int brokersNum,
.collect(Collectors.toList());
}

/**
* Constructor for @StrimziKafkaCluster class, which allows you to specify number of brokers @see{brokersNum},
* replication factor of internal topics @see{internalTopicReplicationFactor} and map of additional Kafka
* configuration @see{additionalKafkaConfiguration}.
*
* @param brokersNum number of brokers
* @param internalTopicReplicationFactor internal topics
* @param additionalKafkaConfiguration additional Kafka configuration
*/
public StrimziKafkaCluster(final int brokersNum,
final int internalTopicReplicationFactor,
final Map<String, String> additionalKafkaConfiguration) {
this(brokersNum, internalTopicReplicationFactor, additionalKafkaConfiguration, null, false);
}

/**
* Constructor of StrimziKafkaCluster without specifying additional configuration.
*
* @param brokersNum number of brokers
*/
public StrimziKafkaCluster(final int brokersNum) {
this(brokersNum, brokersNum, null, null, false);
}
public static class StrimziKafkaClusterBuilder {
private int brokersNum;
private int internalTopicReplicationFactor;
private Map<String, String> additionalKafkaConfiguration = new HashMap<>();
private ToxiproxyContainer proxyContainer;
private boolean enableSharedNetwork;

/**
* Constructor of StrimziKafkaCluster with proxy container
*
* @param brokersNum number of brokers to be deployed
* @param proxyContainer Proxy container
*/
public StrimziKafkaCluster(final int brokersNum, final ToxiproxyContainer proxyContainer) {
this(brokersNum, brokersNum, null, proxyContainer, false);
}
/**
* Sets the number of Kafka brokers in the cluster.
*
* @param brokersNum the number of Kafka brokers
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withNumberOfBrokers(int brokersNum) {
this.brokersNum = brokersNum;
return this;
}

/**
* Constructor of StrimziKafkaCluster with proxy container
*
* @param brokersNum number of brokers to be deployed
* @param enableSharedNetwork enable Kafka cluster to use a shared Docker network.
*/
public StrimziKafkaCluster(final int brokersNum, final boolean enableSharedNetwork) {
this(brokersNum, brokersNum, null, null, enableSharedNetwork);
/**
* Sets the internal topic replication factor for Kafka brokers.
* If not provided, it defaults to the number of brokers.
*
* @param internalTopicReplicationFactor the replication factor for internal topics
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withInternalTopicReplicationFactor(int internalTopicReplicationFactor) {
this.internalTopicReplicationFactor = internalTopicReplicationFactor;
return this;
}

/**
* Adds additional Kafka configuration parameters.
* These configurations are applied to all brokers in the cluster.
*
* @param additionalKafkaConfiguration a map of additional Kafka configuration options
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withAdditionalKafkaConfiguration(Map<String, String> additionalKafkaConfiguration) {
if (additionalKafkaConfiguration != null) {
this.additionalKafkaConfiguration.putAll(additionalKafkaConfiguration);
}
return this;
}

/**
* Sets a {@code ToxiproxyContainer} to simulate network conditions such as latency or disconnection.
*
* @param proxyContainer the proxy container for simulating network conditions
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withProxyContainer(ToxiproxyContainer proxyContainer) {
this.proxyContainer = proxyContainer;
return this;
}

/**
* Enables a shared Docker network for the Kafka cluster.
* This allows the Kafka cluster to interact with other containers on the same network.
*
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withSharedNetwork() {
this.enableSharedNetwork = true;
return this;
}

/**
* Builds and returns a {@code StrimziKafkaCluster} instance based on the provided configurations.
*
* @return a new instance of {@code StrimziKafkaCluster}
*/
public StrimziKafkaCluster build() {
return new StrimziKafkaCluster(this);
}
}

/**
Expand Down
19 changes: 15 additions & 4 deletions src/test/java/io/strimzi/test/container/StrimziKafkaClusterIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ void testKafkaClusterStartup() throws IOException, InterruptedException {
@Test
void testKafkaClusterStartupWithSharedNetwork() throws IOException, InterruptedException {
try {
systemUnderTest = new StrimziKafkaCluster(3, true);
systemUnderTest = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(NUMBER_OF_REPLICAS)
.withSharedNetwork()
.build();
systemUnderTest.start();

verifyReadinessOfKafkaCluster();
Expand All @@ -91,7 +94,10 @@ void testKafkaClusterFunctionality() throws ExecutionException, InterruptedExcep
@Test
void testKafkaClusterWithSharedNetworkFunctionality() throws ExecutionException, InterruptedException, TimeoutException {
try {
systemUnderTest = new StrimziKafkaCluster(NUMBER_OF_REPLICAS, true);
systemUnderTest = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(NUMBER_OF_REPLICAS)
.withSharedNetwork()
.build();
systemUnderTest.start();

verifyFunctionalityOfKafkaCluster();
Expand All @@ -111,7 +117,10 @@ void testStartClusterWithProxyContainer() {
StrimziKafkaCluster kafkaCluster = null;

try {
kafkaCluster = new StrimziKafkaCluster(3, proxyContainer);
kafkaCluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(NUMBER_OF_REPLICAS)
.withProxyContainer(proxyContainer)
.build();

kafkaCluster.start();
List<String> bootstrapUrls = new ArrayList<>();
Expand All @@ -130,7 +139,9 @@ void testStartClusterWithProxyContainer() {
}

private void setUpKafkaCluster() {
systemUnderTest = new StrimziKafkaCluster(NUMBER_OF_REPLICAS);
systemUnderTest = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(NUMBER_OF_REPLICAS)
.build();
systemUnderTest.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
package io.strimzi.test.container;

import org.junit.jupiter.api.Test;
import org.testcontainers.containers.ToxiproxyContainer;

import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -13,27 +17,92 @@ public class StrimziKafkaClusterTest {

@Test
void testKafkaClusterNegativeOrZeroNumberOfNodes() {
assertThrows(IllegalArgumentException.class, () -> new StrimziKafkaCluster(
0, 1, null, null, false));
assertThrows(IllegalArgumentException.class, () -> new StrimziKafkaCluster(
-1, 1, null, null, false));
assertThrows(IllegalArgumentException.class, () ->
new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(0)
.withInternalTopicReplicationFactor(1)
.build()
);
assertThrows(IllegalArgumentException.class, () ->
new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(-1)
.withInternalTopicReplicationFactor(1)
.build()
);
}

@Test
void testKafkaClusterPossibleNumberOfNodes() {
assertDoesNotThrow(() -> new StrimziKafkaCluster(
1, 1, null, null, false));
assertDoesNotThrow(() -> new StrimziKafkaCluster(
3, 3, null, null, false));
assertDoesNotThrow(() -> new StrimziKafkaCluster(
10, 3, null, null, false));
assertDoesNotThrow(() ->
new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(1)
.withInternalTopicReplicationFactor(1)
.build()
);
assertDoesNotThrow(() ->
new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(3)
.withInternalTopicReplicationFactor(3)
.build()
);
assertDoesNotThrow(() ->
new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(10)
.withInternalTopicReplicationFactor(3)
.build()
);
}

@Test
void testNegativeOrMoreReplicasThanAvailableOfKafkaBrokersInternalReplicationError() {
assertThrows(IllegalArgumentException.class, () -> new StrimziKafkaCluster(
0, 0, null, null, false));
assertThrows(IllegalArgumentException.class, () -> new StrimziKafkaCluster(
3, 5, null, null, false));
assertThrows(IllegalArgumentException.class, () ->
new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(3)
.withInternalTopicReplicationFactor(5)
.build()
);
assertThrows(IllegalArgumentException.class, () ->
new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(0)
.withInternalTopicReplicationFactor(0)
.build()
);
}

@Test
void testKafkaClusterWithProxyContainer() {
ToxiproxyContainer proxyContainer = new ToxiproxyContainer();
assertDoesNotThrow(() ->
new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(3)
.withInternalTopicReplicationFactor(3)
.withProxyContainer(proxyContainer)
.build()
);
}

@Test
void testKafkaClusterWithSharedNetwork() {
assertDoesNotThrow(() ->
new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(3)
.withInternalTopicReplicationFactor(3)
.withSharedNetwork()
.build()
);
}

@Test
void testKafkaClusterWithAdditionalConfiguration() {
Map<String, String> additionalConfig = new HashMap<>();
additionalConfig.put("log.retention.ms", "60000");

assertDoesNotThrow(() ->
new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(3)
.withInternalTopicReplicationFactor(3)
.withAdditionalKafkaConfiguration(additionalConfig)
.build()
);
}
}
Loading