Skip to content

Commit

Permalink
Upgrade Strimzi version to 2.7.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Sgitario committed May 10, 2021
1 parent d5325b5 commit 8bc2dd7
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class ConfluentKafkaWithoutRegistryMessagingIT {

@Test
public void checkUserResourceByNormalUser() {
Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
app.given().get("/prices/poll")
.then()
.statusCode(HttpStatus.SC_OK);
Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@
</configuration>
</execution>
</executions>
<configuration>
<trimStackTrace>false</trimStackTrace>
</configuration>
</plugin>
</plugins>
</build>
Expand Down Expand Up @@ -305,9 +308,6 @@
</configuration>
</execution>
</executions>
<configuration>
<trimStackTrace>false</trimStackTrace>
</configuration>
</plugin>
</plugins>
</build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,10 @@ protected BaseKafkaContainerManagedResource(KafkaContainerManagedResourceBuilder
this.model = model;
}

protected abstract int getRegistryTargetPort();

protected abstract GenericContainer<?> initKafkaContainer();

protected abstract GenericContainer<?> initRegistryContainer(GenericContainer<?> kafka);

@Override
protected GenericContainer<?> initContainer() {
GenericContainer<?> kafkaContainer = initKafkaContainer();

if (model.isWithRegistry()) {
schemaRegistry = initRegistryContainer(kafkaContainer);
schemaRegistryLoggingHandler = new TestContainersLoggingHandler(model.getContext(), schemaRegistry);

// Setup common network for kafka and the registry
network = Network.newNetwork();
kafkaContainer.withNetwork(network);
schemaRegistry.withNetwork(network);
}

return kafkaContainer;
}

@Override
public void start() {
super.start();
Expand All @@ -67,8 +48,34 @@ protected String getKafkaVersion() {
return StringUtils.defaultIfBlank(model.getVersion(), model.getVendor().getDefaultVersion());
}

protected String getRegistryPath() {
return StringUtils.EMPTY;
protected String getKafkaRegistryImage() {
return model.getVendor().getRegistry().getImage() + ":" + model.getVendor().getRegistry().getDefaultVersion();
}

protected int getKafkaRegistryPort() {
return model.getVendor().getRegistry().getPort();
}

@Override
protected GenericContainer<?> initContainer() {
GenericContainer<?> kafkaContainer = initKafkaContainer();

if (model.isWithRegistry()) {
schemaRegistry = initRegistryContainer(kafkaContainer);
schemaRegistryLoggingHandler = new TestContainersLoggingHandler(model.getContext(), schemaRegistry);

// Setup common network for kafka and the registry
network = Network.newNetwork();
kafkaContainer.withNetwork(network);
schemaRegistry.withNetwork(network);
}

return kafkaContainer;
}

@Override
protected int getTargetPort() {
return model.getVendor().getPort();
}

private void startRegistryIfEnabled() {
Expand Down Expand Up @@ -99,8 +106,9 @@ private boolean isRegistryRunning() {
}

private String getSchemaRegistryUrl() {
return "http://" + schemaRegistry.getContainerIpAddress() + ":" + schemaRegistry.getMappedPort(getRegistryTargetPort())
+ getRegistryPath();
return "http://" + schemaRegistry.getContainerIpAddress()
+ ":" + schemaRegistry.getMappedPort(model.getVendor().getRegistry().getPort())
+ model.getVendor().getRegistry().getPath();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,21 @@

public class ConfluentKafkaContainerManagedResource extends BaseKafkaContainerManagedResource {

private static final String REGISTRY_IMAGE = "confluentinc/cp-schema-registry:";
private static final int REGISTRY_PORT = 8081;

protected ConfluentKafkaContainerManagedResource(KafkaContainerManagedResourceBuilder model) {
super(model);
}

@Override
protected int getTargetPort() {
return KafkaContainer.KAFKA_PORT;
}

@Override
protected int getRegistryTargetPort() {
return REGISTRY_PORT;
}

@Override
protected GenericContainer<?> initKafkaContainer() {
return new KafkaContainer(DockerImageName.parse(getKafkaImage() + ":" + getKafkaVersion()));
}

@Override
protected GenericContainer<?> initRegistryContainer(GenericContainer<?> kafka) {
GenericContainer<?> schemaRegistry = new GenericContainer<>(REGISTRY_IMAGE + getKafkaVersion());
schemaRegistry.withExposedPorts(REGISTRY_PORT);
GenericContainer<?> schemaRegistry = new GenericContainer<>(DockerImageName.parse(getKafkaRegistryImage()));
schemaRegistry.withExposedPorts(getKafkaRegistryPort());
schemaRegistry.withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry");
schemaRegistry.withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + REGISTRY_PORT);
schemaRegistry.withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + getKafkaRegistryPort());
schemaRegistry.withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
"PLAINTEXT://" + kafka.getNetworkAliases().get(0) + ":9092");
return schemaRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class OpenShiftStrimziKafkaContainerManagedResource implements ManagedRes
private static final String DEPLOYMENT_TEMPLATE_PROPERTY_DEFAULT = "/strimzi-deployment-template.yml";
private static final String DEPLOYMENT = "kafka.yml";

private static final String REGISTRY_DEPLOYMENT_TEMPLATE_PROPERTY_DEFAULT = "/apicurio-deployment-template.yml";
private static final String REGISTRY_DEPLOYMENT_TEMPLATE_PROPERTY_DEFAULT = "/registry-deployment-template.yml";
private static final String REGISTRY_DEPLOYMENT = "registry.yml";
private static final int REGISTRY_PORT = 8080;

Expand Down Expand Up @@ -116,7 +116,9 @@ private void applyDeployment() {

private void applyRegistryDeployment() {
client.applyServicePropertiesUsingTemplate(registry, REGISTRY_DEPLOYMENT_TEMPLATE_PROPERTY_DEFAULT,
content -> content.replaceAll(quote("${KAFKA_BOOTSTRAP_URL}"), getKafkaBootstrapUrl()),
content -> content.replaceAll(quote("${KAFKA_BOOTSTRAP_URL}"), getKafkaBootstrapUrl())
.replaceAll(quote("${KAFKA_REGISTRY_IMAGE}"), getKafkaRegistryImage())
.replaceAll(quote("${KAFKA_REGISTRY_PORT}"), "" + model.getVendor().getRegistry().getPort()),
model.getContext().getServiceFolder().resolve(REGISTRY_DEPLOYMENT));

client.expose(registry, REGISTRY_PORT);
Expand All @@ -142,6 +144,7 @@ private String replaceDeploymentContent(String content) {

return content.replaceAll(quote("${IMAGE}"), getKafkaImage())
.replaceAll(quote("${VERSION}"), getKafkaVersion())
.replaceAll(quote("${KAFKA_PORT}"), "" + model.getVendor().getPort())
.replaceAll(quote("${SERVICE_NAME}"), model.getContext().getName());
}

Expand All @@ -153,4 +156,8 @@ protected String getKafkaVersion() {
return StringUtils.defaultIfBlank(model.getVersion(), model.getVendor().getDefaultVersion());
}

protected String getKafkaRegistryImage() {
return model.getVendor().getRegistry().getImage() + ":" + model.getVendor().getRegistry().getDefaultVersion();
}

}
Original file line number Diff line number Diff line change
@@ -1,48 +1,27 @@
package io.quarkus.test.services.containers;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;

import io.strimzi.StrimziKafkaContainer;

public class StrimziKafkaContainerManagedResource extends BaseKafkaContainerManagedResource {

private static final String REGISTRY_IMAGE = "apicurio/apicurio-registry-mem:1.2.2.Final";
private static final int REGISTRY_PORT = 8080;
private static final String REGISTRY_PATH = "/api";
private static final int KAFKA_PORT = 9092;

protected StrimziKafkaContainerManagedResource(KafkaContainerManagedResourceBuilder model) {
super(model);
}

@Override
protected int getTargetPort() {
return KAFKA_PORT;
}

@Override
protected int getRegistryTargetPort() {
return REGISTRY_PORT;
}

@Override
protected GenericContainer<?> initKafkaContainer() {
return new StrimziKafkaContainer(getKafkaVersion());
}

@Override
protected GenericContainer<?> initRegistryContainer(GenericContainer<?> kafka) {
GenericContainer<?> schemaRegistry = new GenericContainer<>(REGISTRY_IMAGE);
schemaRegistry.withExposedPorts(REGISTRY_PORT);
GenericContainer<?> schemaRegistry = new GenericContainer<>(getKafkaRegistryImage());
schemaRegistry.withExposedPorts(getKafkaRegistryPort());
schemaRegistry.withEnv("APPLICATION_ID", "registry_id");
schemaRegistry.withEnv("APPLICATION_SERVER", "localhost:9000");
schemaRegistry.withEnv("KAFKA_BOOTSTRAP_SERVERS", "PLAINTEXT://localhost:" + KafkaContainer.KAFKA_PORT);
schemaRegistry.withEnv("KAFKA_BOOTSTRAP_SERVERS", "PLAINTEXT://localhost:" + getTargetPort());
return schemaRegistry;
}

@Override
protected String getRegistryPath() {
return REGISTRY_PATH;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.quarkus.test.services.containers.model;

public enum KafkaRegistry {
CONFLUENT("confluentinc/cp-schema-registry", "6.1.1", "/", 8081),
APICURIO("quay.io/apicurio/apicurio-registry-mem", "2.0.0.Final", "/api", 8080);

private final String image;
private final String defaultVersion;
private final String path;
private final int port;

KafkaRegistry(String image, String defaultVersion, String path, int port) {
this.image = image;
this.defaultVersion = defaultVersion;
this.path = path;
this.port = port;
}

public String getImage() {
return image;
}

public String getDefaultVersion() {
return defaultVersion;
}

public String getPath() {
return path;
}

public int getPort() {
return port;
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package io.quarkus.test.services.containers.model;

public enum KafkaVendor {
CONFLUENT("confluentinc/cp-kafka", "6.1.1"),
STRIMZI("quay.io/strimzi/kafka", "0.22.1-kafka-2.5.0");
CONFLUENT("confluentinc/cp-kafka", "6.1.1", 9093, KafkaRegistry.CONFLUENT),
STRIMZI("quay.io/strimzi/kafka", "0.22.1-kafka-2.7.0", 9092, KafkaRegistry.APICURIO);

private final String image;
private final String defaultVersion;
private final int port;
private final KafkaRegistry registry;

KafkaVendor(String image, String defaultVersion) {
KafkaVendor(String image, String defaultVersion, int port, KafkaRegistry registry) {
this.image = image;
this.defaultVersion = defaultVersion;
this.port = port;
this.registry = registry;
}

public String getImage() {
Expand All @@ -19,4 +23,12 @@ public String getImage() {
public String getDefaultVersion() {
return defaultVersion;
}

public int getPort() {
return port;
}

public KafkaRegistry getRegistry() {
return registry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ items:
app: registry
spec:
ports:
- port: 8080
targetPort: 8080
- port: ${KAFKA_REGISTRY_PORT}
targetPort: ${KAFKA_REGISTRY_PORT}
name: client
selector:
app: registry
Expand All @@ -28,7 +28,7 @@ items:
spec:
containers:
- name: kafka-registry
image: apicurio/apicurio-registry-mem:1.3.1.Final
image: ${KAFKA_REGISTRY_IMAGE}
env:
- name: QUARKUS_PROFILE
value: "prod"
Expand All @@ -39,7 +39,7 @@ items:
- name: APPLICATION_SERVER
value: "localhost:9000"
ports:
- containerPort: 8080
- containerPort: ${KAFKA_REGISTRY_PORT}
name: client
restartPolicy: Always
triggers:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ items:
spec:
ports:
- name: "http"
port: 9092
targetPort: 9092
port: ${KAFKA_PORT}
targetPort: ${KAFKA_PORT}
selector:
app: ${SERVICE_NAME}
type: LoadBalancer
Expand Down Expand Up @@ -50,12 +50,12 @@ items:
image: ${IMAGE}:${VERSION}
imagePullPolicy: IfNotPresent
command: [ "/bin/sh" ]
args: [ "-c", "bin/kafka-server-start.sh config/server.properties --override listeners=PLAINTEXT://0.0.0.0:9092 --override advertised.listeners=PLAINTEXT://${SERVICE_NAME}:9092 --override zookeeper.connect=zookeeper-service:2181" ]
args: [ "-c", "bin/kafka-server-start.sh config/server.properties --override listeners=PLAINTEXT://0.0.0.0:${KAFKA_PORT} --override advertised.listeners=PLAINTEXT://${SERVICE_NAME}:${KAFKA_PORT} --override zookeeper.connect=zookeeper-service:2181" ]
env:
- name: "LOG_DIR"
value: "/tmp"
ports:
- containerPort: 9092
- containerPort: ${KAFKA_PORT}
resources: {}
triggers:
- type: "ConfigChange"
Expand Down

0 comments on commit 8bc2dd7

Please sign in to comment.