Skip to content

Commit

Permalink
refactor: use the query source format as default for sink topics
Browse files Browse the repository at this point in the history
  • Loading branch information
ConfluentJenkins committed Aug 24, 2021
1 parent a947053 commit 8630967
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public static Optional<KsqlAuthorizationValidator> create(
);

return accessValidator.map(v ->
new KsqlAuthorizationValidatorImpl(ksqlConfig, cacheIfEnabled(ksqlConfig, v)));
new KsqlAuthorizationValidatorImpl(cacheIfEnabled(ksqlConfig, v)));
}

private static Optional<KsqlAccessValidator> getAccessValidator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.topic.SourceTopicsExtractor;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import java.util.Optional;
Expand All @@ -49,26 +48,9 @@
*/
public class KsqlAuthorizationValidatorImpl implements KsqlAuthorizationValidator {
private final KsqlAccessValidator accessValidator;
private final String defaultTopicKeyFormat;
private final String defaultTopicValueFormat;

public KsqlAuthorizationValidatorImpl(final KsqlConfig ksqlConfig,
final KsqlAccessValidator accessValidator) {
public KsqlAuthorizationValidatorImpl(final KsqlAccessValidator accessValidator) {
this.accessValidator = accessValidator;

this.defaultTopicKeyFormat =
ksqlConfig.getString(KsqlConfig.KSQL_DEFAULT_KEY_FORMAT_CONFIG);
if (defaultTopicKeyFormat == null) {
throw new KsqlException("Missing default topic key format. Please provide one via the '"
+ KsqlConfig.KSQL_DEFAULT_KEY_FORMAT_CONFIG + "' config.");
}

this.defaultTopicValueFormat =
ksqlConfig.getString(KsqlConfig.KSQL_DEFAULT_VALUE_FORMAT_CONFIG);
if (defaultTopicValueFormat == null) {
throw new KsqlException("Missing default topic value format. Please provide one via the '"
+ KsqlConfig.KSQL_DEFAULT_VALUE_FORMAT_CONFIG + "' config.");
}
}

KsqlAccessValidator getAccessValidator() {
Expand Down Expand Up @@ -196,27 +178,33 @@ private KsqlTopic getCreateAsSelectSinkTopic(
+ createAsSelect.getName());
}
} else {
final Format keyFormat = FormatFactory.fromName(
properties.getKeyFormat().orElse(defaultTopicKeyFormat));
final Format valueFormat = FormatFactory.fromName(
properties.getValueFormat().orElse(defaultTopicValueFormat));

sinkTopicName = properties.getKafkaTopic().get();

sinkKeyFormat = KeyFormat.of(
FormatInfo.of(keyFormat.name()),
keyFormat.supportsFeature(SerdeFeature.SCHEMA_INFERENCE)
// If no format is specified for the sink topic, then use the format from the primary
// source topic.
final SourceTopicsExtractor extractor = new SourceTopicsExtractor(metaStore);
extractor.process(createAsSelect.getQuery(), null);
final KsqlTopic primaryKsqlTopic = extractor.getPrimarySourceTopic();

final Optional<Format> keyFormat =
properties.getKeyFormat().map(formatName -> FormatFactory.fromName(formatName));
final Optional<Format> valueFormat =
properties.getValueFormat().map(formatName -> FormatFactory.fromName(formatName));

sinkKeyFormat = keyFormat.map(format -> KeyFormat.of(
FormatInfo.of(format.name()),
format.supportsFeature(SerdeFeature.SCHEMA_INFERENCE)
? SerdeFeatures.of(SerdeFeature.SCHEMA_INFERENCE)
: SerdeFeatures.of(),
Optional.empty()
);
)).orElse(primaryKsqlTopic.getKeyFormat());

sinkValueFormat = ValueFormat.of(
FormatInfo.of(valueFormat.name()),
valueFormat.supportsFeature(SerdeFeature.SCHEMA_INFERENCE)
sinkValueFormat = valueFormat.map(format -> ValueFormat.of(
FormatInfo.of(format.name()),
format.supportsFeature(SerdeFeature.SCHEMA_INFERENCE)
? SerdeFeatures.of(SerdeFeature.SCHEMA_INFERENCE)
: SerdeFeatures.of()
);
)).orElse(primaryKsqlTopic.getValueFormat());
}

return new KsqlTopic(sinkTopicName, sinkKeyFormat, sinkValueFormat);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ public void setUp() {
when(serviceContext.getAdminClient()).thenReturn(adminClient);
when(ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_ACCESS_VALIDATOR))
.thenReturn(KsqlConfig.KSQL_ACCESS_VALIDATOR_AUTO);
when(ksqlConfig.getString(KsqlConfig.KSQL_DEFAULT_KEY_FORMAT_CONFIG))
.thenReturn("KAFKA");
when(ksqlConfig.getString(KsqlConfig.KSQL_DEFAULT_VALUE_FORMAT_CONFIG))
.thenReturn("JSON");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.doThrow;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.KsqlEngineTestUtil;
import io.confluent.ksql.exception.KsqlSchemaAuthorizationException;
Expand All @@ -45,7 +44,6 @@
import java.util.Collections;
import java.util.Optional;

import io.confluent.ksql.util.KsqlConfig;
import org.apache.kafka.common.acl.AclOperation;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -97,15 +95,10 @@ public class KsqlAuthorizationValidatorImplTest {

@Before
public void setUp() {
final KsqlConfig ksqlConfig = new KsqlConfig(ImmutableMap.of(
KsqlConfig.KSQL_DEFAULT_KEY_FORMAT_CONFIG, "KAFKA",
KsqlConfig.KSQL_DEFAULT_VALUE_FORMAT_CONFIG, "AVRO"
));

metaStore = new MetaStoreImpl(new InternalFunctionRegistry());
ksqlEngine = KsqlEngineTestUtil.createKsqlEngine(serviceContext, metaStore);

authorizationValidator = new KsqlAuthorizationValidatorImpl(ksqlConfig, accessValidator);
authorizationValidator = new KsqlAuthorizationValidatorImpl(accessValidator);
securityContext = new KsqlSecurityContext(Optional.empty(), serviceContext);

givenStreamWithTopic(KAFKA_STREAM_TOPIC, KAFKA_KSQL_TOPIC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ public static class MutualAuth {
.withProperty(KsqlRestConfig.KSQL_INTERNAL_SSL_CLIENT_AUTHENTICATION_CONFIG,
KsqlRestConfig.SSL_CLIENT_AUTHENTICATION_REQUIRED)
.withProperty(KsqlRestConfig.KSQL_SSL_KEYSTORE_ALIAS_INTERNAL_CONFIG, "node-1.example.com")
.withProperty(KsqlConfig.KSQL_DEFAULT_VALUE_FORMAT_CONFIG, "JSON")
.withProperties(COMMON_CONFIG)
.withProperties(JASS_AUTH_CONFIG)
.withProperties(internalKeyStoreProps(true))
Expand All @@ -186,7 +185,6 @@ public static class MutualAuth {
.withProperty(KsqlRestConfig.KSQL_INTERNAL_SSL_CLIENT_AUTHENTICATION_CONFIG,
KsqlRestConfig.SSL_CLIENT_AUTHENTICATION_REQUIRED)
.withProperty(KsqlRestConfig.KSQL_SSL_KEYSTORE_ALIAS_INTERNAL_CONFIG, "node-2.example.com")
.withProperty(KsqlConfig.KSQL_DEFAULT_VALUE_FORMAT_CONFIG, "JSON")
.withProperties(COMMON_CONFIG)
.withProperties(JASS_AUTH_CONFIG)
.withProperties(internalKeyStoreProps(false))
Expand Down Expand Up @@ -261,7 +259,6 @@ public static class HttpsNoMutualAuth {
.withProperty(KsqlRestConfig.KSQL_INTERNAL_SSL_CLIENT_AUTHENTICATION_CONFIG,
KsqlRestConfig.SSL_CLIENT_AUTHENTICATION_NONE)
.withProperty(KsqlRestConfig.KSQL_SSL_KEYSTORE_ALIAS_INTERNAL_CONFIG, "node-1.example.com")
.withProperty(KsqlConfig.KSQL_DEFAULT_VALUE_FORMAT_CONFIG, "JSON")
.withProperties(COMMON_CONFIG)
.withProperties(internalKeyStoreProps(true))
.build();
Expand All @@ -276,7 +273,6 @@ public static class HttpsNoMutualAuth {
.withProperty(KsqlRestConfig.KSQL_INTERNAL_SSL_CLIENT_AUTHENTICATION_CONFIG,
KsqlRestConfig.SSL_CLIENT_AUTHENTICATION_NONE)
.withProperty(KsqlRestConfig.KSQL_SSL_KEYSTORE_ALIAS_INTERNAL_CONFIG, "node-2.example.com")
.withProperty(KsqlConfig.KSQL_DEFAULT_VALUE_FORMAT_CONFIG, "JSON")
.withProperties(COMMON_CONFIG)
.withProperties(internalKeyStoreProps(false))
.build();
Expand Down

0 comments on commit 8630967

Please sign in to comment.