diff --git a/ksql-common/src/main/java/io/confluent/ksql/config/KsqlConfigResolver.java b/ksql-common/src/main/java/io/confluent/ksql/config/KsqlConfigResolver.java index e217586e75e4..8beb5b88dcad 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/config/KsqlConfigResolver.java +++ b/ksql-common/src/main/java/io/confluent/ksql/config/KsqlConfigResolver.java @@ -15,6 +15,9 @@ package io.confluent.ksql.config; +import static io.confluent.ksql.util.KsqlConfig.KSQL_CONFIG_PROPERTY_PREFIX; +import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX; + import com.google.common.collect.ImmutableList; import io.confluent.ksql.util.KsqlConfig; import java.util.List; @@ -47,8 +50,8 @@ public class KsqlConfigResolver implements ConfigResolver { @Override public Optional resolve(final String propertyName, final boolean strict) { - if (propertyName.startsWith(KsqlConfig.KSQL_CONFIG_PROPERTY_PREFIX) - && !propertyName.startsWith(KsqlConfig.KSQL_STREAMS_PREFIX)) { + if (propertyName.startsWith(KSQL_CONFIG_PROPERTY_PREFIX) + && !propertyName.startsWith(KSQL_STREAMS_PREFIX)) { return resolveKsqlConfig(propertyName); } @@ -59,7 +62,7 @@ private static Optional resolveStreamsConfig( final String propertyName, final boolean strict) { - final String key = stripPrefix(propertyName, KsqlConfig.KSQL_STREAMS_PREFIX); + final String key = stripPrefix(propertyName, KSQL_STREAMS_PREFIX); final Optional resolved = STREAM_CONFIG_DEFS .stream() @@ -72,12 +75,9 @@ private static Optional resolveStreamsConfig( return resolved; } - if (key.startsWith(StreamsConfig.CONSUMER_PREFIX) - || key.startsWith(StreamsConfig.PRODUCER_PREFIX)) { - return Optional.empty(); // Unknown producer / consumer config - } - - if (propertyName.startsWith(KsqlConfig.KSQL_STREAMS_PREFIX)) { + if (propertyName.startsWith(KSQL_STREAMS_PREFIX) + && !propertyName.startsWith(KSQL_STREAMS_PREFIX + StreamsConfig.PRODUCER_PREFIX) + && !propertyName.startsWith(KSQL_STREAMS_PREFIX + StreamsConfig.CONSUMER_PREFIX)) { return Optional.empty(); // Unknown streams config } diff --git a/ksql-common/src/test/java/io/confluent/ksql/config/KsqlConfigResolverTest.java b/ksql-common/src/test/java/io/confluent/ksql/config/KsqlConfigResolverTest.java index b600cfc77ede..0b397a7f52dc 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/config/KsqlConfigResolverTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/config/KsqlConfigResolverTest.java @@ -126,8 +126,47 @@ public void shouldResolveKsqlConsumerPrefixedConsumerConfig() { } @Test - public void shouldNotFindUnknownConsumerProperty() { - assertNotFound(StreamsConfig.CONSUMER_PREFIX + "you.won't.find.me...right"); + public void shouldNotFindUnknownConsumerPropertyIfStrict() { + // Given: + final String configName = StreamsConfig.CONSUMER_PREFIX + + "custom.interceptor.config"; + + // Then: + assertThat(resolver.resolve(configName, true), is(Optional.empty())); + } + + @Test + public void shouldFindUnknownConsumerPropertyIfNotStrict() { + // Given: + final String configName = StreamsConfig.CONSUMER_PREFIX + + "custom.interceptor.config"; + + // Then: + assertThat(resolver.resolve(configName, false), is(unresolvedItem(configName))); + } + + @Test + public void shouldNotFindUnknownStreamsPrefixedConsumerPropertyIfStrict() { + // Given: + final String configName = KsqlConfig.KSQL_STREAMS_PREFIX + + StreamsConfig.CONSUMER_PREFIX + + "custom.interceptor.config"; + + // Then: + assertThat(resolver.resolve(configName, true), is(Optional.empty())); + } + + @Test + public void shouldFindUnknownStreamsPrefixedConsumerPropertyIfNotStrict() { + // Given: + final String configName = StreamsConfig.CONSUMER_PREFIX + + "custom.interceptor.config"; + + // Then: + assertThat( + resolver.resolve(KsqlConfig.KSQL_STREAMS_PREFIX + configName, false), + is(unresolvedItem(configName)) + ); } @Test @@ -159,8 +198,47 @@ public void shouldResolveKsqlProducerPrefixedProducerConfig() { } @Test - public void shouldNotFindUnknownProducerProperty() { - assertNotFound(StreamsConfig.PRODUCER_PREFIX + "you.won't.find.me...right"); + public void shouldNotFindUnknownProducerPropertyIfStrict() { + // Given: + final String configName = StreamsConfig.PRODUCER_PREFIX + + "custom.interceptor.config"; + + // Then: + assertThat(resolver.resolve(configName, true), is(Optional.empty())); + } + + @Test + public void shouldFindUnknownProducerPropertyIfNotStrict() { + // Given: + final String configName = StreamsConfig.PRODUCER_PREFIX + + "custom.interceptor.config"; + + // Then: + assertThat(resolver.resolve(configName, false), is(unresolvedItem(configName))); + } + + @Test + public void shouldNotFindUnknownStreamsPrefixedProducerPropertyIfStrict() { + // Given: + final String configName = KsqlConfig.KSQL_STREAMS_PREFIX + + StreamsConfig.PRODUCER_PREFIX + + "custom.interceptor.config"; + + // Then: + assertThat(resolver.resolve(configName, true), is(Optional.empty())); + } + + @Test + public void shouldFindUnknownStreamsPrefixedProducerPropertyIfNotStrict() { + // Given: + final String configName = StreamsConfig.PRODUCER_PREFIX + + "custom.interceptor.config"; + + // Then: + assertThat( + resolver.resolve(KsqlConfig.KSQL_STREAMS_PREFIX + configName, false), + is(unresolvedItem(configName)) + ); } @Test diff --git a/ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java b/ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java index b2dc071824c1..40cbbc00b27b 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java @@ -232,6 +232,24 @@ public void shouldSetMonitoringInterceptorConfigProperties() { assertThat(result, equalTo("foo")); } + @Test + public void shouldSetMonitoringInterceptorConfigPropertiesByClientType() { + // Given: + final Map props = ImmutableMap.of( + "ksql.streams.consumer.confluent.monitoring.interceptor.topic", "foo", + "producer.confluent.monitoring.interceptor.topic", "bar" + ); + + final KsqlConfig ksqlConfig = new KsqlConfig(props); + + // When: + final Map result = ksqlConfig.getKsqlStreamConfigProps(); + + // Then: + assertThat(result.get("consumer.confluent.monitoring.interceptor.topic"), is("foo")); + assertThat(result.get("producer.confluent.monitoring.interceptor.topic"), is("bar")); + } + @Test public void shouldFilterPropertiesForWhichTypeUnknown() { final KsqlConfig ksqlConfig = new KsqlConfig(Collections.singletonMap("you.shall.not.pass", "wizard"));