From a9639a2e0941b5f577cd39a3b5d0a4143ad09dee Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Mon, 14 Oct 2019 13:47:42 +0100 Subject: [PATCH] test: improve stability of tests on older branches (#3557) This commit brings together some recent tweaks done on master to improve the stability of integration tests. --- .../EmbeddedSingleNodeKafkaCluster.java | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/ksql-engine/src/test/java/io/confluent/ksql/testutils/EmbeddedSingleNodeKafkaCluster.java b/ksql-engine/src/test/java/io/confluent/ksql/testutils/EmbeddedSingleNodeKafkaCluster.java index 5ff2a200f1cc..344a547240db 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/testutils/EmbeddedSingleNodeKafkaCluster.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/testutils/EmbeddedSingleNodeKafkaCluster.java @@ -28,6 +28,7 @@ import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -45,7 +46,6 @@ import kafka.security.auth.ResourceType$; import kafka.security.auth.SimpleAclAuthorizer; import kafka.server.KafkaConfig; -import kafka.utils.ZKConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; @@ -69,6 +69,10 @@ public class EmbeddedSingleNodeKafkaCluster extends ExternalResource { private static final Logger log = LoggerFactory.getLogger(EmbeddedSingleNodeKafkaCluster.class); + public static final Duration ZK_SESSION_TIMEOUT = Duration.ofSeconds(30); + // Jenkins builds can take ages to create the ZK log, so the initial connect can be slow, hence: + public static final Duration ZK_CONNECT_TIMEOUT = Duration.ofSeconds(60); + public static final Credentials VALID_USER1 = new Credentials("valid_user_1", "some-password"); public static final Credentials VALID_USER2 = @@ -120,7 +124,13 @@ public void start() throws Exception { brokerConfig.put("group.initial.rebalance.delay.ms", 100); broker = new KafkaEmbedded(effectiveBrokerConfigFrom()); clientConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); - authorizer.configure(ImmutableMap.of(ZKConfig.ZkConnectProp(), zookeeperConnect())); + + final ImmutableMap props = ImmutableMap.of( + KafkaConfig.ZkConnectProp(), zookeeperConnect(), + SimpleAclAuthorizer.ZkConnectionTimeOutProp(), (int) ZK_CONNECT_TIMEOUT.toMillis(), + SimpleAclAuthorizer.ZkSessionTimeOutProp(), (int) ZK_SESSION_TIMEOUT.toMillis() + ); + authorizer.configure(props); } @Override @@ -296,8 +306,21 @@ private Properties effectiveBrokerConfigFrom() { effectiveConfig.put(KafkaConfig.OffsetsTopicPartitionsProp(), "2"); // Shutdown quick: effectiveConfig.put(KafkaConfig.ControlledShutdownEnableProp(), false); + // Set ZK connect timeout high enough to give ZK time to build log file on build server: + effectiveConfig.put(KafkaConfig.ZkConnectionTimeoutMsProp(), (int) ZK_CONNECT_TIMEOUT.toMillis()); + // Set ZK session timeout high enough that slow build servers don't hit it: + effectiveConfig.put(KafkaConfig.ZkSessionTimeoutMsProp(), (int) ZK_SESSION_TIMEOUT.toMillis()); // Explicitly set to be less that the default 30 second timeout of KSQL functional tests effectiveConfig.put(KafkaConfig.ControllerSocketTimeoutMsProp(), 20_000); + // Streams runs multiple consumers, so let's give them all a chance to join. + // (Tests run quicker and with a more stable consumer group): + effectiveConfig.put(KafkaConfig.GroupInitialRebalanceDelayMsProp(), 100); + // Stop people writing silly data in tests: + effectiveConfig.put(KafkaConfig.MessageMaxBytesProp(), 100_000); + // Stop logs being deleted due to retention limits: + effectiveConfig.put(KafkaConfig.LogRetentionTimeMillisProp(), -1); + // Stop logs marked for deletion from being deleted + effectiveConfig.put(KafkaConfig.LogDeleteDelayMsProp(), Long.MAX_VALUE); return effectiveConfig; }