diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java b/ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java
index 59c19da5a8bf..8c754b536ffb 100644
--- a/ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java
+++ b/ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java
@@ -15,12 +15,18 @@
package io.confluent.ksql.util;
+import io.confluent.ksql.logging.processing.ProcessingLogConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public final class ReservedInternalTopics {
+ private static final Logger LOG = LoggerFactory.getLogger(ReservedInternalTopics.class);
+
// These constant should not be part of KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG because they're
// not configurable.
public static final String KSQL_INTERNAL_TOPIC_PREFIX = "_confluent-ksql-";
@@ -47,6 +53,34 @@ public static String configsTopic(final KsqlConfig ksqlConfig) {
return toKsqlInternalTopic(ksqlConfig, KSQL_CONFIGS_TOPIC_SUFFIX);
}
+ /**
+ * Returns the KSQL processing log topic.
+ *
+ * This is not an internal topic in the sense that users are intentionally meant to read from
+ * this topic to identify deserialization and other processing errors, define a KSQL stream on
+ * it, and potentially issue queries to filter from it, etc. This is why it is not prefixed in
+ * the way KSQL internal topics are.
+ *
+ * @param config The Processing log config, which is used to extract the processing topic suffix
+ * @param ksqlConfig The KSQL config, which is used to extract the KSQL service id.
+ * @return The processing log topic name.
+ */
+ public static String processingLogTopic(
+ final ProcessingLogConfig config,
+ final KsqlConfig ksqlConfig
+ ) {
+ final String topicNameConfig = config.getString(ProcessingLogConfig.TOPIC_NAME);
+ if (topicNameConfig.equals(ProcessingLogConfig.TOPIC_NAME_NOT_SET)) {
+ return String.format(
+ "%s%s",
+ ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
+ ProcessingLogConfig.TOPIC_NAME_DEFAULT_SUFFIX
+ );
+ } else {
+ return topicNameConfig;
+ }
+ }
+
/**
* Compute a name for a KSQL internal topic.
*
@@ -54,7 +88,7 @@ public static String configsTopic(final KsqlConfig ksqlConfig) {
* @param topicSuffix A suffix that is appended to the topic name.
* @return The computed topic name.
*/
- public static String toKsqlInternalTopic(final KsqlConfig ksqlConfig, final String topicSuffix) {
+ private static String toKsqlInternalTopic(final KsqlConfig ksqlConfig, final String topicSuffix) {
return String.format(
"%s%s_%s",
KSQL_INTERNAL_TOPIC_PREFIX,
@@ -66,10 +100,18 @@ public static String toKsqlInternalTopic(final KsqlConfig ksqlConfig, final Stri
private final List systemInternalTopics;
public ReservedInternalTopics(final KsqlConfig ksqlConfig) {
- this.systemInternalTopics = ksqlConfig.getList(KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG)
- .stream()
- .map(Pattern::compile)
- .collect(Collectors.toList());
+ try {
+ this.systemInternalTopics = ksqlConfig.getList(KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG)
+ .stream()
+ .map(Pattern::compile)
+ .collect(Collectors.toList());
+ } catch (final Exception e) {
+ final String message = "Cannot get a list of system internal topics due to an invalid " +
+ "configuration in '" + KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG + "'";
+
+ LOG.error(message + ": " + e.getMessage());
+ throw new KsqlException(message, e);
+ }
}
public Set filterInternalTopics(final Set topicNames) {
diff --git a/ksql-common/src/test/java/io/confluent/ksql/util/ReservedInternalTopicsTest.java b/ksql-common/src/test/java/io/confluent/ksql/util/ReservedInternalTopicsTest.java
index 44da540e2b0b..0046a2550bf4 100644
--- a/ksql-common/src/test/java/io/confluent/ksql/util/ReservedInternalTopicsTest.java
+++ b/ksql-common/src/test/java/io/confluent/ksql/util/ReservedInternalTopicsTest.java
@@ -20,22 +20,31 @@
import java.util.List;
import java.util.Set;
+import java.util.regex.PatternSyntaxException;
import com.google.common.collect.ImmutableSet;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
public class ReservedInternalTopicsTest {
+ @Rule
+ public final ExpectedException expectedException = ExpectedException.none();
+
private ReservedInternalTopics internalTopics;
+ private KsqlConfig ksqlConfig;
@Before
public void setUp() {
- internalTopics = new ReservedInternalTopics(new KsqlConfig(ImmutableMap.of(
+ ksqlConfig = new KsqlConfig(ImmutableMap.of(
KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG, "prefix_.*,literal,.*_suffix"
- )));
+ ));
+
+ internalTopics = new ReservedInternalTopics(ksqlConfig);
}
@@ -100,4 +109,38 @@ public void shouldFilterAllInternalTopics() {
// Then
assertThat(filteredTopics, is(ImmutableSet.of("tt", "name1", "suffix")));
}
+
+ @Test
+ public void shouldThrowWhenInvalidSystemTopicsListIsUsed() {
+ // Given
+ final KsqlConfig givenConfig = new KsqlConfig(ImmutableMap.of(
+ KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG, "*_suffix"
+ ));
+
+ // Then
+ expectedException.expect(KsqlException.class);
+ expectedException.expectMessage("Cannot get a list of system internal topics due to" +
+ " an invalid configuration in '" + KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG + "'");
+
+ // When
+ new ReservedInternalTopics(givenConfig);
+ }
+
+ @Test
+ public void shouldReturnCommandTopic() {
+ // Given/When
+ final String commandTopic = ReservedInternalTopics.commandTopic(ksqlConfig);
+
+ // Then
+ assertThat("_confluent-ksql-default__command_topic", is(commandTopic));
+ }
+
+ @Test
+ public void shouldReturnConfigsTopic() {
+ // Given/When
+ final String commandTopic = ReservedInternalTopics.configsTopic(ksqlConfig);
+
+ // Then
+ assertThat("_confluent-ksql-default__configs", is(commandTopic));
+ }
}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java
index fd0fd98d3b4a..63dbc30dd233 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java
@@ -28,6 +28,7 @@
import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
+import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
@@ -194,6 +195,15 @@ private DataSource> getDataSource(
+ dataSource.getKafkaTopicName());
}
+ final ProcessingLogConfig processingLogConfig =
+ new ProcessingLogConfig(ksqlConfig.getAllConfigPropsWithSecretsObfuscated());
+ final String processingLogTopic =
+ ReservedInternalTopics.processingLogTopic(processingLogConfig, ksqlConfig);
+ if (dataSource.getKafkaTopicName().equals(processingLogTopic)) {
+ throw new KsqlException("Cannot insert into the processing log topic: "
+ + dataSource.getKafkaTopicName());
+ }
+
return dataSource;
}
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java
index ac11c9ec898a..3788b01c38f9 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java
@@ -565,6 +565,36 @@ public void shouldThrowWhenInsertValuesOnReservedInternalTopic() {
executor.execute(statement, ImmutableMap.of(), engine, serviceContext);
}
+ @Test
+ public void shouldThrowWhenInsertValuesOnProcessingLogTopic() {
+ // Given
+ givenDataSourceWithSchema("default_ksql_processing_log", SCHEMA,
+ SerdeOption.none(), Optional.of(COL0), false);
+
+ final ConfiguredStatement statement = ConfiguredStatement.of(
+ PreparedStatement.of(
+ "",
+ new InsertValues(SourceName.of("TOPIC"),
+ allFieldNames(SCHEMA),
+ ImmutableList.of(
+ new LongLiteral(1L),
+ new StringLiteral("str"),
+ new StringLiteral("str"),
+ new LongLiteral(2L)
+ ))),
+ ImmutableMap.of(),
+ new KsqlConfig(ImmutableMap.of())
+ );
+
+ // Expect:
+ expectedException.expect(KsqlException.class);
+ expectedException.expectMessage(
+ "Cannot insert into the processing log topic: default_ksql_processing_log");
+
+ // When:
+ executor.execute(statement, ImmutableMap.of(), engine, serviceContext);
+ }
+
@Test
public void shouldThrowOnProducerSendError() throws ExecutionException, InterruptedException {
// Given:
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java
index 217570d6e873..06022043f8b1 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java
@@ -15,6 +15,7 @@
package io.confluent.ksql.rest.server.computation;
import io.confluent.ksql.KsqlExecutionContext;
+import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.parser.tree.InsertInto;
@@ -189,5 +190,14 @@ private void throwIfInsertOnInternalTopic(
throw new KsqlException("Cannot insert into the reserved internal topic: "
+ dataSource.getKafkaTopicName());
}
+
+ final ProcessingLogConfig processingLogConfig =
+ new ProcessingLogConfig(ksqlConfig.getAllConfigPropsWithSecretsObfuscated());
+ final String processingLogTopic =
+ ReservedInternalTopics.processingLogTopic(processingLogConfig, ksqlConfig);
+ if (dataSource.getKafkaTopicName().equals(processingLogTopic)) {
+ throw new KsqlException("Cannot insert into the processing log topic: "
+ + dataSource.getKafkaTopicName());
+ }
}
}
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ProcessingLogServerUtils.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ProcessingLogServerUtils.java
index 44d4d541091f..c7cb98b73594 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ProcessingLogServerUtils.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ProcessingLogServerUtils.java
@@ -24,6 +24,7 @@
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.util.IdentifierUtil;
import io.confluent.ksql.util.KsqlConfig;
+import io.confluent.ksql.util.ReservedInternalTopics;
import java.util.Optional;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -50,16 +51,7 @@ static Schema getMessageSchema() {
public static String getTopicName(
final ProcessingLogConfig config,
final KsqlConfig ksqlConfig) {
- final String topicNameConfig = config.getString(ProcessingLogConfig.TOPIC_NAME);
- if (topicNameConfig.equals(ProcessingLogConfig.TOPIC_NAME_NOT_SET)) {
- return String.format(
- "%s%s",
- ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
- ProcessingLogConfig.TOPIC_NAME_DEFAULT_SUFFIX
- );
- } else {
- return topicNameConfig;
- }
+ return ReservedInternalTopics.processingLogTopic(config, ksqlConfig);
}
public static Optional maybeCreateProcessingLogTopic(
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java
index 5bc57b9dbad5..206e548f9f26 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java
@@ -325,4 +325,24 @@ public void shouldThrowExceptionWhenInsertIntoReservedInternalTopic() {
// When:
distributor.execute(configured, executionContext, mock(KsqlSecurityContext.class));
}
+
+ @Test
+ public void shouldThrowExceptionWhenInsertIntoProcessingLogTopic() {
+ // Given
+ final PreparedStatement preparedStatement =
+ PreparedStatement.of("", new InsertInto(SourceName.of("s1"), mock(Query.class)));
+ final ConfiguredStatement configured =
+ ConfiguredStatement.of(preparedStatement, ImmutableMap.of(), KSQL_CONFIG);
+ final DataSource> dataSource = mock(DataSource.class);
+ doReturn(dataSource).when(metaStore).getSource(SourceName.of("s1"));
+ when(dataSource.getKafkaTopicName()).thenReturn("default_ksql_processing_log");
+
+ // Expect:
+ expectedException.expect(KsqlException.class);
+ expectedException.expectMessage("Cannot insert into the processing log topic: "
+ + "default_ksql_processing_log");
+
+ // When:
+ distributor.execute(configured, executionContext, mock(KsqlSecurityContext.class));
+ }
}