Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: be more lax on validating config #3599

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package io.confluent.ksql.config;

import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.util.KsqlConfig;
import java.util.List;
Expand Down Expand Up @@ -48,7 +50,7 @@ public class KsqlConfigResolver implements ConfigResolver {
@Override
public Optional<ConfigItem> resolve(final String propertyName, final boolean strict) {
if (propertyName.startsWith(KsqlConfig.KSQL_CONFIG_PROPERTY_PREFIX)
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
&& !propertyName.startsWith(KsqlConfig.KSQL_STREAMS_PREFIX)) {
&& !propertyName.startsWith(KSQL_STREAMS_PREFIX)) {
return resolveKsqlConfig(propertyName);
}

Expand All @@ -59,7 +61,7 @@ private static Optional<ConfigItem> resolveStreamsConfig(
final String propertyName,
final boolean strict) {

final String key = stripPrefix(propertyName, KsqlConfig.KSQL_STREAMS_PREFIX);
final String key = stripPrefix(propertyName, KSQL_STREAMS_PREFIX);

final Optional<ConfigItem> resolved = STREAM_CONFIG_DEFS
.stream()
Expand All @@ -72,12 +74,9 @@ private static Optional<ConfigItem> resolveStreamsConfig(
return resolved;
}

if (key.startsWith(StreamsConfig.CONSUMER_PREFIX)
|| key.startsWith(StreamsConfig.PRODUCER_PREFIX)) {
return Optional.empty(); // Unknown producer / consumer config
}

if (propertyName.startsWith(KsqlConfig.KSQL_STREAMS_PREFIX)) {
if (propertyName.startsWith(KSQL_STREAMS_PREFIX)
&& !propertyName.startsWith(KSQL_STREAMS_PREFIX + StreamsConfig.PRODUCER_PREFIX)
&& !propertyName.startsWith(KSQL_STREAMS_PREFIX + StreamsConfig.CONSUMER_PREFIX)) {
return Optional.empty(); // Unknown streams config
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,47 @@ public void shouldResolveKsqlConsumerPrefixedConsumerConfig() {
}

@Test
public void shouldNotFindUnknownConsumerProperty() {
assertNotFound(StreamsConfig.CONSUMER_PREFIX + "you.won't.find.me...right");
public void shouldNotFindUnknownConsumerPropertyIfStrict() {
// Given:
final String configName = StreamsConfig.CONSUMER_PREFIX
+ "custom.interceptor.config";

// Then:
assertThat(resolver.resolve(configName, true), is(Optional.empty()));
}

@Test
public void shouldFindUnknownConsumerPropertyIfNotStrict() {
// Given:
final String configName = StreamsConfig.CONSUMER_PREFIX
+ "custom.interceptor.config";

// Then:
assertThat(resolver.resolve(configName, false), is(unresolvedItem(configName)));
}

@Test
public void shouldNotFindUnknownStreamsPrefixedConsumerPropertyIfStrict() {
// Given:
final String configName = KsqlConfig.KSQL_STREAMS_PREFIX
+ StreamsConfig.CONSUMER_PREFIX
+ "custom.interceptor.config";

// Then:
assertThat(resolver.resolve(configName, true), is(Optional.empty()));
}

@Test
public void shouldFindUnknownStreamsPrefixedConsumerPropertyIfNotStrict() {
// Given:
final String configName = StreamsConfig.CONSUMER_PREFIX
+ "custom.interceptor.config";

// Then:
assertThat(
resolver.resolve(KsqlConfig.KSQL_STREAMS_PREFIX + configName, false),
is(unresolvedItem(configName))
);
}

@Test
Expand Down Expand Up @@ -159,8 +198,47 @@ public void shouldResolveKsqlProducerPrefixedProducerConfig() {
}

@Test
public void shouldNotFindUnknownProducerProperty() {
assertNotFound(StreamsConfig.PRODUCER_PREFIX + "you.won't.find.me...right");
public void shouldNotFindUnknownProducerPropertyIfStrict() {
// Given:
final String configName = StreamsConfig.PRODUCER_PREFIX
+ "custom.interceptor.config";

// Then:
assertThat(resolver.resolve(configName, true), is(Optional.empty()));
}

@Test
public void shouldFindUnknownProducerPropertyIfNotStrict() {
// Given:
final String configName = StreamsConfig.PRODUCER_PREFIX
+ "custom.interceptor.config";

// Then:
assertThat(resolver.resolve(configName, false), is(unresolvedItem(configName)));
}

@Test
public void shouldNotFindUnknownStreamsPrefixedProducerPropertyIfStrict() {
// Given:
final String configName = KsqlConfig.KSQL_STREAMS_PREFIX
+ StreamsConfig.PRODUCER_PREFIX
+ "custom.interceptor.config";

// Then:
assertThat(resolver.resolve(configName, true), is(Optional.empty()));
}

@Test
public void shouldFindUnknownStreamsPrefixedProducerPropertyIfNotStrict() {
// Given:
final String configName = StreamsConfig.PRODUCER_PREFIX
+ "custom.interceptor.config";

// Then:
assertThat(
resolver.resolve(KsqlConfig.KSQL_STREAMS_PREFIX + configName, false),
is(unresolvedItem(configName))
);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,24 @@ public void shouldSetMonitoringInterceptorConfigProperties() {
assertThat(result, equalTo("foo"));
}

@Test
public void shouldSetMonitoringInterceptorConfigPropertiesByClientType() {
// Given:
final Map<String, String> props = ImmutableMap.of(
"ksql.streams.consumer.confluent.monitoring.interceptor.topic", "foo",
"producer.confluent.monitoring.interceptor.topic", "bar"
);

final KsqlConfig ksqlConfig = new KsqlConfig(props);

// When:
final Map<String, Object> result = ksqlConfig.getKsqlStreamConfigProps();

// Then:
assertThat(result.get("consumer.confluent.monitoring.interceptor.topic"), is("foo"));
assertThat(result.get("producer.confluent.monitoring.interceptor.topic"), is("bar"));
}

@Test
public void shouldFilterPropertiesForWhichTypeUnknown() {
final KsqlConfig ksqlConfig = new KsqlConfig(Collections.singletonMap("you.shall.not.pass", "wizard"));
Expand Down