diff --git a/docs/modules/kafka.md b/docs/modules/kafka.md
index 691c4f10bf8..13f8cf3b0a8 100644
--- a/docs/modules/kafka.md
+++ b/docs/modules/kafka.md
@@ -34,8 +34,7 @@ If for some reason you want to use an externally running Zookeeper, then just pa
 
 ### Using Kraft mode
 
-The self-managed (Kraft) mode is available as a preview feature since version 3.0 (confluentinc/cp-kafka:7.0.x) and
-declared as a production ready in 3.3.1 (confluentinc/cp-kafka:7.3.x).
+KRaft mode was declared production ready in Apache Kafka 3.3.1 (confluentinc/cp-kafka:7.3.x).
 
 [Kraft mode](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:withKraftMode
 
diff --git a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java
index 60790514825..089d80fa080 100644
--- a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java
+++ b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java
@@ -51,6 +51,18 @@ public void testKafkaContainerKraftCluster() throws Exception {
         }
     }
 
+    @Test
+    public void testKafkaContainerKraftClusterAfterConfluentPlatform740() throws Exception {
+        try (KafkaContainerKraftCluster cluster = new KafkaContainerKraftCluster("7.4.0", 3, 2)) {
+            cluster.start();
+            String bootstrapServers = cluster.getBootstrapServers();
+
+            assertThat(cluster.getBrokers()).hasSize(3);
+
+            testKafkaFunctionality(bootstrapServers, 3, 2);
+        }
+    }
+
     protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
         try (
             AdminClient adminClient = AdminClient.create(
diff --git a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java
index 32bfc366896..d00b15a1030 100644
--- a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java
+++ b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java
@@ -6,11 +6,10 @@
 import org.testcontainers.utility.ComparableVersion;
 import org.testcontainers.utility.DockerImageName;
 
-import java.io.IOException;
+import java.util.Objects;
 
 /**
  * This container wraps Confluent Kafka and Zookeeper (optionally)
- *
  */
 public class KafkaContainer extends GenericContainer<KafkaContainer> {
 
@@ -29,11 +28,13 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {
     // https://docs.confluent.io/platform/7.0.0/release-notes/index.html#ak-raft-kraft
     private static final String MIN_KRAFT_TAG = "7.0.0";
 
+    public static final String DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw";
+
     protected String externalZookeeperConnect = null;
 
     private boolean kraftEnabled = false;
 
-    private String clusterId;
+    private String clusterId = DEFAULT_CLUSTER_ID;
 
     /**
      * @deprecated use {@link KafkaContainer(DockerImageName)} instead
@@ -98,7 +99,7 @@ public KafkaContainer withKraft() {
             throw new IllegalStateException("Cannot configure Kraft mode when Zookeeper configured");
         }
         verifyMinKraftVersion();
-        kraftEnabled = true;
+        this.kraftEnabled = true;
         return self();
     }
 
@@ -115,7 +116,13 @@ private void verifyMinKraftVersion() {
         }
     }
 
+    private boolean isLessThanCP740() {
+        String actualVersion = DockerImageName.parse(getDockerImageName()).getVersionPart();
+        return new ComparableVersion(actualVersion).isLessThan("7.4.0");
+    }
+
     public KafkaContainer withClusterId(String clusterId) {
+        Objects.requireNonNull(clusterId, "clusterId cannot be null");
         this.clusterId = clusterId;
         return self();
     }
@@ -126,7 +133,7 @@ public String getBootstrapServers() {
 
     @Override
     protected void configure() {
-        if (kraftEnabled) {
+        if (this.kraftEnabled) {
             waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
             configureKraft();
         } else {
@@ -136,10 +143,9 @@ protected void configure() {
     }
 
     protected void configureKraft() {
-        withEnv(
-            "KAFKA_NODE_ID",
-            getEnvMap().computeIfAbsent("KAFKA_NODE_ID", key -> getEnvMap().get("KAFKA_BROKER_ID"))
-        );
+        // CP 7.4.0
+        getEnvMap().computeIfAbsent("CLUSTER_ID", key -> clusterId);
+        getEnvMap().computeIfAbsent("KAFKA_NODE_ID", key -> getEnvMap().get("KAFKA_BROKER_ID"));
         withEnv(
             "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
             String.format("%s,CONTROLLER:PLAINTEXT", getEnvMap().get("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"))
@@ -147,20 +153,17 @@ protected void configureKraft() {
         withEnv("KAFKA_LISTENERS", String.format("%s,CONTROLLER://0.0.0.0:9094", getEnvMap().get("KAFKA_LISTENERS")));
         withEnv("KAFKA_PROCESS_ROLES", "broker,controller");
 
-        withEnv(
-            "KAFKA_CONTROLLER_QUORUM_VOTERS",
-            getEnvMap()
-                .computeIfAbsent(
-                    "KAFKA_CONTROLLER_QUORUM_VOTERS",
-                    key -> {
-                        return String.format(
-                            "%s@%s:9094",
-                            getEnvMap().get("KAFKA_NODE_ID"),
-                            getNetwork() != null ? getNetworkAliases().get(0) : "localhost"
-                        );
-                    }
-                )
-        );
+        getEnvMap()
+            .computeIfAbsent(
+                "KAFKA_CONTROLLER_QUORUM_VOTERS",
+                key -> {
+                    return String.format(
+                        "%s@%s:9094",
+                        getEnvMap().get("KAFKA_NODE_ID"),
+                        getNetwork() != null ? getNetworkAliases().get(0) : "localhost"
+                    );
+                }
+            );
         withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");
     }
 
@@ -186,10 +189,18 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {
             brokerAdvertisedListener(containerInfo)
         );
 
-        command += (kraftEnabled) ? commandKraft() : commandZookeeper();
+        if (this.kraftEnabled && isLessThanCP740()) {
+            // Optimization: skip the checks
+            command += "echo '' > /etc/confluent/docker/ensure \n";
+            command += commandKraft();
+        }
+
+        if (!this.kraftEnabled) {
+            // Optimization: skip the checks
+            command += "echo '' > /etc/confluent/docker/ensure \n";
+            command += commandZookeeper();
+        }
 
-        // Optimization: skip the checks
-        command += "echo '' > /etc/confluent/docker/ensure \n";
         // Run the original command
         command += "/etc/confluent/docker/run \n";
         copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT);
@@ -197,16 +208,9 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {
 
     protected String commandKraft() {
         String command = "sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure\n";
-        try {
-            if (clusterId == null) {
-                clusterId = execInContainer("kafka-storage", "random-uuid").getStdout().trim();
-            }
-        } catch (IOException | InterruptedException e) {
-            logger().error("Failed to execute `kafka-storage random-uuid`. Exception message: {}", e.getMessage());
-        }
         command +=
             "echo 'kafka-storage format --ignore-formatted -t \"" +
-            clusterId +
+            this.clusterId +
             "\" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure\n";
         return command;
     }
diff --git a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java
index bad3c8d89ef..860c990cb32 100644
--- a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java
+++ b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java
@@ -133,10 +133,20 @@ public void testWithHostExposedPortAndExternalNetwork() throws Exception {
     }
 
     @Test
-    public void testUsageKraft() throws Exception {
+    public void testUsageKraftBeforeConfluentPlatformVersion74() throws Exception {
         try (
-            // withKraftMode {
             KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.1")).withKraft()
+        ) {
+            kafka.start();
+            testKafkaFunctionality(kafka.getBootstrapServers());
+        }
+    }
+
+    @Test
+    public void testUsageKraftAfterConfluentPlatformVersion74() throws Exception {
+        try (
+            // withKraftMode {
+            KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")).withKraft()
             // }
         ) {
             kafka.start();