Skip to content

Commit

Permalink
feat: do not allow writing to the processing log topic
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Jan 17, 2020
1 parent bc969f3 commit 284ace5
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@

package io.confluent.ksql.util;

import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public final class ReservedInternalTopics {
private static final Logger LOG = LoggerFactory.getLogger(ReservedInternalTopics.class);

// These constant should not be part of KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG because they're
// not configurable.
public static final String KSQL_INTERNAL_TOPIC_PREFIX = "_confluent-ksql-";
Expand All @@ -47,14 +53,42 @@ public static String configsTopic(final KsqlConfig ksqlConfig) {
return toKsqlInternalTopic(ksqlConfig, KSQL_CONFIGS_TOPIC_SUFFIX);
}

/**
* Returns the KSQL processing log topic.
* <p/>
* This is not an internal topic in the sense that users are intentionally meant to read from
* this topic to identify deserialization and other processing errors, define a KSQL stream on
* it, and potentially issue queries to filter from it, etc. This is why it is not prefixed in
* the way KSQL internal topics are.
*
* @param config The Processing log config, which is used to extract the processing topic suffix
* @param ksqlConfig The KSQL config, which is used to extract the KSQL service id.
* @return The processing log topic name.
*/
public static String processingLogTopic(
final ProcessingLogConfig config,
final KsqlConfig ksqlConfig
) {
final String topicNameConfig = config.getString(ProcessingLogConfig.TOPIC_NAME);
if (topicNameConfig.equals(ProcessingLogConfig.TOPIC_NAME_NOT_SET)) {
return String.format(
"%s%s",
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
ProcessingLogConfig.TOPIC_NAME_DEFAULT_SUFFIX
);
} else {
return topicNameConfig;
}
}

/**
* Compute a name for a KSQL internal topic.
*
* @param ksqlConfig The KSQL config, which is used to extract the internal topic prefix.
* @param topicSuffix A suffix that is appended to the topic name.
* @return The computed topic name.
*/
public static String toKsqlInternalTopic(final KsqlConfig ksqlConfig, final String topicSuffix) {
private static String toKsqlInternalTopic(final KsqlConfig ksqlConfig, final String topicSuffix) {
return String.format(
"%s%s_%s",
KSQL_INTERNAL_TOPIC_PREFIX,
Expand All @@ -66,10 +100,18 @@ public static String toKsqlInternalTopic(final KsqlConfig ksqlConfig, final Stri
private final List<Pattern> systemInternalTopics;

public ReservedInternalTopics(final KsqlConfig ksqlConfig) {
this.systemInternalTopics = ksqlConfig.getList(KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG)
.stream()
.map(Pattern::compile)
.collect(Collectors.toList());
try {
this.systemInternalTopics = ksqlConfig.getList(KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG)
.stream()
.map(Pattern::compile)
.collect(Collectors.toList());
} catch (final Exception e) {
final String message = "Cannot get a list of system internal topics due to an invalid " +
"configuration in '" + KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG + "'";

LOG.error(message + ": " + e.getMessage());
throw new KsqlException(message, e);
}
}

public Set<String> filterInternalTopics(final Set<String> topicNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,31 @@

import java.util.List;
import java.util.Set;
import java.util.regex.PatternSyntaxException;

import com.google.common.collect.ImmutableSet;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

public class ReservedInternalTopicsTest {
@Rule
public final ExpectedException expectedException = ExpectedException.none();

private ReservedInternalTopics internalTopics;
private KsqlConfig ksqlConfig;

@Before
public void setUp() {
internalTopics = new ReservedInternalTopics(new KsqlConfig(ImmutableMap.of(
ksqlConfig = new KsqlConfig(ImmutableMap.of(
KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG, "prefix_.*,literal,.*_suffix"
)));
));

internalTopics = new ReservedInternalTopics(ksqlConfig);
}


Expand Down Expand Up @@ -100,4 +109,38 @@ public void shouldFilterAllInternalTopics() {
// Then
assertThat(filteredTopics, is(ImmutableSet.of("tt", "name1", "suffix")));
}

@Test
public void shouldThrowWhenInvalidSystemTopicsListIsUsed() {
// Given
final KsqlConfig givenConfig = new KsqlConfig(ImmutableMap.of(
KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG, "*_suffix"
));

// Then
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Cannot get a list of system internal topics due to" +
" an invalid configuration in '" + KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG + "'");

// When
new ReservedInternalTopics(givenConfig);
}

@Test
public void shouldReturnCommandTopic() {
// Given/When
final String commandTopic = ReservedInternalTopics.commandTopic(ksqlConfig);

// Then
assertThat("_confluent-ksql-default__command_topic", is(commandTopic));
}

@Test
public void shouldReturnConfigsTopic() {
// Given/When
final String commandTopic = ReservedInternalTopics.configsTopic(ksqlConfig);

// Then
assertThat("_confluent-ksql-default__configs", is(commandTopic));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
Expand Down Expand Up @@ -194,6 +195,15 @@ private DataSource<?> getDataSource(
+ dataSource.getKafkaTopicName());
}

final ProcessingLogConfig processingLogConfig =
new ProcessingLogConfig(ksqlConfig.getAllConfigPropsWithSecretsObfuscated());
final String processingLogTopic =
ReservedInternalTopics.processingLogTopic(processingLogConfig, ksqlConfig);
if (dataSource.getKafkaTopicName().equals(processingLogTopic)) {
throw new KsqlException("Cannot insert into the processing log topic: "
+ dataSource.getKafkaTopicName());
}

return dataSource;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,36 @@ public void shouldThrowWhenInsertValuesOnReservedInternalTopic() {
executor.execute(statement, ImmutableMap.of(), engine, serviceContext);
}

@Test
public void shouldThrowWhenInsertValuesOnProcessingLogTopic() {
// Given
givenDataSourceWithSchema("default_ksql_processing_log", SCHEMA,
SerdeOption.none(), Optional.of(COL0), false);

final ConfiguredStatement<InsertValues> statement = ConfiguredStatement.of(
PreparedStatement.of(
"",
new InsertValues(SourceName.of("TOPIC"),
allFieldNames(SCHEMA),
ImmutableList.of(
new LongLiteral(1L),
new StringLiteral("str"),
new StringLiteral("str"),
new LongLiteral(2L)
))),
ImmutableMap.of(),
new KsqlConfig(ImmutableMap.of())
);

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage(
"Cannot insert into the processing log topic: default_ksql_processing_log");

// When:
executor.execute(statement, ImmutableMap.of(), engine, serviceContext);
}

@Test
public void shouldThrowOnProducerSendError() throws ExecutionException, InterruptedException {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package io.confluent.ksql.rest.server.computation;

import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.parser.tree.InsertInto;
Expand Down Expand Up @@ -189,5 +190,14 @@ private void throwIfInsertOnInternalTopic(
throw new KsqlException("Cannot insert into the reserved internal topic: "
+ dataSource.getKafkaTopicName());
}

final ProcessingLogConfig processingLogConfig =
new ProcessingLogConfig(ksqlConfig.getAllConfigPropsWithSecretsObfuscated());
final String processingLogTopic =
ReservedInternalTopics.processingLogTopic(processingLogConfig, ksqlConfig);
if (dataSource.getKafkaTopicName().equals(processingLogTopic)) {
throw new KsqlException("Cannot insert into the processing log topic: "
+ dataSource.getKafkaTopicName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.util.IdentifierUtil;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.ReservedInternalTopics;
import java.util.Optional;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
Expand All @@ -50,16 +51,7 @@ static Schema getMessageSchema() {
public static String getTopicName(
final ProcessingLogConfig config,
final KsqlConfig ksqlConfig) {
final String topicNameConfig = config.getString(ProcessingLogConfig.TOPIC_NAME);
if (topicNameConfig.equals(ProcessingLogConfig.TOPIC_NAME_NOT_SET)) {
return String.format(
"%s%s",
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
ProcessingLogConfig.TOPIC_NAME_DEFAULT_SUFFIX
);
} else {
return topicNameConfig;
}
return ReservedInternalTopics.processingLogTopic(config, ksqlConfig);
}

public static Optional<String> maybeCreateProcessingLogTopic(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,24 @@ public void shouldThrowExceptionWhenInsertIntoReservedInternalTopic() {
// When:
distributor.execute(configured, executionContext, mock(KsqlSecurityContext.class));
}

@Test
public void shouldThrowExceptionWhenInsertIntoProcessingLogTopic() {
// Given
final PreparedStatement<Statement> preparedStatement =
PreparedStatement.of("", new InsertInto(SourceName.of("s1"), mock(Query.class)));
final ConfiguredStatement<Statement> configured =
ConfiguredStatement.of(preparedStatement, ImmutableMap.of(), KSQL_CONFIG);
final DataSource<?> dataSource = mock(DataSource.class);
doReturn(dataSource).when(metaStore).getSource(SourceName.of("s1"));
when(dataSource.getKafkaTopicName()).thenReturn("default_ksql_processing_log");

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Cannot insert into the processing log topic: "
+ "default_ksql_processing_log");

// When:
distributor.execute(configured, executionContext, mock(KsqlSecurityContext.class));
}
}

0 comments on commit 284ace5

Please sign in to comment.