Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StrimziKafkaCluster builder #86

Merged
merged 5 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 161 additions & 39 deletions src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,26 @@
* A multi-node instance of Kafka and Zookeeper using the latest image from quay.io/strimzi/kafka with the given version.
* It perfectly fits for integration/system testing. We always deploy one zookeeper with a specified number of Kafka instances,
* running as a separate container inside Docker. The additional configuration for Kafka brokers can be passed to the constructor.
* <br><br>
* Note: Direct constructor calls are deprecated and will be removed in the next released version.
* Please use {@link StrimziKafkaClusterBuilder} for creating instances of this class.
*/
public class StrimziKafkaCluster implements KafkaContainer {

// class attributes
private static final Logger LOGGER = LoggerFactory.getLogger(StrimziKafkaCluster.class);

// instance attributes
private final int brokersNum;
private int brokersNum;
private int internalTopicReplicationFactor;
private Map<String, String> additionalKafkaConfiguration;
private ToxiproxyContainer proxyContainer;
private boolean enableSharedNetwork;

// not editable
private final Network network;
private final StrimziZookeeperContainer zookeeper;
private final Collection<KafkaContainer> brokers;
private Collection<KafkaContainer> brokers;

/**
* Constructor for @StrimziKafkaCluster class, which allows you to specify number of brokers @see{brokersNum},
Expand All @@ -53,58 +62,25 @@ public class StrimziKafkaCluster implements KafkaContainer {
* @param proxyContainer Proxy container
* @param enableSharedNetwork enable Kafka cluster to use a shared Docker network.
*/
@Deprecated
public StrimziKafkaCluster(final int brokersNum,
final int internalTopicReplicationFactor,
final Map<String, String> additionalKafkaConfiguration,
final ToxiproxyContainer proxyContainer,
final boolean enableSharedNetwork) {
if (brokersNum <= 0) {
throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
}
if (internalTopicReplicationFactor <= 0 || internalTopicReplicationFactor > brokersNum) {
throw new IllegalArgumentException("internalTopicReplicationFactor '" + internalTopicReplicationFactor + "' must be less than brokersNum and greater than 0");
}
validateBrokerNum(brokersNum);
validateInternalTopicReplicationFactor(internalTopicReplicationFactor);

this.brokersNum = brokersNum;
this.network = enableSharedNetwork ? Network.SHARED : Network.newNetwork();

this.zookeeper = new StrimziZookeeperContainer()
.withNetwork(this.network);

Map<String, String> defaultKafkaConfigurationForMultiNode = new HashMap<>();
defaultKafkaConfigurationForMultiNode.put("offsets.topic.replication.factor", String.valueOf(internalTopicReplicationFactor));
defaultKafkaConfigurationForMultiNode.put("num.partitions", String.valueOf(internalTopicReplicationFactor));
defaultKafkaConfigurationForMultiNode.put("transaction.state.log.replication.factor", String.valueOf(internalTopicReplicationFactor));
defaultKafkaConfigurationForMultiNode.put("transaction.state.log.min.isr", String.valueOf(internalTopicReplicationFactor));

if (additionalKafkaConfiguration != null) {
defaultKafkaConfigurationForMultiNode.putAll(additionalKafkaConfiguration);
}

if (proxyContainer != null) {
proxyContainer.setNetwork(this.network);
}

// multi-node set up
this.brokers = IntStream
.range(0, this.brokersNum)
.mapToObj(brokerId -> {
LOGGER.info("Starting broker with id {}", brokerId);
// adding broker id for each kafka container
KafkaContainer kafkaContainer = new StrimziKafkaContainer()
.withBrokerId(brokerId)
.withKafkaConfigurationMap(defaultKafkaConfigurationForMultiNode)
.withExternalZookeeperConnect("zookeeper:" + StrimziZookeeperContainer.ZOOKEEPER_PORT)
.withNetwork(this.network)
.withProxyContainer(proxyContainer)
.withNetworkAliases("broker-" + brokerId)
.dependsOn(this.zookeeper);

LOGGER.info("Started broker with id: {}", kafkaContainer);

return kafkaContainer;
})
.collect(Collectors.toList());
prepareKafkaCluster(additionalKafkaConfiguration);
}

/**
Expand All @@ -116,6 +92,7 @@ public StrimziKafkaCluster(final int brokersNum,
* @param internalTopicReplicationFactor internal topics
* @param additionalKafkaConfiguration additional Kafka configuration
*/
@Deprecated
public StrimziKafkaCluster(final int brokersNum,
final int internalTopicReplicationFactor,
final Map<String, String> additionalKafkaConfiguration) {
Expand All @@ -127,6 +104,7 @@ public StrimziKafkaCluster(final int brokersNum,
*
* @param brokersNum number of brokers
*/
@Deprecated
public StrimziKafkaCluster(final int brokersNum) {
this(brokersNum, brokersNum, null, null, false);
}
Expand All @@ -137,6 +115,7 @@ public StrimziKafkaCluster(final int brokersNum) {
* @param brokersNum number of brokers to be deployed
* @param proxyContainer Proxy container
*/
@Deprecated
public StrimziKafkaCluster(final int brokersNum, final ToxiproxyContainer proxyContainer) {
this(brokersNum, brokersNum, null, proxyContainer, false);
}
Expand All @@ -147,10 +126,153 @@ public StrimziKafkaCluster(final int brokersNum, final ToxiproxyContainer proxyC
* @param brokersNum number of brokers to be deployed
* @param enableSharedNetwork enable Kafka cluster to use a shared Docker network.
*/
@Deprecated
public StrimziKafkaCluster(final int brokersNum, final boolean enableSharedNetwork) {
this(brokersNum, brokersNum, null, null, enableSharedNetwork);
}

private StrimziKafkaCluster(StrimziKafkaClusterBuilder builder) {
this.brokersNum = builder.brokersNum;
this.enableSharedNetwork = builder.enableSharedNetwork;
this.network = this.enableSharedNetwork ? Network.SHARED : Network.newNetwork();
this.internalTopicReplicationFactor = builder.internalTopicReplicationFactor == 0 ? this.brokersNum : builder.internalTopicReplicationFactor;
this.additionalKafkaConfiguration = builder.additionalKafkaConfiguration;
this.proxyContainer = builder.proxyContainer;

validateBrokerNum(this.brokersNum);
validateInternalTopicReplicationFactor(this.internalTopicReplicationFactor);

this.zookeeper = new StrimziZookeeperContainer()
.withNetwork(this.network);

if (this.proxyContainer != null) {
this.proxyContainer.setNetwork(this.network);
}

prepareKafkaCluster(this.additionalKafkaConfiguration);
}

private void prepareKafkaCluster(final Map<String, String> additionalKafkaConfiguration) {
final Map<String, String> defaultKafkaConfigurationForMultiNode = new HashMap<>();
defaultKafkaConfigurationForMultiNode.put("offsets.topic.replication.factor", String.valueOf(internalTopicReplicationFactor));
defaultKafkaConfigurationForMultiNode.put("num.partitions", String.valueOf(internalTopicReplicationFactor));
defaultKafkaConfigurationForMultiNode.put("transaction.state.log.replication.factor", String.valueOf(internalTopicReplicationFactor));
defaultKafkaConfigurationForMultiNode.put("transaction.state.log.min.isr", String.valueOf(internalTopicReplicationFactor));

if (additionalKafkaConfiguration != null) {
defaultKafkaConfigurationForMultiNode.putAll(additionalKafkaConfiguration);
}

// multi-node set up
this.brokers = IntStream
.range(0, this.brokersNum)
.mapToObj(brokerId -> {
LOGGER.info("Starting broker with id {}", brokerId);
// adding broker id for each kafka container
KafkaContainer kafkaContainer = new StrimziKafkaContainer()
.withBrokerId(brokerId)
.withKafkaConfigurationMap(defaultKafkaConfigurationForMultiNode)
.withExternalZookeeperConnect("zookeeper:" + StrimziZookeeperContainer.ZOOKEEPER_PORT)
.withNetwork(this.network)
.withProxyContainer(proxyContainer)
.withNetworkAliases("broker-" + brokerId)
.dependsOn(this.zookeeper);

LOGGER.info("Started broker with id: {}", kafkaContainer);

return kafkaContainer;
})
.collect(Collectors.toList());
}

private void validateBrokerNum(int brokersNum) {
if (brokersNum <= 0) {
throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
}
}

private void validateInternalTopicReplicationFactor(int internalTopicReplicationFactor) {
if (internalTopicReplicationFactor <= 0 || internalTopicReplicationFactor > this.brokersNum) {
throw new IllegalArgumentException("internalTopicReplicationFactor '" + internalTopicReplicationFactor + "' must be less than brokersNum and greater than 0");
}
}

public static class StrimziKafkaClusterBuilder {
private int brokersNum;
private int internalTopicReplicationFactor;
private Map<String, String> additionalKafkaConfiguration = new HashMap<>();
private ToxiproxyContainer proxyContainer;
private boolean enableSharedNetwork;

/**
* Sets the number of Kafka brokers in the cluster.
*
* @param brokersNum the number of Kafka brokers
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withNumberOfBrokers(int brokersNum) {
this.brokersNum = brokersNum;
return this;
}

/**
* Sets the internal topic replication factor for Kafka brokers.
* If not provided, it defaults to the number of brokers.
*
* @param internalTopicReplicationFactor the replication factor for internal topics
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withInternalTopicReplicationFactor(int internalTopicReplicationFactor) {
this.internalTopicReplicationFactor = internalTopicReplicationFactor;
return this;
}

/**
* Adds additional Kafka configuration parameters.
* These configurations are applied to all brokers in the cluster.
*
* @param additionalKafkaConfiguration a map of additional Kafka configuration options
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withAdditionalKafkaConfiguration(Map<String, String> additionalKafkaConfiguration) {
if (additionalKafkaConfiguration != null) {
this.additionalKafkaConfiguration.putAll(additionalKafkaConfiguration);
}
return this;
}

/**
* Sets a {@code ToxiproxyContainer} to simulate network conditions such as latency or disconnection.
*
* @param proxyContainer the proxy container for simulating network conditions
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withProxyContainer(ToxiproxyContainer proxyContainer) {
this.proxyContainer = proxyContainer;
return this;
}

/**
* Enables a shared Docker network for the Kafka cluster.
* This allows the Kafka cluster to interact with other containers on the same network.
*
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withSharedNetwork() {
this.enableSharedNetwork = true;
return this;
}

/**
* Builds and returns a {@code StrimziKafkaCluster} instance based on the provided configurations.
*
* @return a new instance of {@code StrimziKafkaCluster}
*/
public StrimziKafkaCluster build() {
return new StrimziKafkaCluster(this);
}
}

/**
* Get collection of Strimzi kafka containers
* @return collection of Strimzi kafka containers
Expand Down
19 changes: 15 additions & 4 deletions src/test/java/io/strimzi/test/container/StrimziKafkaClusterIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ void testKafkaClusterStartup() throws IOException, InterruptedException {
@Test
void testKafkaClusterStartupWithSharedNetwork() throws IOException, InterruptedException {
try {
systemUnderTest = new StrimziKafkaCluster(3, true);
systemUnderTest = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(NUMBER_OF_REPLICAS)
.withSharedNetwork()
.build();
systemUnderTest.start();

verifyReadinessOfKafkaCluster();
Expand All @@ -91,7 +94,10 @@ void testKafkaClusterFunctionality() throws ExecutionException, InterruptedExcep
@Test
void testKafkaClusterWithSharedNetworkFunctionality() throws ExecutionException, InterruptedException, TimeoutException {
try {
systemUnderTest = new StrimziKafkaCluster(NUMBER_OF_REPLICAS, true);
systemUnderTest = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(NUMBER_OF_REPLICAS)
.withSharedNetwork()
.build();
systemUnderTest.start();

verifyFunctionalityOfKafkaCluster();
Expand All @@ -111,7 +117,10 @@ void testStartClusterWithProxyContainer() {
StrimziKafkaCluster kafkaCluster = null;

try {
kafkaCluster = new StrimziKafkaCluster(3, proxyContainer);
kafkaCluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(NUMBER_OF_REPLICAS)
.withProxyContainer(proxyContainer)
.build();

kafkaCluster.start();
List<String> bootstrapUrls = new ArrayList<>();
Expand All @@ -130,7 +139,9 @@ void testStartClusterWithProxyContainer() {
}

private void setUpKafkaCluster() {
systemUnderTest = new StrimziKafkaCluster(NUMBER_OF_REPLICAS);
systemUnderTest = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(NUMBER_OF_REPLICAS)
.build();
systemUnderTest.start();
}

Expand Down
Loading
Loading