Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test for KafkaNodePools #19

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions src/main/java/io/tealc/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Tealc authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.tealc;

/**
* Interface for keep global constants used across system tests.
*/
public interface Constants {

/**
* Basic paths to examples
*/
String PATH_TO_EXAMPLES = Utils.USER_PATH + "/src/main/resources";
String PATH_TO_PACKAGING_INSTALL_FILES = Utils.USER_PATH + "/../packaging/install";

/**
* File paths for metrics YAMLs
*/
String PATH_TO_KAFKA_METRICS_CONFIG = PATH_TO_EXAMPLES + "/kafka/kafka.yaml";
String PATH_TO_METRICS_CM = PATH_TO_EXAMPLES + "/kafka/metrics.yaml";

String KAFKA_METRICS_CONFIG_MAP_SUFFIX = "-kafka-metrics";

/**
* Strimzi domain used for the Strimzi labels
*/
String STRIMZI_DOMAIN = "strimzi.io/";

/**
* Kubernetes domain used for Kubernetes labels
*/
String KUBERNETES_DOMAIN = "app.kubernetes.io/";

/**
* The kind of a Kubernetes / OpenShift Resource. It contains the same value as the Kind of the corresponding
* Custom Resource. It should have on of the following values:
*
* <ul>
* <li>Kafka</li>
* <li>KafkaConnect</li>
* <li>KafkaMirrorMaker</li>
* <li>KafkaBridge</li>
* <li>KafkaUser</li>
* <li>KafkaTopic</li>
* </ul>
*/
String STRIMZI_KIND_LABEL = STRIMZI_DOMAIN + "kind";

/**
* The Strimzi cluster the resource is part of. This is typically the name of the custom resource.
*/
String STRIMZI_CLUSTER_LABEL = STRIMZI_DOMAIN + "cluster";
/**
* Annotation for enabling or disabling the Node Pools. This annotation is used on the Kafka CR
*/
String ANNO_STRIMZI_IO_NODE_POOLS = STRIMZI_DOMAIN + "node-pools";
}
11 changes: 10 additions & 1 deletion src/main/java/io/tealc/Environment.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

Expand All @@ -28,6 +29,7 @@ public class Environment {
public static final String WORKER_03_USERNAME_ENV = "WORKER_03_USERNAME";
public static final String WORKER_03_PASSWORD_ENV = "WORKER_03_PASSWORD";
public static final String WORKER_03_URL_ENV = "WORKER_03_URL";
public static final String KAFKA_VERSION_ENV = "KAFKA_VERSION";

/**
* Set values
Expand All @@ -41,6 +43,7 @@ public class Environment {
public static final String WORKER_03_USERNAME = getOrDefault(WORKER_03_USERNAME_ENV, null);
public static final String WORKER_03_PASSWORD = getOrDefault(WORKER_03_PASSWORD_ENV, null);
public static final String WORKER_03_URL = getOrDefault(WORKER_03_URL_ENV, null);
public static final String KAFKA_VERSION = getOrDefault(KAFKA_VERSION_ENV, "3.5.1");

private Environment() { }

Expand All @@ -49,7 +52,13 @@ private Environment() { }
LOGGER.info("Used environment variables:");
VALUES.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.forEach(entry -> LOGGER.info(debugFormat, entry.getKey(), entry.getValue()));
.forEach(entry -> {
if (!entry.getKey().toLowerCase(Locale.ROOT).contains("pass")) {
LOGGER.info(debugFormat, entry.getKey(), entry.getValue());
} else {
LOGGER.info(debugFormat, entry.getKey(), "*****");
}
});
}

private static String getOrDefault(String varName, String defaultValue) {
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/io/tealc/KubeClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@
import io.strimzi.api.kafka.KafkaConnectorList;
import io.strimzi.api.kafka.KafkaList;
import io.strimzi.api.kafka.KafkaMirrorMaker2List;
import io.strimzi.api.kafka.KafkaNodePoolList;
import io.strimzi.api.kafka.KafkaRebalanceList;
import io.strimzi.api.kafka.KafkaTopicList;
import io.strimzi.api.kafka.KafkaUserList;
import io.strimzi.api.kafka.StrimziPodSetList;
import io.strimzi.api.kafka.model.Kafka;
import io.strimzi.api.kafka.model.KafkaConnect;
import io.strimzi.api.kafka.model.KafkaConnector;
import io.strimzi.api.kafka.model.KafkaMirrorMaker2;
import io.strimzi.api.kafka.model.KafkaRebalance;
import io.strimzi.api.kafka.model.KafkaTopic;
import io.strimzi.api.kafka.model.KafkaUser;
import io.strimzi.api.kafka.model.StrimziPodSet;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -309,4 +313,12 @@ public MixedOperation<KafkaMirrorMaker2, KafkaMirrorMaker2List, Resource<KafkaMi
public MixedOperation<KafkaRebalance, KafkaRebalanceList, Resource<KafkaRebalance>> kafkaRebalanceClient() {
return Crds.kafkaRebalanceOperation(client);
}

public MixedOperation<StrimziPodSet, StrimziPodSetList, Resource<StrimziPodSet>> strimziPodSetClient() {
return Crds.strimziPodSetOperation(client);
}

public MixedOperation<KafkaNodePool, KafkaNodePoolList, Resource<KafkaNodePool>> kafkaNodePoolClient() {
return Crds.kafkaNodePoolOperation(client);
}
}
136 changes: 136 additions & 0 deletions src/main/java/io/tealc/Utils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright Tealc authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.tealc;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.InvalidFormatException;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLParser;
import io.fabric8.kubernetes.api.model.ConfigMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.function.BooleanSupplier;

public class Utils {
private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class);
public static final String USER_PATH = System.getProperty("user.dir");
public static <T> T configFromYaml(String yamlPath, Class<T> c) {
return configFromYaml(new File(yamlPath), c);
}

public static ConfigMap configMapFromYaml(String yamlPath, String name) {
try {
YAMLFactory yaml = new YAMLFactory();
ObjectMapper mapper = new ObjectMapper(yaml);
YAMLParser yamlParser = yaml.createParser(new File(yamlPath));
List<ConfigMap> list = mapper.readValues(yamlParser, new TypeReference<ConfigMap>() { }).readAll();
Optional<ConfigMap> cmOpt = list.stream().filter(cm -> "ConfigMap".equals(cm.getKind()) && name.equals(cm.getMetadata().getName())).findFirst();
if (cmOpt.isPresent()) {
return cmOpt.get();
} else {
LOGGER.warn("ConfigMap {} not found in file {}", name, yamlPath);
return null;
}
} catch (InvalidFormatException e) {
throw new IllegalArgumentException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}

}

public static <T> T configFromYaml(File yamlFile, Class<T> c) {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
try {
return mapper.readValue(yamlFile, c);
} catch (InvalidFormatException e) {
throw new IllegalArgumentException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/**
* Poll the given {@code ready} function every {@code pollIntervalMs} milliseconds until it returns true,
* or throw a WaitException if it doesn't return true within {@code timeoutMs} milliseconds.
* @return The remaining time left until timeout occurs
* (helpful if you have several calls which need to share a common timeout),
* */
public static long waitFor(String description, long pollIntervalMs, long timeoutMs, BooleanSupplier ready) {
return waitFor(description, pollIntervalMs, timeoutMs, ready, () -> { });
}

public static long waitFor(String description, long pollIntervalMs, long timeoutMs, BooleanSupplier ready, Runnable onTimeout) {
LOGGER.debug("Waiting for {}", description);
long deadline = System.currentTimeMillis() + timeoutMs;

String exceptionMessage = null;
String previousExceptionMessage = null;

// in case we are polling every 1s, we want to print exception after x tries, not on the first try
// for minutes poll interval will 2 be enough
int exceptionAppearanceCount = Duration.ofMillis(pollIntervalMs).toMinutes() > 0 ? 2 : Math.max((int) (timeoutMs / pollIntervalMs) / 4, 2);
int exceptionCount = 0;
int newExceptionAppearance = 0;

StringWriter stackTraceError = new StringWriter();

while (true) {
boolean result;
try {
result = ready.getAsBoolean();
} catch (Exception e) {
exceptionMessage = e.getMessage();

if (++exceptionCount == exceptionAppearanceCount && exceptionMessage != null && exceptionMessage.equals(previousExceptionMessage)) {
LOGGER.error("While waiting for {} exception occurred: {}", description, exceptionMessage);
// log the stacktrace
e.printStackTrace(new PrintWriter(stackTraceError));
} else if (exceptionMessage != null && !exceptionMessage.equals(previousExceptionMessage) && ++newExceptionAppearance == 2) {
previousExceptionMessage = exceptionMessage;
}

result = false;
}
long timeLeft = deadline - System.currentTimeMillis();
if (result) {
return timeLeft;
}
if (timeLeft <= 0) {
if (exceptionCount > 1) {
LOGGER.error("Exception waiting for {}, {}", description, exceptionMessage);

if (!stackTraceError.toString().isEmpty()) {
// printing handled stacktrace
LOGGER.error(stackTraceError.toString());
}
}
onTimeout.run();
WaitException waitException = new WaitException("Timeout after " + timeoutMs + " ms waiting for " + description);
waitException.printStackTrace();
throw waitException;
}
long sleepTime = Math.min(pollIntervalMs, timeLeft);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("{} not ready, will try again in {} ms ({}ms till timeout)", description, sleepTime, timeLeft);
}
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
return deadline - System.currentTimeMillis();
}
}
}

}
15 changes: 15 additions & 0 deletions src/main/java/io/tealc/WaitException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Tealc authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.tealc;

public class WaitException extends RuntimeException {
public WaitException(String message) {
super(message);
}

public WaitException(Throwable cause) {
super(cause);
}
}
27 changes: 27 additions & 0 deletions src/main/java/io/tealc/templates/ConfigMapTemplates.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Tealc authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.tealc.templates;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.tealc.Constants;
import io.tealc.Utils;

public class ConfigMapTemplates {
private ConfigMapTemplates() {}

public static ConfigMapBuilder kafkaConfigMap(String namespace, String clusterName) {
ConfigMap configMap = getConfigMapFromYaml(Constants.PATH_TO_METRICS_CM);
return new ConfigMapBuilder(configMap)
.editMetadata()
.withName(clusterName + Constants.KAFKA_METRICS_CONFIG_MAP_SUFFIX)
.withNamespace(namespace)
.endMetadata();
}

private static ConfigMap getConfigMapFromYaml(String yamlPath) {
return Utils.configFromYaml(yamlPath, ConfigMap.class);
}
}
35 changes: 35 additions & 0 deletions src/main/java/io/tealc/templates/KafkaNodePoolTemplates.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Tealc authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.tealc.templates;

import io.strimzi.api.kafka.model.nodepool.KafkaNodePoolBuilder;
import io.strimzi.api.kafka.model.nodepool.ProcessRoles;
import io.tealc.Constants;

import java.util.Map;

public class KafkaNodePoolTemplates {

private KafkaNodePoolTemplates() {}

public static KafkaNodePoolBuilder defaultKafkaNodePool(String namespaceName, String nodePoolName, String kafkaClusterName, int kafkaReplicas) {
return new KafkaNodePoolBuilder()
.withNewMetadata()
.withNamespace(namespaceName)
.withName(nodePoolName)
.withLabels(Map.of(Constants.STRIMZI_CLUSTER_LABEL, kafkaClusterName))
.endMetadata()
.withNewSpec()
.withReplicas(kafkaReplicas)
.endSpec();
}

public static KafkaNodePoolBuilder kafkaNodePoolWithBrokerRole(String namespaceName, String nodePoolName, String kafkaClusterName, int kafkaReplicas) {
return defaultKafkaNodePool(namespaceName, nodePoolName, kafkaClusterName, kafkaReplicas)
.editOrNewSpec()
.addToRoles(ProcessRoles.BROKER)
.endSpec();
}
}
Loading