From 978c79ad2fa4b451c1022bb11fbf82d638e7635e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Tue, 4 Aug 2020 15:58:08 -0500 Subject: [PATCH] fix: address Andy's comments - inject DenyListPropertyValidator to Resource classes - add unit tests for WSQueryEndpoint - validate only streamProperties (no requestProperties) - minor fixes --- .../properties/DenyListPropertyValidator.java | 15 +-- .../io/confluent/ksql/util/KsqlConfig.java | 5 +- .../DenyListPropertyValidatorTest.java | 15 ++- .../ksql/rest/server/KsqlRestApplication.java | 22 +++- .../rest/server/resources/KsqlResource.java | 22 ++-- .../streaming/StreamedQueryResource.java | 21 ++-- .../resources/streaming/WSQueryEndpoint.java | 17 +-- .../rest/server/KsqlRestApplicationTest.java | 6 +- .../rest/server/computation/RecoveryTest.java | 7 +- .../server/resources/KsqlResourceTest.java | 71 ++++++----- .../server/resources/WSQueryEndpointTest.java | 111 ++++++++++++++++++ .../streaming/StreamedQueryResourceTest.java | 22 +++- 12 files changed, 239 insertions(+), 95 deletions(-) create mode 100644 ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/properties/DenyListPropertyValidator.java b/ksqldb-common/src/main/java/io/confluent/ksql/properties/DenyListPropertyValidator.java index 9a2b75cf417b..5d6dde7f503a 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/properties/DenyListPropertyValidator.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/properties/DenyListPropertyValidator.java @@ -16,6 +16,7 @@ package io.confluent.ksql.properties; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import io.confluent.ksql.util.KsqlException; import java.util.Collection; @@ -37,14 +38,14 @@ public DenyListPropertyValidator(final Collection immutableProps) { /** * Validates if a list of properties are part of the list of denied properties. - * @throws if a property is part of the denied list. + * @throws if at least one property is part of the denied list. */ public void validateAll(final Map properties) { - properties.forEach((name ,v) -> { - if (immutableProps.contains(name)) { - throw new KsqlException(String.format("A property override was set locally for a " - + "property that the server prohibits overrides for: '%s'", name)); - } - }); + final Set propsDenied = Sets.intersection(immutableProps, properties.keySet()); + if (!propsDenied.isEmpty()) { + throw new KsqlException(String.format("One or more properties overrides set locally are " + + "prohibited by the KSQL server (use UNSET to reset their default value): %s", + propsDenied)); + } } } diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index c7bab4495991..c75c6441e421 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -335,8 +335,7 @@ public class KsqlConfig extends AbstractConfig { public static final String KSQL_PROPERTIES_OVERRIDES_DENYLIST = "ksql.properties.overrides.denylist"; - public static final String KSQL_PROPERTIES_OVERRIDES_DENYLIST_DEFAULT = ""; - public static final String KSQL_PROPERTIES_OVERRIDES_DENYLIST_DOC = "Comma-separated list of " + private static final String KSQL_PROPERTIES_OVERRIDES_DENYLIST_DOC = "Comma-separated list of " + "properties that KSQL users cannot override."; private enum ConfigGeneration { @@ -771,7 +770,7 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { .define( KSQL_PROPERTIES_OVERRIDES_DENYLIST, Type.LIST, - KSQL_PROPERTIES_OVERRIDES_DENYLIST_DEFAULT, + "", Importance.LOW, KSQL_PROPERTIES_OVERRIDES_DENYLIST_DOC ) diff --git a/ksqldb-common/src/test/java/io/confluent/ksql/properties/DenyListPropertyValidatorTest.java b/ksqldb-common/src/test/java/io/confluent/ksql/properties/DenyListPropertyValidatorTest.java index f8cf8171d545..9a2b32cde98f 100644 --- a/ksqldb-common/src/test/java/io/confluent/ksql/properties/DenyListPropertyValidatorTest.java +++ b/ksqldb-common/src/test/java/io/confluent/ksql/properties/DenyListPropertyValidatorTest.java @@ -31,7 +31,8 @@ public class DenyListPropertyValidatorTest { @Before public void setUp() { validator = new DenyListPropertyValidator(Arrays.asList( - "immutable-property" + "immutable-property-1", + "immutable-property-2" )); } @@ -41,21 +42,23 @@ public void shouldThrowOnDenyListedProperty() { final KsqlException e = assertThrows( KsqlException.class, () -> validator.validateAll(ImmutableMap.of( - "immutable-property", "v1", - "anything", "v2" + "immutable-property-1", "v1", + "anything", "v2", + "immutable-property-2", "v3" )) ); // Then: assertThat(e.getMessage(), containsString( - "A property override was set locally for a property that the server prohibits " - + "overrides for: 'immutable-property'" + "One or more properties overrides set locally are prohibited by the KSQL server " + + "(use UNSET to reset their default value): " + + "[immutable-property-1, immutable-property-2]" )); } @Test - public void shouldNotThrowOnConfigurableProp() { + public void shouldNotThrowOnAllowedProp() { validator.validateAll(ImmutableMap.of( "mutable-1", "v1", "anything", "v2" diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 40996f887acb..b565bb83e86c 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -49,6 +49,7 @@ import io.confluent.ksql.name.SourceName; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.properties.DenyListPropertyValidator; import io.confluent.ksql.query.id.SpecificQueryIdGenerator; import io.confluent.ksql.rest.ErrorMessages; import io.confluent.ksql.rest.Errors; @@ -183,6 +184,7 @@ public final class KsqlRestApplication implements Executable { private Server apiServer = null; private final CompletableFuture terminatedFuture = new CompletableFuture<>(); private final QueryMonitor queryMonitor; + private final DenyListPropertyValidator denyListPropertyValidator; // The startup thread that can be interrupted if necessary during shutdown. This should only // happen if startup hangs. @@ -218,7 +220,8 @@ public static SourceName getCommandsStreamName() { final Optional heartbeatAgent, final Optional lagReportingAgent, final Vertx vertx, - final QueryMonitor ksqlQueryMonitor + final QueryMonitor ksqlQueryMonitor, + final DenyListPropertyValidator denyListPropertyValidator ) { log.debug("Creating instance of ksqlDB API server"); this.serviceContext = requireNonNull(serviceContext, "serviceContext"); @@ -245,6 +248,8 @@ public static SourceName getCommandsStreamName() { this.heartbeatAgent = requireNonNull(heartbeatAgent, "heartbeatAgent"); this.lagReportingAgent = requireNonNull(lagReportingAgent, "lagReportingAgent"); this.vertx = requireNonNull(vertx, "vertx"); + this.denyListPropertyValidator = + requireNonNull(denyListPropertyValidator, "denyListPropertyValidator"); this.serverInfoResource = new ServerInfoResource(serviceContext, ksqlConfigNoPort); if (heartbeatAgent.isPresent()) { @@ -305,7 +310,8 @@ public void startAsync() { KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)), authorizationValidator, errorHandler, - pullQueryExecutor + pullQueryExecutor, + denyListPropertyValidator ); startAsyncThreadRef.set(Thread.currentThread()); @@ -695,6 +701,9 @@ static KsqlRestApplication buildApplication( final PullQueryExecutor pullQueryExecutor = new PullQueryExecutor( ksqlEngine, routingFilterFactory, ksqlConfig); + final DenyListPropertyValidator denyListPropertyValidator = new DenyListPropertyValidator( + ksqlConfig.getList(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST)); + final StreamedQueryResource streamedQueryResource = new StreamedQueryResource( ksqlEngine, commandStore, @@ -704,7 +713,8 @@ static KsqlRestApplication buildApplication( versionChecker::updateLastRequestTime, authorizationValidator, errorHandler, - pullQueryExecutor + pullQueryExecutor, + denyListPropertyValidator ); final KsqlResource ksqlResource = new KsqlResource( @@ -713,7 +723,8 @@ static KsqlRestApplication buildApplication( Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)), versionChecker::updateLastRequestTime, authorizationValidator, - errorHandler + errorHandler, + denyListPropertyValidator ); final List managedTopics = new LinkedList<>(); @@ -773,7 +784,8 @@ static KsqlRestApplication buildApplication( heartbeatAgent, lagReportingAgent, vertx, - queryMonitor + queryMonitor, + denyListPropertyValidator ); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java index 1697ec24a5f7..df8ca7c70b94 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java @@ -98,12 +98,12 @@ public class KsqlResource implements KsqlConfigurable { private final ActivenessRegistrar activenessRegistrar; private final BiFunction injectorFactory; private final Optional authorizationValidator; + private final DenyListPropertyValidator denyListPropertyValidator; private RequestValidator validator; private RequestHandler handler; private final Errors errorHandler; private KsqlHostInfo localHost; private URL localUrl; - private DenyListPropertyValidator denyListPropertyValidator; public KsqlResource( final KsqlEngine ksqlEngine, @@ -111,7 +111,8 @@ public KsqlResource( final Duration distributedCmdResponseTimeout, final ActivenessRegistrar activenessRegistrar, final Optional authorizationValidator, - final Errors errorHandler + final Errors errorHandler, + final DenyListPropertyValidator denyListPropertyValidator ) { this( ksqlEngine, @@ -120,7 +121,8 @@ public KsqlResource( activenessRegistrar, Injectors.DEFAULT, authorizationValidator, - errorHandler + errorHandler, + denyListPropertyValidator ); } @@ -131,7 +133,8 @@ public KsqlResource( final ActivenessRegistrar activenessRegistrar, final BiFunction injectorFactory, final Optional authorizationValidator, - final Errors errorHandler + final Errors errorHandler, + final DenyListPropertyValidator denyListPropertyValidator ) { this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue"); @@ -143,6 +146,8 @@ public KsqlResource( this.authorizationValidator = Objects .requireNonNull(authorizationValidator, "authorizationValidator"); this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler"); + this.denyListPropertyValidator = + Objects.requireNonNull(denyListPropertyValidator, "denyListPropertyValidator"); } @Override @@ -190,9 +195,6 @@ public void configure(final KsqlConfig config) { distributedCmdResponseTimeout ) ); - - this.denyListPropertyValidator = new DenyListPropertyValidator( - config.getList(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST)); } public EndpointResponse terminateCluster( @@ -241,13 +243,11 @@ public EndpointResponse handleKsqlStatements( request, distributedCmdResponseTimeout); - final Map requestProperties = request.getRequestProperties(); - denyListPropertyValidator.validateAll(requestProperties); - final Map configProperties = request.getConfigOverrides(); denyListPropertyValidator.validateAll(configProperties); - final KsqlRequestConfig requestConfig = new KsqlRequestConfig(requestProperties); + final KsqlRequestConfig requestConfig = + new KsqlRequestConfig(request.getRequestProperties()); final List statements = ksqlEngine.parse(request.getKsql()); validator.validate( diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index bcb893bca326..42bcad345b3a 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -84,7 +84,7 @@ public class StreamedQueryResource implements KsqlConfigurable { private final PullQueryExecutor pullQueryExecutor; private Optional pullQueryMetrics; private final Time time; - private DenyListPropertyValidator denyListPropertyValidator; + private final DenyListPropertyValidator denyListPropertyValidator; public StreamedQueryResource( final KsqlEngine ksqlEngine, @@ -94,7 +94,8 @@ public StreamedQueryResource( final ActivenessRegistrar activenessRegistrar, final Optional authorizationValidator, final Errors errorHandler, - final PullQueryExecutor pullQueryExecutor + final PullQueryExecutor pullQueryExecutor, + final DenyListPropertyValidator denyListPropertyValidator ) { this( ksqlEngine, @@ -105,7 +106,8 @@ public StreamedQueryResource( activenessRegistrar, authorizationValidator, errorHandler, - pullQueryExecutor + pullQueryExecutor, + denyListPropertyValidator ); } @@ -121,7 +123,8 @@ public StreamedQueryResource( final ActivenessRegistrar activenessRegistrar, final Optional authorizationValidator, final Errors errorHandler, - final PullQueryExecutor pullQueryExecutor + final PullQueryExecutor pullQueryExecutor, + final DenyListPropertyValidator denyListPropertyValidator ) { this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); this.statementParser = Objects.requireNonNull(statementParser, "statementParser"); @@ -135,6 +138,8 @@ public StreamedQueryResource( this.authorizationValidator = authorizationValidator; this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler"); this.pullQueryExecutor = Objects.requireNonNull(pullQueryExecutor, "pullQueryExecutor"); + this.denyListPropertyValidator = + Objects.requireNonNull(denyListPropertyValidator, "denyListPropertyValidator"); this.time = Time.SYSTEM; } @@ -152,9 +157,6 @@ public void configure(final KsqlConfig config) { ksqlEngine.getServiceId(), ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS))) : empty(); - - this.denyListPropertyValidator = new DenyListPropertyValidator( - config.getList(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST)); } public EndpointResponse streamQuery( @@ -219,9 +221,6 @@ private EndpointResponse handleStatement( statement.getStatement()) ); - final Map requestProperties = request.getRequestProperties(); - denyListPropertyValidator.validateAll(requestProperties); - final Map configProperties = request.getConfigOverrides(); denyListPropertyValidator.validateAll(configProperties); @@ -233,7 +232,7 @@ private EndpointResponse handleStatement( securityContext.getServiceContext(), queryStmt, configProperties, - requestProperties, + request.getRequestProperties(), isInternalRequest ); if (pullQueryMetrics.isPresent()) { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index 8ddc4086810b..65c525e7b693 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -69,9 +69,9 @@ public class WSQueryEndpoint { private final Optional authorizationValidator; private final Errors errorHandler; private final PullQueryExecutor pullQueryExecutor; + private final DenyListPropertyValidator denyListPropertyValidator; private WebSocketSubscriber subscriber; - private DenyListPropertyValidator denyListPropertyValidator; // CHECKSTYLE_RULES.OFF: ParameterNumberCheck public WSQueryEndpoint( @@ -85,7 +85,8 @@ public WSQueryEndpoint( final Duration commandQueueCatchupTimeout, final Optional authorizationValidator, final Errors errorHandler, - final PullQueryExecutor pullQueryExecutor + final PullQueryExecutor pullQueryExecutor, + final DenyListPropertyValidator denyListPropertyValidator ) { this(ksqlConfig, statementParser, @@ -99,7 +100,8 @@ public WSQueryEndpoint( commandQueueCatchupTimeout, authorizationValidator, errorHandler, - pullQueryExecutor + pullQueryExecutor, + denyListPropertyValidator ); } @@ -118,7 +120,8 @@ public WSQueryEndpoint( final Duration commandQueueCatchupTimeout, final Optional authorizationValidator, final Errors errorHandler, - final PullQueryExecutor pullQueryExecutor + final PullQueryExecutor pullQueryExecutor, + final DenyListPropertyValidator denyListPropertyValidator ) { this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); this.statementParser = Objects.requireNonNull(statementParser, "statementParser"); @@ -137,9 +140,8 @@ public WSQueryEndpoint( Objects.requireNonNull(authorizationValidator, "authorizationValidator"); this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler"); this.pullQueryExecutor = Objects.requireNonNull(pullQueryExecutor, "pullQueryExecutor"); - - this.denyListPropertyValidator = new DenyListPropertyValidator( - ksqlConfig.getList(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST)); + this.denyListPropertyValidator = + Objects.requireNonNull(denyListPropertyValidator, "denyListPropertyValidator"); } public void executeStreamQuery(final ServerWebSocket webSocket, final MultiMap requestParams, @@ -250,7 +252,6 @@ private PreparedStatement parseStatement(final KsqlRequest request) { private void handleQuery(final RequestContext info, final Query query) { final Map clientLocalProperties = info.request.getConfigOverrides(); - denyListPropertyValidator.validateAll(clientLocalProperties); final WebSocketSubscriber streamSubscriber = new WebSocketSubscriber<>(info.websocket); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 780534f64e96..02e28594e071 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -41,6 +41,7 @@ import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.properties.DenyListPropertyValidator; import io.confluent.ksql.rest.EndpointResponse; import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.entity.KsqlErrorMessage; @@ -135,6 +136,8 @@ public class KsqlRestApplicationTest { private EndpointResponse response; @Mock private QueryMonitor queryMonitor; + @Mock + private DenyListPropertyValidator denyListPropertyValidator; @Mock private SchemaRegistryClient schemaRegistryClient; @@ -495,7 +498,8 @@ private void givenAppWithRestConfig(final Map restConfigMap) { Optional.of(heartbeatAgent), Optional.of(lagReportingAgent), vertx, - queryMonitor + queryMonitor, + denyListPropertyValidator ); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 65a9f29b7d99..ed3f93a89fab 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -36,6 +36,7 @@ import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.name.SourceName; +import io.confluent.ksql.properties.DenyListPropertyValidator; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.query.id.QueryIdGenerator; import io.confluent.ksql.query.id.SpecificQueryIdGenerator; @@ -94,6 +95,9 @@ public class RecoveryTest { @Mock @SuppressWarnings("unchecked") private final Producer transactionalProducer = (Producer) mock(Producer.class); + @Mock + private DenyListPropertyValidator denyListPropertyValidator = + mock(DenyListPropertyValidator.class); private final KsqlServer server1 = new KsqlServer(commands); private final KsqlServer server2 = new KsqlServer(commands); @@ -226,7 +230,8 @@ private class KsqlServer { Duration.ofMillis(0), ()->{}, Optional.of((sc, metastore, statement) -> { }), - mock(Errors.class) + mock(Errors.class), + denyListPropertyValidator ); this.statementExecutor.configure(ksqlConfig); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index bff57af6d017..7ec22743a6ea 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -18,6 +18,7 @@ import static io.confluent.ksql.parser.ParserMatchers.configured; import static io.confluent.ksql.parser.ParserMatchers.preparedStatementText; import static io.confluent.ksql.rest.Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS; +import static io.confluent.ksql.rest.entity.ClusterTerminateRequest.DELETE_TOPIC_LIST_PROP; import static io.confluent.ksql.rest.entity.CommandId.Action.CREATE; import static io.confluent.ksql.rest.entity.CommandId.Action.DROP; import static io.confluent.ksql.rest.entity.CommandId.Action.EXECUTE; @@ -97,6 +98,7 @@ import io.confluent.ksql.parser.tree.TableElement; import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TableElements; +import io.confluent.ksql.properties.DenyListPropertyValidator; import io.confluent.ksql.rest.EndpointResponse; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; @@ -291,6 +293,8 @@ public class KsqlResourceTest { private Producer transactionalProducer; @Mock private Errors errorsHandler; + @Mock + private DenyListPropertyValidator denyListPropertyValidator; private KsqlResource ksqlResource; private SchemaRegistryClient schemaRegistryClient; @@ -399,7 +403,8 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { topicInjectorFactory.apply(ec), new TopicDeleteInjector(ec, sc)), Optional.of(authorizationValidator), - errorsHandler + errorsHandler, + denyListPropertyValidator ); // When: @@ -429,7 +434,8 @@ public void shouldThrowOnHandleTerminateIfNotConfigured() { topicInjectorFactory.apply(ec), new TopicDeleteInjector(ec, sc)), Optional.of(authorizationValidator), - errorsHandler + errorsHandler, + denyListPropertyValidator ); // When: @@ -2174,52 +2180,39 @@ private void setUpKsqlResource() { topicInjectorFactory.apply(ec), new TopicDeleteInjector(ec, sc)), Optional.of(authorizationValidator), - errorsHandler + errorsHandler, + denyListPropertyValidator ); ksqlResource.configure(ksqlConfig); } @Test - public void shouldThrowOnDenyListedStreamProperty() { + public void shouldThrowOnDenyListValidatorWhenTerminateCluster() { + final Map terminateStreamProperties = + ImmutableMap.of(DELETE_TOPIC_LIST_PROP, Collections.singletonList("Foo")); + // Given: - ksqlResource = new KsqlResource( - ksqlEngine, - commandStore, - DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT, - activenessRegistrar, - (ec, sc) -> InjectorChain.of( - schemaInjectorFactory.apply(sc), - topicInjectorFactory.apply(ec), - new TopicDeleteInjector(ec, sc)), - Optional.of(authorizationValidator), - errorsHandler + doThrow(new KsqlException("deny override")).when(denyListPropertyValidator).validateAll( + terminateStreamProperties ); - final Map props = new HashMap<>(ksqlRestConfig.getKsqlConfigProperties()); - props.put(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST, - StreamsConfig.NUM_STREAM_THREADS_CONFIG); - ksqlResource.configure(new KsqlConfig(props)); // When: - final EndpointResponse response = ksqlResource.handleKsqlStatements( + final EndpointResponse response = ksqlResource.terminateCluster( securityContext, - new KsqlRequest( - "query", - ImmutableMap.of(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1"), // stream properties - emptyMap(), // config properties - null - ) + VALID_TERMINATE_REQUEST ); // Then: - assertThat(response.getStatus(), CoreMatchers.is(BAD_REQUEST.code())); - assertThat(((KsqlErrorMessage) response.getEntity()).getMessage(), - is("A property override was set locally for a property that the server prohibits " - + "overrides for: '" + StreamsConfig.NUM_STREAM_THREADS_CONFIG + "'")); + verify(denyListPropertyValidator).validateAll(terminateStreamProperties); + assertThat(response.getStatus(), equalTo(INTERNAL_SERVER_ERROR.code())); + assertThat(response.getEntity(), instanceOf(KsqlStatementErrorMessage.class)); + assertThat(((KsqlStatementErrorMessage) response.getEntity()).getMessage(), + containsString("deny override")); } @Test - public void shouldThrowOnDenyListedConfigProperty() { + public void shouldThrowOnDenyListValidatorWhenHandleKsqlStatement() { // Given: ksqlResource = new KsqlResource( ksqlEngine, @@ -2231,29 +2224,33 @@ public void shouldThrowOnDenyListedConfigProperty() { topicInjectorFactory.apply(ec), new TopicDeleteInjector(ec, sc)), Optional.of(authorizationValidator), - errorsHandler + errorsHandler, + denyListPropertyValidator ); final Map props = new HashMap<>(ksqlRestConfig.getKsqlConfigProperties()); props.put(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST, StreamsConfig.NUM_STREAM_THREADS_CONFIG); ksqlResource.configure(new KsqlConfig(props)); + final Map overrides = + ImmutableMap.of(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); + doThrow(new KsqlException("deny override")).when(denyListPropertyValidator) + .validateAll(overrides); // When: final EndpointResponse response = ksqlResource.handleKsqlStatements( securityContext, new KsqlRequest( "query", - emptyMap(), // stream properties - ImmutableMap.of(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1"), // config properties + overrides, // stream properties + emptyMap(), null ) ); // Then: + verify(denyListPropertyValidator).validateAll(overrides); assertThat(response.getStatus(), CoreMatchers.is(BAD_REQUEST.code())); - assertThat(((KsqlErrorMessage) response.getEntity()).getMessage(), - is("A property override was set locally for a property that the server prohibits " - + "overrides for: '" + StreamsConfig.NUM_STREAM_THREADS_CONFIG + "'")); + assertThat(((KsqlErrorMessage) response.getEntity()).getMessage(), is("deny override")); } private void givenKsqlConfigWith(final Map additionalConfig) { diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java new file mode 100644 index 000000000000..506d21db8147 --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java @@ -0,0 +1,111 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.resources; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.properties.DenyListPropertyValidator; +import io.confluent.ksql.rest.ApiJsonMapper; +import io.confluent.ksql.rest.Errors; +import io.confluent.ksql.rest.entity.KsqlRequest; +import io.confluent.ksql.rest.server.StatementParser; +import io.confluent.ksql.rest.server.computation.CommandQueue; +import io.confluent.ksql.rest.server.execution.PullQueryExecutor; +import io.confluent.ksql.rest.server.resources.streaming.WSQueryEndpoint; +import io.confluent.ksql.security.KsqlSecurityContext; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.version.metrics.ActivenessRegistrar; +import io.vertx.core.MultiMap; +import io.vertx.core.http.ServerWebSocket; +import org.apache.kafka.streams.StreamsConfig; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.time.Duration; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +@RunWith(MockitoJUnitRunner.class) +public class WSQueryEndpointTest { + private static final ObjectMapper OBJECT_MAPPER = ApiJsonMapper.INSTANCE.get(); + + @Mock + private ServerWebSocket serverWebSocket; + @Mock + private KsqlSecurityContext ksqlSecurityContext; + @Mock + private DenyListPropertyValidator denyListPropertyValidator; + + private WSQueryEndpoint wsQueryEndpoint; + + @Before + public void setUp() { + wsQueryEndpoint = new WSQueryEndpoint( + mock(KsqlConfig.class), + mock(StatementParser.class), + mock(KsqlEngine.class), + mock(CommandQueue.class), + mock(ListeningScheduledExecutorService.class), + mock(ActivenessRegistrar.class), + mock(Duration.class), + Optional.empty(), + mock(Errors.class), + mock(PullQueryExecutor.class), + denyListPropertyValidator + ); + } + + @Test + public void shouldCallPropertyValidatorOnExecuteStream() + throws JsonProcessingException { + // Given + final Map overrides = + ImmutableMap.of(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); + final MultiMap params = buildRequestParams("show streams;", overrides); + + // When + executeStreamQuery(params); + + // Then + // WS sockets do not throw any exception (closes silently). We can only verify the validator + // was called. + verify(denyListPropertyValidator).validateAll(overrides); + } + + private MultiMap buildRequestParams(final String command, final Map streamProps) + throws JsonProcessingException { + final MultiMap params = MultiMap.caseInsensitiveMultiMap(); + final KsqlRequest request = new KsqlRequest( + command, streamProps, Collections.emptyMap(), 1L); + + params.add("request", OBJECT_MAPPER.writeValueAsString(request)); + return params; + } + + private void executeStreamQuery(final MultiMap params) { + wsQueryEndpoint.executeStreamQuery(serverWebSocket, params, ksqlSecurityContext); + } +} diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 66173feaf487..e52630be5e15 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -55,6 +55,7 @@ import io.confluent.ksql.parser.tree.PrintTopic; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.properties.DenyListPropertyValidator; import io.confluent.ksql.query.BlockingRowQueue; import io.confluent.ksql.query.KafkaStreamsBuilder; import io.confluent.ksql.query.LimitHandler; @@ -76,6 +77,7 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.QueryMetadata; import io.confluent.ksql.util.TransientQueryMetadata; import io.confluent.ksql.version.metrics.ActivenessRegistrar; @@ -159,6 +161,8 @@ public class StreamedQueryResourceTest { private KsqlAuthorizationValidator authorizationValidator; @Mock private Errors errorsHandler; + @Mock + private DenyListPropertyValidator denyListPropertyValidator; private StreamedQueryResource testResource; private PreparedStatement invalid; @@ -193,7 +197,8 @@ public void setup() { activenessRegistrar, Optional.of(authorizationValidator), errorsHandler, - pullQueryExecutor + pullQueryExecutor, + denyListPropertyValidator ); testResource.configure(VALID_CONFIG); @@ -220,7 +225,8 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { activenessRegistrar, Optional.of(authorizationValidator), errorsHandler, - pullQueryExecutor + pullQueryExecutor, + denyListPropertyValidator ); // When: @@ -373,7 +379,8 @@ public void shouldThrowOnDenyListedStreamProperty() { activenessRegistrar, Optional.of(authorizationValidator), errorsHandler, - pullQueryExecutor + pullQueryExecutor, + denyListPropertyValidator ); final Map props = new HashMap<>(ImmutableMap.of( StreamsConfig.APPLICATION_SERVER_CONFIG, "something:1" @@ -381,6 +388,10 @@ public void shouldThrowOnDenyListedStreamProperty() { props.put(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST, StreamsConfig.NUM_STREAM_THREADS_CONFIG); testResource.configure(new KsqlConfig(props)); + final Map overrides = + ImmutableMap.of(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); + doThrow(new KsqlException("deny override")).when(denyListPropertyValidator) + .validateAll(overrides); when(errorsHandler.generateResponse(any(), any())) .thenReturn(badRequest("A property override was set locally for a property that the " + "server prohibits overrides for: 'num.stream.threads'")); @@ -390,8 +401,8 @@ public void shouldThrowOnDenyListedStreamProperty() { securityContext, new KsqlRequest( PULL_QUERY_STRING, - ImmutableMap.of(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1"), // stream properties - Collections.emptyMap(), // config properties + overrides, // stream properties + Collections.emptyMap(), null ), new CompletableFuture<>(), @@ -399,6 +410,7 @@ public void shouldThrowOnDenyListedStreamProperty() { ); // Then: + verify(denyListPropertyValidator).validateAll(overrides); assertThat(response.getStatus(), CoreMatchers.is(BAD_REQUEST.code())); assertThat(((KsqlErrorMessage) response.getEntity()).getMessage(), is("A property override was set locally for a property that the server prohibits "