diff --git a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java index 7058f5c8dd2f..cc4377618732 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java @@ -129,6 +129,7 @@ public class CliTest { .builder(CLUSTER::bootstrapServers) .withProperty(KsqlConfig.SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_MS_PROPERTY, KsqlConstants.defaultSinkWindowChangeLogAdditionalRetention + 1) + .withProperty(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true) .build(); @ClassRule diff --git a/ksql-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java b/ksql-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java index ece1b7a08dc4..3ad421b07cbe 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java +++ b/ksql-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java @@ -30,6 +30,7 @@ public final class ImmutableProperties { .add(KsqlConfig.KSQL_EXT_DIR) .add(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG) .add(KsqlConfig.KSQL_QUERY_PULL_ENABLE_CONFIG) + .add(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG) .addAll(KsqlConfig.SSL_CONFIG_NAMES) .build(); diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index c36d75121978..1549061aacd1 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -198,6 +198,13 @@ public class KsqlConfig extends AbstractConfig { + "\"off\" disables the validator. If set to \"auto\", KSQL will attempt to discover " + "whether the Kafka cluster supports the required API, and enables the validator if " + "it does."; + public static final String KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG = + "ksql.query.pull.skip.access.validator"; + public static final boolean KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DEFAULT = false; + public static final String KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DOC = "If \"true\", KSQL will " + + " NOT enforce access validation checks for pull queries, which could expose Kafka topics" + + " which are secured with ACLs. Please enable only after careful consideration." + + " If \"false\", KSQL pull queries will fail against a secure Kafka cluster"; public static final String KSQL_QUERY_PULL_ENABLE_CONFIG = "ksql.query.pull.enable"; public static final String KSQL_QUERY_PULL_ENABLE_DOC = @@ -604,6 +611,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DEFAULT, Importance.LOW, KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DOC + ).define( + KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, + Type.BOOLEAN, + KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DEFAULT, + Importance.LOW, + KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DOC ) .withClientSslSupport(); for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef diff --git a/ksql-engine/src/main/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactory.java b/ksql-engine/src/main/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactory.java index 04cbeb3a6ff8..4acd3f545122 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactory.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactory.java @@ -19,6 +19,7 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlServerException; +import java.util.Optional; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.common.errors.ClusterAuthorizationException; @@ -29,22 +30,20 @@ public final class KsqlAuthorizationValidatorFactory { private static final Logger LOG = LoggerFactory .getLogger(KsqlAuthorizationValidatorFactory.class); private static final String KAFKA_AUTHORIZER_CLASS_NAME = "authorizer.class.name"; - private static final KsqlAuthorizationValidator DUMMY_VALIDATOR = - (sc, metastore, statement) -> { }; private KsqlAuthorizationValidatorFactory() { } - public static KsqlAuthorizationValidator create( + public static Optional create( final KsqlConfig ksqlConfig, final ServiceContext serviceContext ) { final String enabled = ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR); if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_ON)) { LOG.info("Forcing topic access validator"); - return new KsqlAuthorizationValidatorImpl(); + return Optional.of(new KsqlAuthorizationValidatorImpl()); } else if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_OFF)) { - return DUMMY_VALIDATOR; + return Optional.empty(); } final Admin adminClient = serviceContext.getAdminClient(); @@ -52,14 +51,14 @@ public static KsqlAuthorizationValidator create( if (isKafkaAuthorizerEnabled(adminClient)) { if (KafkaClusterUtil.isAuthorizedOperationsSupported(adminClient)) { LOG.info("KSQL topic authorization checks enabled."); - return new KsqlAuthorizationValidatorImpl(); + return Optional.of(new KsqlAuthorizationValidatorImpl()); } LOG.warn("The Kafka broker has an authorization service enabled, but the Kafka " + "version does not support authorizedOperations(). " + "KSQL topic authorization checks will not be enabled."); } - return DUMMY_VALIDATOR; + return Optional.empty(); } private static boolean isKafkaAuthorizerEnabled(final Admin adminClient) { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultServiceContext.java b/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultServiceContext.java index 3f464547df6b..d7e495d1cf04 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultServiceContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultServiceContext.java @@ -78,7 +78,7 @@ public DefaultServiceContext( private DefaultServiceContext( final KafkaClientSupplier kafkaClientSupplier, final Supplier adminClientSupplier, - final Function, KafkaTopicClient> topicClientSupplier, + final Function, KafkaTopicClient> topicClientProvider, final Supplier srClientSupplier, final Supplier connectClientSupplier, final Supplier ksqlClientSupplier @@ -100,7 +100,7 @@ private DefaultServiceContext( this.kafkaClientSupplier = requireNonNull(kafkaClientSupplier, "kafkaClientSupplier"); this.topicClientSupplier = new MemoizedSupplier<>( - () -> topicClientSupplier.apply(this.adminClientSupplier)); + () -> topicClientProvider.apply(this.adminClientSupplier)); } @Override diff --git a/ksql-engine/src/test/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactoryTest.java b/ksql-engine/src/test/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactoryTest.java index 764a106b678d..081ffc2a8753 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactoryTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactoryTest.java @@ -18,7 +18,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verifyZeroInteractions; @@ -31,6 +30,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.Config; @@ -78,44 +78,45 @@ public void shouldReturnAuthorizationValidator() { givenKafkaAuthorizer("an-authorizer-class", Collections.emptySet()); // When: - final KsqlAuthorizationValidator validator = KsqlAuthorizationValidatorFactory.create( + final Optional validator = KsqlAuthorizationValidatorFactory.create( ksqlConfig, serviceContext ); // Then - assertThat(validator, is(instanceOf(KsqlAuthorizationValidatorImpl.class))); + assertThat("validator should be present", validator.isPresent()); + assertThat(validator.get(), is(instanceOf(KsqlAuthorizationValidatorImpl.class))); } @Test - public void shouldReturnDummyValidator() { + public void shouldReturnEmptyValidator() { // Given: givenKafkaAuthorizer("", Collections.emptySet()); // When: - final KsqlAuthorizationValidator validator = KsqlAuthorizationValidatorFactory.create( + final Optional validator = KsqlAuthorizationValidatorFactory.create( ksqlConfig, serviceContext ); // Then - assertThat(validator, not(instanceOf(KsqlAuthorizationValidatorImpl.class))); + assertThat(validator, is(Optional.empty())); } @Test - public void shouldReturnDummyValidatorIfNotEnabled() { + public void shouldReturnEmptyValidatorIfNotEnabled() { // Given: when(ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR)) .thenReturn(KsqlConfig.KSQL_ACCESS_VALIDATOR_OFF); // When: - final KsqlAuthorizationValidator validator = KsqlAuthorizationValidatorFactory.create( + final Optional validator = KsqlAuthorizationValidatorFactory.create( ksqlConfig, serviceContext ); // Then: - assertThat(validator, not(instanceOf(KsqlAuthorizationValidatorImpl.class))); + assertThat(validator, is(Optional.empty())); verifyZeroInteractions(adminClient); } @@ -126,29 +127,30 @@ public void shouldReturnAuthorizationValidatorIfEnabled() { .thenReturn(KsqlConfig.KSQL_ACCESS_VALIDATOR_ON); // When: - final KsqlAuthorizationValidator validator = KsqlAuthorizationValidatorFactory.create( + final Optional validator = KsqlAuthorizationValidatorFactory.create( ksqlConfig, serviceContext ); // Then: - assertThat(validator, instanceOf(KsqlAuthorizationValidatorImpl.class)); + assertThat("validator should be present", validator.isPresent()); + assertThat(validator.get(), is(instanceOf(KsqlAuthorizationValidatorImpl.class))); verifyZeroInteractions(adminClient); } @Test - public void shouldReturnDummyValidatorIfAuthorizedOperationsReturnNull() { + public void shouldReturnEmptyValidatorIfAuthorizedOperationsReturnNull() { // Given: givenKafkaAuthorizer("an-authorizer-class", null); // When: - final KsqlAuthorizationValidator validator = KsqlAuthorizationValidatorFactory.create( + final Optional validator = KsqlAuthorizationValidatorFactory.create( ksqlConfig, serviceContext ); // Then - assertThat(validator, not(instanceOf(KsqlAuthorizationValidatorImpl.class))); + assertThat(validator, is(Optional.empty())); } private void givenKafkaAuthorizer( diff --git a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestQueryTranslationTest.java b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestQueryTranslationTest.java index 331761093c92..066fa9b0b3eb 100644 --- a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestQueryTranslationTest.java +++ b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestQueryTranslationTest.java @@ -69,6 +69,7 @@ public class RestQueryTranslationTest { private static final TestKsqlRestApp REST_APP = TestKsqlRestApp .builder(TEST_HARNESS::kafkaBootstrapServers) .withProperty(KsqlConfig.KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) + .withProperty(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true) .withStaticServiceContext(TEST_HARNESS::getServiceContext) .build(); @@ -106,7 +107,7 @@ public void tearDown() { @Test public void shouldBuildAndExecuteQueries() { - try (RestTestExecutor testExecutor = textExecutor()) { + try (RestTestExecutor testExecutor = testExecutor()) { testExecutor.buildAndExecuteQuery(testCase); } catch (final AssertionError e) { throw new AssertionError(e.getMessage() @@ -119,7 +120,7 @@ public void shouldBuildAndExecuteQueries() { } } - private static RestTestExecutor textExecutor() { + private static RestTestExecutor testExecutor() { return new RestTestExecutor( REST_APP.getListeners().get(0), TEST_HARNESS.getKafkaCluster(), diff --git a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java index efac12c98e8b..785b805c8314 100644 --- a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java +++ b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java @@ -402,12 +402,17 @@ private void waitForWarmStateStores( final ImmutableList expectedResponse = ImmutableList.of(queryResponse); final ImmutableList statements = ImmutableList.of(querySql); + final long waitMs = 10; final long threshold = System.currentTimeMillis() + MAX_STATIC_WARMUP.toMillis(); while (System.currentTimeMillis() < threshold) { final RestResponse resp = restClient.makeQueryRequest(querySql, null); if (resp.isErroneous()) { - Thread.yield(); + try { + Thread.sleep(waitMs); + } catch (InterruptedException e) { + // ignore + } LOG.info("Server responded with an error code to a pull query. " + "This could be because the materialized store is not yet warm."); continue; diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 416d62933d8a..31604704d844 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -380,7 +380,7 @@ protected void registerWebSocketEndpoints(final ServerContainer container) { ); final StatementParser statementParser = new StatementParser(ksqlEngine); - final KsqlAuthorizationValidator authorizationValidator = + final Optional authorizationValidator = KsqlAuthorizationValidatorFactory.create(ksqlConfigNoPort, serviceContext); container.addEndpoint( @@ -496,7 +496,7 @@ static KsqlRestApplication buildApplication( final KsqlSecurityExtension securityExtension = loadSecurityExtension(ksqlConfig); - final KsqlAuthorizationValidator authorizationValidator = + final Optional authorizationValidator = KsqlAuthorizationValidatorFactory.create(ksqlConfig, serviceContext); final StreamedQueryResource streamedQueryResource = new StreamedQueryResource( diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java index 2fa728bb2eaf..1aa548f5834a 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java @@ -52,14 +52,14 @@ public class DistributingExecutor { private final CommandQueue commandQueue; private final Duration distributedCmdResponseTimeout; private final BiFunction injectorFactory; - private final KsqlAuthorizationValidator authorizationValidator; + private final Optional authorizationValidator; private final RequestValidator requestValidator; public DistributingExecutor( final CommandQueue commandQueue, final Duration distributedCmdResponseTimeout, final BiFunction injectorFactory, - final KsqlAuthorizationValidator authorizationValidator, + final Optional authorizationValidator, final RequestValidator requestValidator ) { this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue"); @@ -154,15 +154,17 @@ private void checkAuthorization( final MetaStore metaStore = serverExecutionContext.getMetaStore(); // Check the User will be permitted to execute this statement - authorizationValidator.checkAuthorization(userServiceContext, metaStore, statement); + authorizationValidator.ifPresent( + validator -> + validator.checkAuthorization(userServiceContext, metaStore, statement)); try { // Check the KSQL service principal will be permitted too - authorizationValidator.checkAuthorization( - serverExecutionContext.getServiceContext(), - metaStore, - statement - ); + authorizationValidator.ifPresent( + validator -> validator.checkAuthorization( + serverExecutionContext.getServiceContext(), + metaStore, + statement)); } catch (final Exception e) { throw new KsqlServerException("The KSQL server is not permitted to execute the command", e); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java index b06ebcdf72d7..f828fbf14371 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java @@ -56,6 +56,7 @@ import java.time.Duration; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.function.BiFunction; import java.util.regex.PatternSyntaxException; @@ -97,7 +98,7 @@ public class KsqlResource implements KsqlConfigurable { private final Duration distributedCmdResponseTimeout; private final ActivenessRegistrar activenessRegistrar; private final BiFunction injectorFactory; - private final KsqlAuthorizationValidator authorizationValidator; + private final Optional authorizationValidator; private RequestValidator validator; private RequestHandler handler; @@ -107,7 +108,7 @@ public KsqlResource( final CommandQueue commandQueue, final Duration distributedCmdResponseTimeout, final ActivenessRegistrar activenessRegistrar, - final KsqlAuthorizationValidator authorizationValidator + final Optional authorizationValidator ) { this( ksqlEngine, @@ -125,7 +126,7 @@ public KsqlResource( final Duration distributedCmdResponseTimeout, final ActivenessRegistrar activenessRegistrar, final BiFunction injectorFactory, - final KsqlAuthorizationValidator authorizationValidator + final Optional authorizationValidator ) { this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue"); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 149215538d3b..a0d78826dd1f 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -51,6 +51,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; +import java.util.function.Consumer; import java.util.stream.Collectors; import javax.ws.rs.Consumes; import javax.ws.rs.POST; @@ -78,7 +80,7 @@ public class StreamedQueryResource implements KsqlConfigurable { private final Duration commandQueueCatchupTimeout; private final ObjectMapper objectMapper; private final ActivenessRegistrar activenessRegistrar; - private final KsqlAuthorizationValidator authorizationValidator; + private final Optional authorizationValidator; private KsqlConfig ksqlConfig; public StreamedQueryResource( @@ -87,7 +89,7 @@ public StreamedQueryResource( final Duration disconnectCheckInterval, final Duration commandQueueCatchupTimeout, final ActivenessRegistrar activenessRegistrar, - final KsqlAuthorizationValidator authorizationValidator + final Optional authorizationValidator ) { this( ksqlEngine, @@ -108,7 +110,7 @@ public StreamedQueryResource( final Duration disconnectCheckInterval, final Duration commandQueueCatchupTimeout, final ActivenessRegistrar activenessRegistrar, - final KsqlAuthorizationValidator authorizationValidator + final Optional authorizationValidator ) { this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); this.statementParser = Objects.requireNonNull(statementParser, "statementParser"); @@ -175,16 +177,27 @@ private Response handleStatement( final PreparedStatement statement ) { try { - authorizationValidator.checkAuthorization( - serviceContext, - ksqlEngine.getMetaStore(), - statement.getStatement() - ); + final Consumer authValidationConsumer = + ksqlAuthorizationValidator -> ksqlAuthorizationValidator.checkAuthorization( + serviceContext, + ksqlEngine.getMetaStore(), + statement.getStatement() + ); if (statement.getStatement() instanceof Query) { final PreparedStatement queryStmt = (PreparedStatement) statement; if (queryStmt.getStatement().isPullQuery()) { + final boolean skipAccessValidation = ksqlConfig.getBoolean( + KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG); + if (authorizationValidator.isPresent() && !skipAccessValidation) { + return Errors.badRequest("Pull queries are not currently supported when " + + "access validation against Kafka is configured. If you really want to " + + "bypass this limitation please set " + + KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG + "=true " + + KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DOC); + } + return handlePullQuery( serviceContext, queryStmt, @@ -192,6 +205,7 @@ private Response handleStatement( ); } + authorizationValidator.ifPresent(authValidationConsumer); return handlePushQuery( serviceContext, queryStmt, @@ -200,6 +214,7 @@ private Response handleStatement( } if (statement.getStatement() instanceof PrintTopic) { + authorizationValidator.ifPresent(authValidationConsumer); return handlePrintTopic( serviceContext, request.getStreamsProperties(), diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index 2828927a42aa..d987477ef372 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -88,7 +88,7 @@ public class WSQueryEndpoint { private final QueryPublisher pullQueryPublisher; private final PrintTopicPublisher topicPublisher; private final Duration commandQueueCatchupTimeout; - private final KsqlAuthorizationValidator authorizationValidator; + private final Optional authorizationValidator; private final KsqlSecurityExtension securityExtension; private final UserServiceContextFactory serviceContextFactory; private final DefaultServiceContextFactory defaultServiceContextFactory; @@ -108,7 +108,7 @@ public WSQueryEndpoint( final ListeningScheduledExecutorService exec, final ActivenessRegistrar activenessRegistrar, final Duration commandQueueCatchupTimeout, - final KsqlAuthorizationValidator authorizationValidator, + final Optional authorizationValidator, final KsqlSecurityExtension securityExtension, final ServerState serverState ) { @@ -144,7 +144,7 @@ public WSQueryEndpoint( final PrintTopicPublisher topicPublisher, final ActivenessRegistrar activenessRegistrar, final Duration commandQueueCatchupTimeout, - final KsqlAuthorizationValidator authorizationValidator, + final Optional authorizationValidator, final KsqlSecurityExtension securityExtension, final UserServiceContextFactory serviceContextFactory, final DefaultServiceContextFactory defaultServiceContextFactory, @@ -213,15 +213,11 @@ public void onOpen(final Session session, final EndpointConfig unused) { serviceContext = createServiceContext(session.getUserPrincipal()); - authorizationValidator.checkAuthorization( - serviceContext, - ksqlEngine.getMetaStore(), - preparedStatement.getStatement() - ); - final Statement statement = preparedStatement.getStatement(); final Class type = statement.getClass(); + validateKafkaAuthorization(statement); + HANDLER_MAP .getOrDefault(type, WSQueryEndpoint::handleUnsupportedStatement) .handle(this, new RequestContext(session, request, serviceContext), statement); @@ -346,6 +342,27 @@ private PreparedStatement parseStatement(final KsqlRequest request) { } } + private void validateKafkaAuthorization(final Statement statement) { + if (statement instanceof Query && ((Query) statement).isPullQuery()) { + final boolean skipAccessValidation = ksqlConfig.getBoolean( + KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG); + if (authorizationValidator.isPresent() && !skipAccessValidation) { + throw new KsqlException("Pull queries are not currently supported when " + + "access validation against Kafka is configured. If you really want to " + + "bypass this limitation please set " + + KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG + "=true " + + KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DOC); + } + } else { + authorizationValidator.ifPresent(validator -> validator.checkAuthorization( + serviceContext, + ksqlEngine.getMetaStore(), + statement) + ); + } + } + + @SuppressWarnings({"unused"}) private void handleQuery(final RequestContext info, final Query query) { final Map clientLocalProperties = info.request.getStreamsProperties(); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryFunctionalTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryFunctionalTest.java index 4a45a309dcea..f65328250980 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryFunctionalTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryFunctionalTest.java @@ -37,6 +37,7 @@ import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.test.util.KsqlIdentifierTestUtil; import io.confluent.ksql.test.util.TestBasicJaasConfig; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.TestDataProvider; import io.confluent.ksql.util.UserDataProvider; import io.confluent.rest.RestConfig; @@ -110,6 +111,7 @@ public class PullQueryFunctionalTest { .withBasicCredentials(USER_WITH_ACCESS, USER_WITH_ACCESS_PWD) .withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) .withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.STATE_DIR_CONFIG, getNewStateDir()) + .withProperty(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true) .withProperty(RestConfig.AUTHENTICATION_METHOD_CONFIG, RestConfig.AUTHENTICATION_METHOD_BASIC) .withProperty(RestConfig.AUTHENTICATION_REALM_CONFIG, PROPS_JAAS_REALM) .withProperty(RestConfig.AUTHENTICATION_ROLES_CONFIG, KSQL_CLUSTER_ID) @@ -121,6 +123,7 @@ public class PullQueryFunctionalTest { .withBasicCredentials(USER_WITH_ACCESS, USER_WITH_ACCESS_PWD) .withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) .withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.STATE_DIR_CONFIG, getNewStateDir()) + .withProperty(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true) .withProperty(RestConfig.AUTHENTICATION_METHOD_CONFIG, RestConfig.AUTHENTICATION_METHOD_BASIC) .withProperty(RestConfig.AUTHENTICATION_REALM_CONFIG, PROPS_JAAS_REALM) .withProperty(RestConfig.AUTHENTICATION_ROLES_CONFIG, KSQL_CLUSTER_ID) diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java index 8f957dc2c56e..2a0675503ba9 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java @@ -50,6 +50,7 @@ import io.confluent.ksql.test.util.secure.ClientTrustStore; import io.confluent.ksql.test.util.secure.Credentials; import io.confluent.ksql.test.util.secure.SecureKafkaHelper; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.PageViewDataProvider; import java.util.Arrays; import java.util.List; @@ -155,6 +156,7 @@ public class RestApiTest { .withProperty("security.protocol", "SASL_SSL") .withProperty("sasl.mechanism", "PLAIN") .withProperty("sasl.jaas.config", SecureKafkaHelper.buildJaasConfig(NORMAL_USER)) + .withProperty(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true) .withProperties(ClientTrustStore.trustStoreProps()) .build(); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java index 9961871029a9..83abe615b4f1 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java @@ -122,7 +122,7 @@ public void setUp() throws InterruptedException { queue, DURATION_10_MS, (ec, sc) -> InjectorChain.of(schemaInjector, topicInjector), - authorizationValidator, + Optional.of(authorizationValidator), requestValidator ); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index bd28ae140e61..d414585f83f0 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -218,7 +218,7 @@ private class KsqlServer { fakeCommandQueue, Duration.ofMillis(0), ()->{}, - (sc, metastore, statement) -> { } + Optional.of((sc, metastore, statement) -> { }) ); this.statementExecutor.configure(ksqlConfig); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index 6d804b5bb862..13fb8b4f74e8 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -364,7 +364,7 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { schemaInjectorFactory.apply(sc), topicInjectorFactory.apply(ec), new TopicDeleteInjector(ec, sc)), - authorizationValidator + Optional.of(authorizationValidator) ); // Then: @@ -392,7 +392,7 @@ public void shouldThrowOnHandleTerminateIfNotConfigured() { schemaInjectorFactory.apply(sc), topicInjectorFactory.apply(ec), new TopicDeleteInjector(ec, sc)), - authorizationValidator + Optional.of(authorizationValidator) ); // Then: @@ -2077,7 +2077,7 @@ private void setUpKsqlResource() { schemaInjectorFactory.apply(sc), topicInjectorFactory.apply(ec), new TopicDeleteInjector(ec, sc)), - authorizationValidator + Optional.of(authorizationValidator) ); ksqlResource.configure(ksqlConfig); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index d954419809b4..283c7793aae7 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -21,7 +21,9 @@ import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionStatusCode; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -71,6 +73,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.Map; +import java.util.Optional; import java.util.Scanner; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeoutException; @@ -138,9 +141,13 @@ public class StreamedQueryResourceTest { @Before public void setup() { when(serviceContext.getTopicClient()).thenReturn(mockKafkaTopicClient); - statement = PreparedStatement.of("s", mock(Statement.class)); + statement = PreparedStatement.of(PUSH_QUERY_STRING, mock(Statement.class)); when(mockStatementParser.parseSingleStatement(PUSH_QUERY_STRING)).thenReturn(statement); - when(mockStatementParser.parseSingleStatement(PULL_QUERY_STRING)).thenReturn(statement); + + final Query pullQuery = mock(Query.class); + when(pullQuery.isPullQuery()).thenReturn(true); + final PreparedStatement pullQueryStatement = PreparedStatement.of(PULL_QUERY_STRING, pullQuery); + when(mockStatementParser.parseSingleStatement(PULL_QUERY_STRING)).thenReturn(pullQueryStatement); testResource = new StreamedQueryResource( mockKsqlEngine, @@ -149,7 +156,7 @@ public void setup() { DISCONNECT_CHECK_INTERVAL, COMMAND_QUEUE_CATCHUP_TIMOEUT, activenessRegistrar, - authorizationValidator + Optional.of(authorizationValidator) ); testResource.configure(VALID_CONFIG); @@ -174,7 +181,7 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { DISCONNECT_CHECK_INTERVAL, COMMAND_QUEUE_CATCHUP_TIMOEUT, activenessRegistrar, - authorizationValidator + Optional.of(authorizationValidator) ); // Then: @@ -257,19 +264,13 @@ public void shouldReturnServiceUnavailableIfTimeoutWaitingForCommandSequenceNumb } @Test - public void shouldNotCreateAdminClientForPullQuery() throws Exception { - // When: - testResource.streamQuery( - serviceContext, - new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), null) - ); + public void shouldNotCreateExternalClientsForPullQuery() { + // Given + testResource.configure(new KsqlConfig(ImmutableMap.of( + StreamsConfig.APPLICATION_SERVER_CONFIG, "something:1", + KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true + ))); - // Then: - verify(serviceContext, never()).getAdminClient(); - } - - @Test - public void shouldNotCreateConnectClientForPullQuery() throws Exception { // When: testResource.streamQuery( serviceContext, @@ -277,31 +278,51 @@ public void shouldNotCreateConnectClientForPullQuery() throws Exception { ); // Then: + verify(serviceContext, never()).getAdminClient(); verify(serviceContext, never()).getConnectClient(); + verify(serviceContext, never()).getSchemaRegistryClient(); + verify(serviceContext, never()).getTopicClient(); } @Test - public void shouldNotCreateSRClientForPullQuery() throws Exception { + public void shouldThrowExceptionForPullQueryIfValidating() { // When: - testResource.streamQuery( + final Response response = testResource.streamQuery( serviceContext, new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), null) ); // Then: - verify(serviceContext, never()).getSchemaRegistryClient(); + assertThat(response.getStatus(), is(Errors.badRequest("").getStatus())); + assertThat(response.getEntity(), is(instanceOf(KsqlErrorMessage.class))); + final KsqlErrorMessage expectedEntity = (KsqlErrorMessage) response.getEntity(); + assertThat( + expectedEntity.getMessage(), + containsString(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG) + ); } @Test - public void shouldNotCreateTopicClientForPullQuery() throws Exception { + public void shouldPassCheckForPullQueryIfNotValidating() { + // Given + testResource.configure(new KsqlConfig(ImmutableMap.of( + StreamsConfig.APPLICATION_SERVER_CONFIG, "something:1", + KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true + ))); + // When: - testResource.streamQuery( + final Response response = testResource.streamQuery( serviceContext, new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), null) ); // Then: - verify(serviceContext, never()).getTopicClient(); + assertThat(response.getStatus(), is(Errors.badRequest("").getStatus())); + final KsqlErrorMessage expectedEntity = (KsqlErrorMessage) response.getEntity(); + assertThat( + expectedEntity.getMessage(), + not(containsString(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG)) + ); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java index fbdc4f1d57b9..7be34e9d95f1 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java @@ -15,6 +15,7 @@ package io.confluent.ksql.rest.server.resources.streaming; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -190,7 +191,7 @@ public void setUp() { topicPublisher, activenessRegistrar, COMMAND_QUEUE_CATCHUP_TIMEOUT, - authorizationValidator, + Optional.of(authorizationValidator), securityExtension, serviceContextFactory, defaultServiceContextProvider, @@ -385,6 +386,8 @@ public void shouldHandlePushQuery() { @Test public void shouldHandlePullQuery() { // Given: + when(ksqlConfig.getBoolean(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG)) + .thenReturn(true); givenQueryIs(QueryType.PULL); givenRequestIs(query); @@ -405,6 +408,22 @@ public void shouldHandlePullQuery() { any()); } + @Test + public void shouldFailPullQueryIfValidating() throws Exception { + // Given: + givenQueryIs(QueryType.PULL); + givenRequestIs(query); + + // When: + wsQueryEndpoint.onOpen(session, null); + + // Then: + verifyClosedContainingReason( + "Pull queries are not currently supported", + CloseCodes.CANNOT_ACCEPT + ); + } + @Test public void shouldHandlePrintTopic() { // Given: @@ -616,6 +635,13 @@ private static String serialize(final KsqlRequest request) { } } + private void verifyClosedContainingReason(final String reason, final CloseCodes code) throws Exception { + verify(session).close(closeReasonCaptor.capture()); + final CloseReason closeReason = closeReasonCaptor.getValue(); + assertThat(closeReason.getReasonPhrase(), containsString(reason)); + assertThat(closeReason.getCloseCode(), is(code)); + } + private void verifyClosedWithReason(final String reason, final CloseCodes code) throws Exception { verify(session).close(closeReasonCaptor.capture()); final CloseReason closeReason = closeReasonCaptor.getValue();