Skip to content

Commit

Permalink
fix: be more lax on validating config (#3599)
Browse files Browse the repository at this point in the history
* fix: be more lax on validating config

Fixes: #2279

KSQL should allow any property to be passed to producers and consumers as the can be used to initialize things such as interceptors.
We can not know ahead of time what the properties a custom interceptor needs, hence we can't exclude any settings we see.
  • Loading branch information
big-andy-coates authored Oct 17, 2019
1 parent 6459fa1 commit 3c80cf1
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

package io.confluent.ksql.config;

import static io.confluent.ksql.util.KsqlConfig.KSQL_CONFIG_PROPERTY_PREFIX;
import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.util.KsqlConfig;
import java.util.List;
Expand Down Expand Up @@ -47,8 +50,8 @@ public class KsqlConfigResolver implements ConfigResolver {

@Override
public Optional<ConfigItem> resolve(final String propertyName, final boolean strict) {
if (propertyName.startsWith(KsqlConfig.KSQL_CONFIG_PROPERTY_PREFIX)
&& !propertyName.startsWith(KsqlConfig.KSQL_STREAMS_PREFIX)) {
if (propertyName.startsWith(KSQL_CONFIG_PROPERTY_PREFIX)
&& !propertyName.startsWith(KSQL_STREAMS_PREFIX)) {
return resolveKsqlConfig(propertyName);
}

Expand All @@ -59,7 +62,7 @@ private static Optional<ConfigItem> resolveStreamsConfig(
final String propertyName,
final boolean strict) {

final String key = stripPrefix(propertyName, KsqlConfig.KSQL_STREAMS_PREFIX);
final String key = stripPrefix(propertyName, KSQL_STREAMS_PREFIX);

final Optional<ConfigItem> resolved = STREAM_CONFIG_DEFS
.stream()
Expand All @@ -72,12 +75,9 @@ private static Optional<ConfigItem> resolveStreamsConfig(
return resolved;
}

if (key.startsWith(StreamsConfig.CONSUMER_PREFIX)
|| key.startsWith(StreamsConfig.PRODUCER_PREFIX)) {
return Optional.empty(); // Unknown producer / consumer config
}

if (propertyName.startsWith(KsqlConfig.KSQL_STREAMS_PREFIX)) {
if (propertyName.startsWith(KSQL_STREAMS_PREFIX)
&& !propertyName.startsWith(KSQL_STREAMS_PREFIX + StreamsConfig.PRODUCER_PREFIX)
&& !propertyName.startsWith(KSQL_STREAMS_PREFIX + StreamsConfig.CONSUMER_PREFIX)) {
return Optional.empty(); // Unknown streams config
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,47 @@ public void shouldResolveKsqlConsumerPrefixedConsumerConfig() {
}

@Test
public void shouldNotFindUnknownConsumerProperty() {
assertNotFound(StreamsConfig.CONSUMER_PREFIX + "you.won't.find.me...right");
public void shouldNotFindUnknownConsumerPropertyIfStrict() {
// Given:
final String configName = StreamsConfig.CONSUMER_PREFIX
+ "custom.interceptor.config";

// Then:
assertThat(resolver.resolve(configName, true), is(Optional.empty()));
}

@Test
public void shouldFindUnknownConsumerPropertyIfNotStrict() {
// Given:
final String configName = StreamsConfig.CONSUMER_PREFIX
+ "custom.interceptor.config";

// Then:
assertThat(resolver.resolve(configName, false), is(unresolvedItem(configName)));
}

@Test
public void shouldNotFindUnknownStreamsPrefixedConsumerPropertyIfStrict() {
// Given:
final String configName = KsqlConfig.KSQL_STREAMS_PREFIX
+ StreamsConfig.CONSUMER_PREFIX
+ "custom.interceptor.config";

// Then:
assertThat(resolver.resolve(configName, true), is(Optional.empty()));
}

@Test
public void shouldFindUnknownStreamsPrefixedConsumerPropertyIfNotStrict() {
// Given:
final String configName = StreamsConfig.CONSUMER_PREFIX
+ "custom.interceptor.config";

// Then:
assertThat(
resolver.resolve(KsqlConfig.KSQL_STREAMS_PREFIX + configName, false),
is(unresolvedItem(configName))
);
}

@Test
Expand Down Expand Up @@ -159,8 +198,47 @@ public void shouldResolveKsqlProducerPrefixedProducerConfig() {
}

@Test
public void shouldNotFindUnknownProducerProperty() {
assertNotFound(StreamsConfig.PRODUCER_PREFIX + "you.won't.find.me...right");
public void shouldNotFindUnknownProducerPropertyIfStrict() {
// Given:
final String configName = StreamsConfig.PRODUCER_PREFIX
+ "custom.interceptor.config";

// Then:
assertThat(resolver.resolve(configName, true), is(Optional.empty()));
}

@Test
public void shouldFindUnknownProducerPropertyIfNotStrict() {
// Given:
final String configName = StreamsConfig.PRODUCER_PREFIX
+ "custom.interceptor.config";

// Then:
assertThat(resolver.resolve(configName, false), is(unresolvedItem(configName)));
}

@Test
public void shouldNotFindUnknownStreamsPrefixedProducerPropertyIfStrict() {
// Given:
final String configName = KsqlConfig.KSQL_STREAMS_PREFIX
+ StreamsConfig.PRODUCER_PREFIX
+ "custom.interceptor.config";

// Then:
assertThat(resolver.resolve(configName, true), is(Optional.empty()));
}

@Test
public void shouldFindUnknownStreamsPrefixedProducerPropertyIfNotStrict() {
// Given:
final String configName = StreamsConfig.PRODUCER_PREFIX
+ "custom.interceptor.config";

// Then:
assertThat(
resolver.resolve(KsqlConfig.KSQL_STREAMS_PREFIX + configName, false),
is(unresolvedItem(configName))
);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,24 @@ public void shouldSetMonitoringInterceptorConfigProperties() {
assertThat(result, equalTo("foo"));
}

@Test
public void shouldSetMonitoringInterceptorConfigPropertiesByClientType() {
// Given:
final Map<String, String> props = ImmutableMap.of(
"ksql.streams.consumer.confluent.monitoring.interceptor.topic", "foo",
"producer.confluent.monitoring.interceptor.topic", "bar"
);

final KsqlConfig ksqlConfig = new KsqlConfig(props);

// When:
final Map<String, Object> result = ksqlConfig.getKsqlStreamConfigProps();

// Then:
assertThat(result.get("consumer.confluent.monitoring.interceptor.topic"), is("foo"));
assertThat(result.get("producer.confluent.monitoring.interceptor.topic"), is("bar"));
}

@Test
public void shouldFilterPropertiesForWhichTypeUnknown() {
final KsqlConfig ksqlConfig = new KsqlConfig(Collections.singletonMap("you.shall.not.pass", "wizard"));
Expand Down

0 comments on commit 3c80cf1

Please sign in to comment.