QQE-737 | Make Kafka use KRaft mode #1175

Merged · 1 commit · Jun 24, 2024

File: server.properties (SSL variant)
@@ -17,8 +17,14 @@

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
node.id=1

# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9094

############################# Socket Server Settings #############################

@@ -29,19 +35,25 @@ broker.id=0
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
listeners=BROKER://0.0.0.0:9093,SSL://0.0.0.0:9092

listeners=BROKER://0.0.0.0:9093,SSL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094

# Name of listener used for communication between brokers.
inter.broker.listener.name=BROKER

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=SSL://localhost:${KAFKA_MAPPED_PORT},BROKER://localhost:9093

# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping is set in `listener.security.protocol.map`, the PLAINTEXT protocol is used by default
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listener.security.protocol.map=BROKER:PLAINTEXT,SSL:SSL
listener.security.protocol.map=BROKER:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
@@ -58,10 +70,7 @@ socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

inter.broker.listener.name=BROKER

#### SSL ####

ssl.keystore.location=/opt/kafka/config/strimzi-custom-server-ssl-keystore.p12
ssl.keystore.password=top-secret
ssl.keystore.type=PKCS12
@@ -75,7 +84,7 @@ ssl.endpoint.identification.algorithm=
############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
log.dirs=/tmp/kraft-combined-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
@@ -130,25 +139,3 @@ log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=45000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
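
Note: taken together, process.roles=broker,controller, node.id=1, and controller.quorum.voters=1@localhost:9094 define a single-node KRaft quorum: one JVM acts as both broker and controller, and the quorum has exactly one voter (node 1) reachable on the CONTROLLER listener. A quick way to verify that such a broker came up is to describe the cluster with the Kafka AdminClient. A minimal sketch, assuming a reachable listener on localhost:9092 (the SSL listener above would additionally need truststore settings):

    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.DescribeClusterResult;

    public class KraftSmokeCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // hypothetical bootstrap address; in these tests it is localhost:${KAFKA_MAPPED_PORT}
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                DescribeClusterResult cluster = admin.describeCluster();
                // a KRaft broker answers this without any ZooKeeper round trip
                System.out.println("cluster id: " + cluster.clusterId().get());
                System.out.println("nodes: " + cluster.nodes().get());
            }
        }
    }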

File: BaseKafkaContainerManagedResource.java
@@ -9,12 +9,13 @@
import org.testcontainers.utility.MountableFile;

import io.quarkus.test.bootstrap.KafkaService;
import io.quarkus.test.logging.Log;
import io.quarkus.test.logging.TestContainersLoggingHandler;

public abstract class BaseKafkaContainerManagedResource extends DockerContainerManagedResource {

private static final String SERVER_PROPERTIES = "server.properties";
private static final String EXPECTED_LOG = ".*started \\(kafka.server.KafkaServer\\).*";
private static final String SERVER_PROPERTIES = "kraft/server.properties";
private static final String EXPECTED_LOG = ".*started .*kafka.server.Kafka.*Server.*";

protected final KafkaContainerManagedResourceBuilder model;

@@ -79,6 +80,7 @@ protected GenericContainer<?> initContainer() {

String kafkaConfigPath = model.getKafkaConfigPath();
if (StringUtils.isNotEmpty(getServerProperties())) {
Log.info("Copying file %s to %s ", getServerProperties(), kafkaConfigPath + SERVER_PROPERTIES);
kafkaContainer.withCopyFileToContainer(MountableFile.forClasspathResource(getServerProperties()),
kafkaConfigPath + SERVER_PROPERTIES);
}
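
Note on the relaxed EXPECTED_LOG pattern: the old regex only matched the ZooKeeper-based startup line, which names kafka.server.KafkaServer, while a KRaft broker logs kafka.server.KafkaRaftServer instead. The looser ".*started .*kafka.server.Kafka.*Server.*" matches both. A small self-check with illustrative log lines (exact wording varies by Kafka version):

    import java.util.regex.Pattern;

    public class ExpectedLogPatternCheck {
        public static void main(String[] args) {
            Pattern expectedLog = Pattern.compile(".*started .*kafka.server.Kafka.*Server.*");
            // illustrative ZooKeeper-mode and KRaft-mode startup lines
            String zkLine = "[KafkaServer id=0] started (kafka.server.KafkaServer)";
            String kraftLine = "[KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)";
            System.out.println(expectedLog.matcher(zkLine).matches());    // true
            System.out.println(expectedLog.matcher(kraftLine).matches()); // true
        }
    }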

File: OpenShiftStrimziKafkaContainerManagedResource.java
@@ -28,7 +28,7 @@ public class OpenShiftStrimziKafkaContainerManagedResource implements ManagedRes
private static final String REGISTRY_DEPLOYMENT_TEMPLATE_PROPERTY_DEFAULT = "/registry-deployment-template.yml";
private static final String REGISTRY_DEPLOYMENT = "registry.yml";

private static final String EXPECTED_LOG = "started (kafka.server.KafkaServer)";
private static final String EXPECTED_LOG = "Kafka Server started";

private static final int HTTP_PORT = 9092;


File: StrimziKafkaContainerManagedResource.java
@@ -2,7 +2,6 @@

import static io.quarkus.test.services.Certificate.Format.PKCS12;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -56,37 +55,23 @@ public URILike getURI(Protocol protocol) {
return uri;
}

@Override
public void afterStart() {
super.afterStart();
if (model.getProtocol() == KafkaProtocol.SASL_SSL
&& (model.getServerProperties() == null || model.getServerProperties().isEmpty())) {
// make sure that client is added right after the start to Zookeeper
// see https://kafka.apache.org/documentation/#security_sasl_scram for more info
ExtendedStrimziKafkaContainer container = model.getContext().get(DOCKER_INNER_CONTAINER);
var command = ("/opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config "
+ "'SCRAM-SHA-512=[password=%s]' --entity-type users --entity-name %s;")
.formatted(SASL_PASSWORD_VALUE, SASL_USERNAME_VALUE);
try {
var execResult = container.execInContainer("sh", "-c", command);
if (execResult.getExitCode() != 0) {
throw new IllegalStateException(
"Failed to add Kafka 'client' user to Zookeeper: " + execResult.getStderr());
}
} catch (IOException | InterruptedException e) {
throw new RuntimeException("Failed to add Kafka 'client' user to Zookeeper", e);
}
}
}

@Override
protected GenericContainer<?> initKafkaContainer() {
ExtendedStrimziKafkaContainer container = new ExtendedStrimziKafkaContainer(getKafkaImageName(), getKafkaVersion());
ExtendedStrimziKafkaContainer container = new ExtendedStrimziKafkaContainer(getKafkaImageName(), getKafkaVersion())
.enableKraftMode();
if (StringUtils.isNotEmpty(getServerProperties())) {
container.useCustomServerProperties();
}
container.withCreateContainerCmdModifier(cmd -> cmd.withName(DockerUtils.generateDockerContainerName()));

if (model.getProtocol() == KafkaProtocol.SASL_SSL
&& (model.getServerProperties() == null || model.getServerProperties().isEmpty())) {
/*
 * make sure that the 'client' user is added before the container starts, see
 * https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_scram.html#kraft-based-clusters
 */
container.configureScram(SASL_USERNAME_VALUE, SASL_PASSWORD_VALUE);
}
return container;
}
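
Note: with KRaft there is no ZooKeeper to register SCRAM credentials against after startup, which is why the user is now provisioned up front via configureScram (it ends up in the kafka-storage format call, see ExtendedStrimziKafkaContainer below). A client then authenticates with the standard SCRAM-SHA-512 settings; a minimal producer sketch with hypothetical credentials, truststore settings omitted for brevity:

    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ScramClientSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical mapped port
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
            props.put(SaslConfigs.SASL_JAAS_CONFIG,
                    "org.apache.kafka.common.security.scram.ScramLoginModule required "
                            + "username=\"client\" password=\"client-password\";"); // hypothetical values
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("demo-topic", "hello"));
            }
        }
    }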


File: ExtendedStrimziKafkaContainer.java
@@ -1,25 +1,29 @@
package io.quarkus.test.services.containers.strimzi;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import org.testcontainers.images.builder.Transferable;

import com.github.dockerjava.api.command.InspectContainerResponse;

import io.quarkus.test.services.containers.model.KafkaVendor;
import io.smallrye.mutiny.tuples.Tuple2;
import io.strimzi.test.container.StrimziKafkaContainer;

/**
 * Extends the functionality of io.strimzi.test.container.StrimziKafkaContainer:
 * - does not overwrite parameters of a custom server.properties.
 */
public class ExtendedStrimziKafkaContainer extends StrimziKafkaContainer {

private static final String KAFKA_MAPPED_PORT = "${KAFKA_MAPPED_PORT}";
private static final int ALLOW_EXEC = 700;
private static final String TESTCONTAINERS_SCRIPT = "/testcontainers_start.sh";

private boolean useCustomServerProperties = false;
private Optional<Tuple2<String, String>> credentials = Optional.empty();

public ExtendedStrimziKafkaContainer(String name, String version) {
super(String.format("%s:%s", name, version));
@@ -32,17 +36,45 @@ public void useCustomServerProperties() {
@Override
protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) {
if (useCustomServerProperties) {
List<String> script = new ArrayList<>();
script.add("#!/bin/bash");
int kafkaExposedPort = this.getMappedPort(KafkaVendor.STRIMZI.getPort());

String command = "#!/bin/bash \n";
command = command + "sed 's/" + KAFKA_MAPPED_PORT + "/" + kafkaExposedPort + "/g' "
+ "config/server.properties > /tmp/effective_server.properties &\n";
command = command + "bin/zookeeper-server-start.sh config/zookeeper.properties &\n";
command = command + "bin/kafka-server-start.sh /tmp/effective_server.properties";
this.copyFileToContainer(Transferable.of(command.getBytes(StandardCharsets.UTF_8), ALLOW_EXEC),
"/testcontainers_start.sh");
script.add("sed 's/" + KAFKA_MAPPED_PORT + "/" + kafkaExposedPort + "/g' "
+ "config/kraft/server.properties > /tmp/effective_server.properties");
script.add("KAFKA_CLUSTER_ID=\"$(bin/kafka-storage.sh random-uuid)\"");
StringBuilder storageFormat = new StringBuilder()
.append("/opt/kafka/bin/kafka-storage.sh format")
.append(" -t ${KAFKA_CLUSTER_ID}")
.append(" -c /tmp/effective_server.properties");
credentials.ifPresent(credentials -> {
storageFormat.append(" --add-scram 'SCRAM-SHA-512=[name=%s,password=%s]'"
.formatted(credentials.getItem1(), credentials.getItem2()));
});
script.add(storageFormat.toString());
script.add("bin/kafka-server-start.sh /tmp/effective_server.properties");
this.copyFileToContainer(Transferable.of(String.join("\n", script), ALLOW_EXEC), TESTCONTAINERS_SCRIPT);
} else {
// we do not process credentials here, since SASL is always used together with custom properties
// see StrimziKafkaContainerManagedResource#getServerProperties
super.containerIsStarting(containerInfo, reused);
// if that changes, we will need to copy the script from Testcontainers, modify it, and copy it back
}

}

/**
 * The code below requires an explanation.
 * StrimziKafkaContainer has a dedicated method that switches it to KRaft mode (no ZooKeeper).
 * In KRaft mode, the quay.io/strimzi/kafka container requires broker.id and node.id to have the same value,
 * and the Strimzi class always overwrites the broker id (to 0 by default).
 * Since config/kraft/server.properties contains node.id=1, we have to use that value here.
 */
public ExtendedStrimziKafkaContainer enableKraftMode() {
return (ExtendedStrimziKafkaContainer) super.withKraft()
.withBrokerId(1);
}

public void configureScram(String name, String password) {
credentials = Optional.of(Tuple2.of(name, password));
}
}
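
Note: for reference, a usage sketch of the extended container roughly as initKafkaContainer wires it up; the image name, tag, and credentials below are placeholders:

    import io.quarkus.test.services.containers.strimzi.ExtendedStrimziKafkaContainer;

    public class KraftContainerUsage {
        public static void main(String[] args) {
            ExtendedStrimziKafkaContainer kafka =
                    new ExtendedStrimziKafkaContainer("quay.io/strimzi/kafka", "latest-kafka-3.6.0")
                            .enableKraftMode();
            // keep the mounted kraft/server.properties instead of the generated configuration
            kafka.useCustomServerProperties();
            // provision the SCRAM user during storage formatting, before the broker starts
            kafka.configureScram("client", "client-password");
            kafka.start();
            System.out.println("bootstrap servers: " + kafka.getBootstrapServers());
            kafka.stop();
        }
    }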

File: server.properties (SASL_SSL variant)
@@ -17,8 +17,14 @@

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
node.id=1

# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9094

############################# Socket Server Settings #############################

@@ -29,19 +35,25 @@ broker.id=0
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
listeners=BROKER://0.0.0.0:9093,SASL_SSL://0.0.0.0:9092

listeners=BROKER://0.0.0.0:9093,SASL_SSL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094

# Name of listener used for communication between brokers.
inter.broker.listener.name=BROKER

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=SASL_SSL://localhost:${KAFKA_MAPPED_PORT},BROKER://localhost:9093

# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping is set in `listener.security.protocol.map`, the PLAINTEXT protocol is used by default
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listener.security.protocol.map=BROKER:PLAINTEXT,SASL_SSL:SASL_SSL
listener.security.protocol.map=BROKER:PLAINTEXT,SASL_SSL:SASL_SSL,CONTROLLER:PLAINTEXT

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
@@ -58,9 +70,6 @@ socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


inter.broker.listener.name=BROKER

############################# SASL_SSL Settings #############################

sasl.enabled.mechanisms=SCRAM-SHA-512
@@ -86,7 +95,7 @@ ssl.client.auth=required
############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
log.dirs=/tmp/kraft-combined-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
@@ -141,25 +150,3 @@ log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=45000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
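
Note: both properties files advertise the client listener as localhost:${KAFKA_MAPPED_PORT}, a placeholder that only becomes known once Testcontainers maps port 9092 to a random host port; the generated startup script resolves it with sed before the broker starts. The equivalent substitution, sketched in Java with illustrative paths and port:

    import java.nio.file.Files;
    import java.nio.file.Path;

    public class MappedPortSubstitution {
        public static void main(String[] args) throws Exception {
            int mappedPort = 54321; // hypothetical host port assigned by Testcontainers
            // mirrors: sed 's/${KAFKA_MAPPED_PORT}/54321/g' config/kraft/server.properties > /tmp/effective_server.properties
            String template = Files.readString(Path.of("config/kraft/server.properties"));
            String effective = template.replace("${KAFKA_MAPPED_PORT}", Integer.toString(mappedPort));
            Files.writeString(Path.of("/tmp/effective_server.properties"), effective);
        }
    }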