diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java index 72be3d1fbdf5..e41e774ddb2c 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java @@ -240,17 +240,17 @@ public boolean addTopicConfig(final String topicName, final Map overr @Override public TopicCleanupPolicy getTopicCleanupPolicy(final String topicName) { final String policy = getTopicConfig(topicName) - .getOrDefault(TopicConfig.CLEANUP_POLICY_CONFIG, ""); - - switch (policy) { - case "compact": - return TopicCleanupPolicy.COMPACT; - case "delete": - return TopicCleanupPolicy.DELETE; - case "compact+delete": - return TopicCleanupPolicy.COMPACT_DELETE; - default: - throw new KsqlException("Could not get the topic configs for : " + topicName); + .getOrDefault(TopicConfig.CLEANUP_POLICY_CONFIG, "") + .toLowerCase(); + + if (policy.equals("compact")) { + return TopicCleanupPolicy.COMPACT; + } else if (policy.equals("delete")) { + return TopicCleanupPolicy.DELETE; + } else if (policy.contains("compact") && policy.contains("delete")) { + return TopicCleanupPolicy.COMPACT_DELETE; + } else { + throw new KsqlException("Could not get the topic configs for : " + topicName); } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java index b6f897be5575..40d98dd1f19b 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java @@ -15,7 +15,6 @@ package io.confluent.ksql.topic; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.parser.SqlFormatter; @@ -23,8 +22,8 @@ import io.confluent.ksql.parser.properties.with.CreateSourceProperties; import io.confluent.ksql.parser.tree.CreateAsSelect; import io.confluent.ksql.parser.tree.CreateSource; +import io.confluent.ksql.parser.tree.CreateStreamAsSelect; import io.confluent.ksql.parser.tree.CreateTable; -import io.confluent.ksql.parser.tree.CreateTableAsSelect; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.properties.with.CommonCreateConfigs; import io.confluent.ksql.services.KafkaTopicClient; @@ -35,6 +34,7 @@ import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -126,7 +126,10 @@ private ConfiguredStatement injectForCreateSource( properties.getPartitions(), properties.getReplicas()); - createTopic(topicPropertiesBuilder, createSource instanceof CreateTable); + final String topicCleanUpPolicy = createSource instanceof CreateTable + ? TopicConfig.CLEANUP_POLICY_COMPACT : TopicConfig.CLEANUP_POLICY_DELETE; + + createTopic(topicPropertiesBuilder, topicCleanUpPolicy); return statement; } @@ -157,10 +160,27 @@ private ConfiguredStatement injectForCreateAsSelec properties.getPartitions(), properties.getReplicas()); - final boolean shouldCompactTopic = createAsSelect instanceof CreateTableAsSelect - && !createAsSelect.getQuery().getWindow().isPresent(); + final String topicCleanUpPolicy; + final Map additionalTopicConfigs = new HashMap<>(); + if (createAsSelect instanceof CreateStreamAsSelect) { + topicCleanUpPolicy = TopicConfig.CLEANUP_POLICY_DELETE; + } else { + if (createAsSelect.getQuery().getWindow().isPresent()) { + topicCleanUpPolicy = + TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE; + + createAsSelect.getQuery().getWindow().get().getKsqlWindowExpression().getRetention() + .ifPresent(retention -> additionalTopicConfigs.put( + TopicConfig.RETENTION_MS_CONFIG, + retention.toDuration().toMillis() + )); + } else { + topicCleanUpPolicy = TopicConfig.CLEANUP_POLICY_COMPACT; + } + } - final TopicProperties info = createTopic(topicPropertiesBuilder, shouldCompactTopic); + final TopicProperties info + = createTopic(topicPropertiesBuilder, topicCleanUpPolicy, additionalTopicConfigs); final T withTopic = (T) createAsSelect.copyWith(properties.withTopic( info.getTopicName(), @@ -175,13 +195,21 @@ private ConfiguredStatement injectForCreateAsSelec private TopicProperties createTopic( final Builder topicPropertiesBuilder, - final boolean shouldCompactTopic + final String topicCleanUpPolicy + ) { + return createTopic(topicPropertiesBuilder, topicCleanUpPolicy, Collections.emptyMap()); + } + + private TopicProperties createTopic( + final Builder topicPropertiesBuilder, + final String topicCleanUpPolicy, + final Map additionalTopicConfigs ) { final TopicProperties info = topicPropertiesBuilder.build(); - final Map config = shouldCompactTopic - ? ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) - : Collections.emptyMap(); + final Map config = new HashMap<>(); + config.put(TopicConfig.CLEANUP_POLICY_CONFIG, topicCleanUpPolicy); + config.putAll(additionalTopicConfigs); topicClient.createTopic(info.getTopicName(), info.getPartitions(), info.getReplicas(), config); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java index 4d7d92c7372d..c1bb3985c721 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java @@ -156,7 +156,7 @@ public void shouldAggregateTumblingWindow() { // Then: assertOutputOf(resultStream0, expected, is(expected)); - assertTopicsCleanedUp(TopicCleanupPolicy.DELETE, 3, resultStream0); + assertTopicsCleanedUp(TopicCleanupPolicy.COMPACT_DELETE, 3, resultStream0); } @Test @@ -179,7 +179,7 @@ public void shouldAggregateHoppingWindow() { // Then: assertOutputOf(resultStream0, expected, is(expected)); - assertTopicsCleanedUp(TopicCleanupPolicy.DELETE, 3, resultStream0); + assertTopicsCleanedUp(TopicCleanupPolicy.COMPACT_DELETE, 3, resultStream0); } @Test @@ -210,7 +210,7 @@ public void shouldAggregateSessionWindow() { // Then: assertOutputOf(resultStream0, expected, mapHasItems(expected)); - assertTopicsCleanedUp(TopicCleanupPolicy.DELETE, 2, resultStream0); + assertTopicsCleanedUp(TopicCleanupPolicy.COMPACT_DELETE, 2, resultStream0); } private void givenTable(final String sql) { diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java index 69e6ea1026c1..a3162722a37a 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java @@ -16,6 +16,8 @@ package io.confluent.ksql.services; import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT; +import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG; +import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_DELETE; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; @@ -39,6 +41,7 @@ import io.confluent.ksql.exception.KafkaResponseGetFailedException; import io.confluent.ksql.exception.KafkaTopicExistsException; import io.confluent.ksql.exception.KsqlTopicAuthorizationException; +import io.confluent.ksql.services.KafkaTopicClient.TopicCleanupPolicy; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -459,6 +462,46 @@ public void shouldGetTopicConfig() { assertThat(config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), is("1")); } + @Test + public void shouldGetTopicCleanUpPolicyDelete() { + // Given: + givenTopicConfigs( + "foo", + overriddenConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE) + ); + + // When / Then: + assertThat(kafkaTopicClient.getTopicCleanupPolicy("foo"), + is(TopicCleanupPolicy.DELETE)); + } + + @Test + public void shouldGetTopicCleanUpPolicyCompact() { + // Given: + givenTopicConfigs( + "foo", + overriddenConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT) + ); + + // When / Then: + assertThat(kafkaTopicClient.getTopicCleanupPolicy("foo"), + is(TopicCleanupPolicy.COMPACT)); + } + + @Test + public void shouldGetTopicCleanUpPolicyCompactAndDelete() { + // Given: + givenTopicConfigs( + "foo", + overriddenConfigEntry(CLEANUP_POLICY_CONFIG, + CLEANUP_POLICY_COMPACT + "," + CLEANUP_POLICY_DELETE) + ); + + // When / Then: + assertThat(kafkaTopicClient.getTopicCleanupPolicy("foo"), + is(TopicCleanupPolicy.COMPACT_DELETE)); + } + @Test public void shouldThrowOnNoneRetryableGetTopicConfigError() { // Given: @@ -517,7 +560,7 @@ public void shouldSetStringTopicConfig() { ); final Map configOverrides = ImmutableMap.of( - TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT + CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT ); // When: @@ -528,7 +571,7 @@ public void shouldSetStringTopicConfig() { verify(adminClient).incrementalAlterConfigs(ImmutableMap.of( topicResource("peter"), ImmutableSet.of( - setConfig(TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT) + setConfig(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT) ) )); } @@ -570,7 +613,7 @@ public void shouldFallBackToAddTopicConfigForOlderBrokers() { ); final Map overrides = ImmutableMap.of( - TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT + CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT ); when(adminClient.incrementalAlterConfigs(any())) @@ -583,7 +626,7 @@ public void shouldFallBackToAddTopicConfigForOlderBrokers() { verify(adminClient).alterConfigs(ImmutableMap.of( topicResource("peter"), new Config(ImmutableSet.of( - new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT), + new ConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT), new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1234") )) )); @@ -594,12 +637,12 @@ public void shouldNotAlterStringConfigIfMatchingConfigOverrideExists() { // Given: givenTopicConfigs( "peter", - overriddenConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT), + overriddenConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT), defaultConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy") ); final Map overrides = ImmutableMap.of( - TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT + CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT ); // When: @@ -641,7 +684,7 @@ public void shouldRetryAddingTopicConfig() { ); final Map overrides = ImmutableMap.of( - TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT + CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT ); when(adminClient.incrementalAlterConfigs(any())) diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java index 3dc720b470eb..07f13d775574 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java @@ -54,6 +54,7 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -345,7 +346,7 @@ public void shouldCreateMissingTopic() { "expectedName", 10, (short) 10, - ImmutableMap.of()); + ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)); } @Test @@ -362,7 +363,7 @@ public void shouldCreateMissingTopicForCreate() { "expectedName", 10, (short) 10, - ImmutableMap.of()); + ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)); } @Test @@ -401,7 +402,7 @@ public void shouldCreateMissingTopicWithCompactCleanupPolicyForCreateTable() { } @Test - public void shouldCreateMissingTopicWithDefaultCleanupPolicyForWindowedTables() { + public void shouldCreateMissingTopicWithCompactAndDeleteCleanupPolicyForWindowedTables() { // Given: givenStatement("CREATE TABLE x WITH (kafka_topic='topic') " + "AS SELECT * FROM SOURCE WINDOW TUMBLING (SIZE 10 SECONDS);"); @@ -415,7 +416,31 @@ public void shouldCreateMissingTopicWithDefaultCleanupPolicyForWindowedTables() "expectedName", 10, (short) 10, - ImmutableMap.of()); + ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, + TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE)); + } + + @Test + public void shouldCreateMissingTopicWithSpecifiedRetentionForWindowedTables() { + // Given: + givenStatement("CREATE TABLE x WITH (kafka_topic='topic') " + + "AS SELECT * FROM SOURCE WINDOW TUMBLING (SIZE 10 SECONDS, RETENTION 4 DAYS);"); + when(builder.build()).thenReturn(new TopicProperties("expectedName", 10, (short) 10)); + + // When: + injector.inject(statement, builder); + + // Then: + verify(topicClient).createTopic( + "expectedName", + 10, + (short) 10, + ImmutableMap.of( + TopicConfig.CLEANUP_POLICY_CONFIG, + TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, + TopicConfig.RETENTION_MS_CONFIG, + Duration.ofDays(4).toMillis() + )); } @Test