From 3c80cf1dee6eef6651b79369881d13f3571c11c3 Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Thu, 17 Oct 2019 13:30:44 +0100 Subject: [PATCH] fix: be more lax on validating config (#3599) * fix: be more lax on validating config Fixes: #2279 KSQL should allow any property to be passed to producers and consumers as the can be used to initialize things such as interceptors. We can not know ahead of time what the properties a custom interceptor needs, hence we can't exclude any settings we see. --- .../ksql/config/KsqlConfigResolver.java | 18 ++-- .../ksql/config/KsqlConfigResolverTest.java | 86 ++++++++++++++++++- .../confluent/ksql/util/KsqlConfigTest.java | 18 ++++ 3 files changed, 109 insertions(+), 13 deletions(-) diff --git a/ksql-common/src/main/java/io/confluent/ksql/config/KsqlConfigResolver.java b/ksql-common/src/main/java/io/confluent/ksql/config/KsqlConfigResolver.java index e217586e75e4..8beb5b88dcad 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/config/KsqlConfigResolver.java +++ b/ksql-common/src/main/java/io/confluent/ksql/config/KsqlConfigResolver.java @@ -15,6 +15,9 @@ package io.confluent.ksql.config; +import static io.confluent.ksql.util.KsqlConfig.KSQL_CONFIG_PROPERTY_PREFIX; +import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX; + import com.google.common.collect.ImmutableList; import io.confluent.ksql.util.KsqlConfig; import java.util.List; @@ -47,8 +50,8 @@ public class KsqlConfigResolver implements ConfigResolver { @Override public Optional resolve(final String propertyName, final boolean strict) { - if (propertyName.startsWith(KsqlConfig.KSQL_CONFIG_PROPERTY_PREFIX) - && !propertyName.startsWith(KsqlConfig.KSQL_STREAMS_PREFIX)) { + if (propertyName.startsWith(KSQL_CONFIG_PROPERTY_PREFIX) + && !propertyName.startsWith(KSQL_STREAMS_PREFIX)) { return resolveKsqlConfig(propertyName); } @@ -59,7 +62,7 @@ private static Optional resolveStreamsConfig( final String propertyName, final boolean strict) { - final String key = stripPrefix(propertyName, KsqlConfig.KSQL_STREAMS_PREFIX); + final String key = stripPrefix(propertyName, KSQL_STREAMS_PREFIX); final Optional resolved = STREAM_CONFIG_DEFS .stream() @@ -72,12 +75,9 @@ private static Optional resolveStreamsConfig( return resolved; } - if (key.startsWith(StreamsConfig.CONSUMER_PREFIX) - || key.startsWith(StreamsConfig.PRODUCER_PREFIX)) { - return Optional.empty(); // Unknown producer / consumer config - } - - if (propertyName.startsWith(KsqlConfig.KSQL_STREAMS_PREFIX)) { + if (propertyName.startsWith(KSQL_STREAMS_PREFIX) + && !propertyName.startsWith(KSQL_STREAMS_PREFIX + StreamsConfig.PRODUCER_PREFIX) + && !propertyName.startsWith(KSQL_STREAMS_PREFIX + StreamsConfig.CONSUMER_PREFIX)) { return Optional.empty(); // Unknown streams config } diff --git a/ksql-common/src/test/java/io/confluent/ksql/config/KsqlConfigResolverTest.java b/ksql-common/src/test/java/io/confluent/ksql/config/KsqlConfigResolverTest.java index b600cfc77ede..0b397a7f52dc 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/config/KsqlConfigResolverTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/config/KsqlConfigResolverTest.java @@ -126,8 +126,47 @@ public void shouldResolveKsqlConsumerPrefixedConsumerConfig() { } @Test - public void shouldNotFindUnknownConsumerProperty() { - assertNotFound(StreamsConfig.CONSUMER_PREFIX + "you.won't.find.me...right"); + public void shouldNotFindUnknownConsumerPropertyIfStrict() { + // Given: + final String configName = StreamsConfig.CONSUMER_PREFIX + + "custom.interceptor.config"; + + // Then: + assertThat(resolver.resolve(configName, true), is(Optional.empty())); + } + + @Test + public void shouldFindUnknownConsumerPropertyIfNotStrict() { + // Given: + final String configName = StreamsConfig.CONSUMER_PREFIX + + "custom.interceptor.config"; + + // Then: + assertThat(resolver.resolve(configName, false), is(unresolvedItem(configName))); + } + + @Test + public void shouldNotFindUnknownStreamsPrefixedConsumerPropertyIfStrict() { + // Given: + final String configName = KsqlConfig.KSQL_STREAMS_PREFIX + + StreamsConfig.CONSUMER_PREFIX + + "custom.interceptor.config"; + + // Then: + assertThat(resolver.resolve(configName, true), is(Optional.empty())); + } + + @Test + public void shouldFindUnknownStreamsPrefixedConsumerPropertyIfNotStrict() { + // Given: + final String configName = StreamsConfig.CONSUMER_PREFIX + + "custom.interceptor.config"; + + // Then: + assertThat( + resolver.resolve(KsqlConfig.KSQL_STREAMS_PREFIX + configName, false), + is(unresolvedItem(configName)) + ); } @Test @@ -159,8 +198,47 @@ public void shouldResolveKsqlProducerPrefixedProducerConfig() { } @Test - public void shouldNotFindUnknownProducerProperty() { - assertNotFound(StreamsConfig.PRODUCER_PREFIX + "you.won't.find.me...right"); + public void shouldNotFindUnknownProducerPropertyIfStrict() { + // Given: + final String configName = StreamsConfig.PRODUCER_PREFIX + + "custom.interceptor.config"; + + // Then: + assertThat(resolver.resolve(configName, true), is(Optional.empty())); + } + + @Test + public void shouldFindUnknownProducerPropertyIfNotStrict() { + // Given: + final String configName = StreamsConfig.PRODUCER_PREFIX + + "custom.interceptor.config"; + + // Then: + assertThat(resolver.resolve(configName, false), is(unresolvedItem(configName))); + } + + @Test + public void shouldNotFindUnknownStreamsPrefixedProducerPropertyIfStrict() { + // Given: + final String configName = KsqlConfig.KSQL_STREAMS_PREFIX + + StreamsConfig.PRODUCER_PREFIX + + "custom.interceptor.config"; + + // Then: + assertThat(resolver.resolve(configName, true), is(Optional.empty())); + } + + @Test + public void shouldFindUnknownStreamsPrefixedProducerPropertyIfNotStrict() { + // Given: + final String configName = StreamsConfig.PRODUCER_PREFIX + + "custom.interceptor.config"; + + // Then: + assertThat( + resolver.resolve(KsqlConfig.KSQL_STREAMS_PREFIX + configName, false), + is(unresolvedItem(configName)) + ); } @Test diff --git a/ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java b/ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java index b2dc071824c1..40cbbc00b27b 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java @@ -232,6 +232,24 @@ public void shouldSetMonitoringInterceptorConfigProperties() { assertThat(result, equalTo("foo")); } + @Test + public void shouldSetMonitoringInterceptorConfigPropertiesByClientType() { + // Given: + final Map props = ImmutableMap.of( + "ksql.streams.consumer.confluent.monitoring.interceptor.topic", "foo", + "producer.confluent.monitoring.interceptor.topic", "bar" + ); + + final KsqlConfig ksqlConfig = new KsqlConfig(props); + + // When: + final Map result = ksqlConfig.getKsqlStreamConfigProps(); + + // Then: + assertThat(result.get("consumer.confluent.monitoring.interceptor.topic"), is("foo")); + assertThat(result.get("producer.confluent.monitoring.interceptor.topic"), is("bar")); + } + @Test public void shouldFilterPropertiesForWhichTypeUnknown() { final KsqlConfig ksqlConfig = new KsqlConfig(Collections.singletonMap("you.shall.not.pass", "wizard"));