From bf8f3c6a70f34686d0fa830dc554fc917e45ca26 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Sun, 21 Feb 2021 12:48:56 -0800 Subject: [PATCH 01/11] feat: classify authorization exception as user error --- .../confluent/ksql/query/QueryExecutor.java | 5 +- .../query/TopicAuthorizationClassifier.java | 52 ++++++++++++++++++ .../TopicAuthorizationClassifierTest.java | 55 +++++++++++++++++++ 3 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java index 8eb6e39958f1..f3b6bfd86869 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java @@ -226,9 +226,12 @@ public PersistentQueryMetadata buildPersistentQuery( )); final QueryErrorClassifier topicClassifier = new MissingTopicClassifier(applicationId); + final QueryErrorClassifier authorizationClassifier = + new TopicAuthorizationClassifier(applicationId); final QueryErrorClassifier classifier = buildConfiguredClassifiers(ksqlConfig, applicationId) .map(topicClassifier::and) - .orElse(topicClassifier); + .orElse(topicClassifier) + .and(authorizationClassifier); return new PersistentQueryMetadata( statementText, diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java new file mode 100644 index 000000000000..ba4d5f13296e --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.query; + +import io.confluent.ksql.query.QueryError.Type; +import java.util.Objects; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@code TopicAuthorizationClassifier} classifies errors by querying the broker + * to check that all used topics are being accessible. + */ +public class TopicAuthorizationClassifier implements QueryErrorClassifier { + + private static final Logger LOG = LoggerFactory.getLogger(TopicAuthorizationClassifier.class); + + private final String queryId; + + public TopicAuthorizationClassifier(final String queryId) { + this.queryId = Objects.requireNonNull(queryId, "queryId"); + } + + @Override + public Type classify(final Throwable e) { + final Type type = e instanceof TopicAuthorizationException ? Type.USER : Type.UNKNOWN; + + if (type == Type.USER) { + LOG.info( + "Classified error as USER error based on missing topic. Query ID: {} Exception: {}", + queryId, + e); + } + + return type; + } + +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java new file mode 100644 index 000000000000..1d00a9256366 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java @@ -0,0 +1,55 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.query; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import io.confluent.ksql.query.QueryError.Type; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.streams.errors.MissingSourceTopicException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TopicAuthorizationClassifierTest { + + @Test + public void shouldClassifyTopicAuthorizationExceptionAsUserError() { + // Given: + final Exception e = new TopicAuthorizationException("foo"); + + // When: + final Type type = new TopicAuthorizationClassifier("").classify(e); + + // Then: + assertThat(type, is(Type.USER)); + } + + @Test + public void shouldClassifyNoMissingTopicAsUnknownError() { + // Given: + final Exception e = new Exception("foo"); + + // When: + final Type type = new TopicAuthorizationClassifier("").classify(e); + + // Then: + assertThat(type, is(Type.UNKNOWN)); + } + +} \ No newline at end of file From 3bf546f30f334f5a86a68ef63286b5781b9bb6cf Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 22 Feb 2021 14:19:48 -0800 Subject: [PATCH 02/11] Github comments --- .../io/confluent/ksql/query/TopicAuthorizationClassifier.java | 4 ++-- .../ksql/query/TopicAuthorizationClassifierTest.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java index ba4d5f13296e..6a8b32e519fc 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Confluent Inc. + * Copyright 2021 Confluent Inc. * * Licensed under the Confluent Community License (the "License"; you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -41,7 +41,7 @@ public Type classify(final Throwable e) { if (type == Type.USER) { LOG.info( - "Classified error as USER error based on missing topic. Query ID: {} Exception: {}", + "Classified error as USER error based on missing access rights. Query ID: {} Exception: {}", queryId, e); } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java index 1d00a9256366..443bd71654d0 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Confluent Inc. + * Copyright 2021 Confluent Inc. * * Licensed under the Confluent Community License (the "License"; you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -41,7 +41,7 @@ public void shouldClassifyTopicAuthorizationExceptionAsUserError() { } @Test - public void shouldClassifyNoMissingTopicAsUnknownError() { + public void shouldClassifyNoTopicAuthorizationExceptionAsUnknownError() { // Given: final Exception e = new Exception("foo"); From b27225a955234170d515be45f19b39c6432a7194 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 22 Feb 2021 16:44:09 -0800 Subject: [PATCH 03/11] Add handling for GroupAuthorizationException Add integration test for error classification --- .../query/GroupAuthorizationClassifier.java | 52 ++++++ .../ksql/query/MissingTopicClassifier.java | 4 +- .../confluent/ksql/query/QueryExecutor.java | 3 +- .../query/TopicAuthorizationClassifier.java | 6 +- .../integration/SecureIntegrationTest.java | 176 +++++++++++++++++- .../GroupAuthorizationClassifierTest.java | 55 ++++++ .../TopicAuthorizationClassifierTest.java | 3 +- 7 files changed, 290 insertions(+), 9 deletions(-) create mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/query/GroupAuthorizationClassifier.java create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/query/GroupAuthorizationClassifierTest.java diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/GroupAuthorizationClassifier.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/GroupAuthorizationClassifier.java new file mode 100644 index 000000000000..1a036a83b8a2 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/GroupAuthorizationClassifier.java @@ -0,0 +1,52 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.query; + +import io.confluent.ksql.query.QueryError.Type; +import java.util.Objects; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.streams.errors.StreamsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@code GroupAuthorizationClassifier} classifies missing consumer group ACLs as user error + */ +public class GroupAuthorizationClassifier implements QueryErrorClassifier { + + private static final Logger LOG = LoggerFactory.getLogger(GroupAuthorizationClassifier.class); + + private final String queryId; + + public GroupAuthorizationClassifier(final String queryId) { + this.queryId = Objects.requireNonNull(queryId, "queryId"); + } + + @Override + public Type classify(final Throwable e) { + final Type type = e instanceof GroupAuthorizationException ? Type.USER : Type.UNKNOWN; + + if (type == Type.USER) { + LOG.info( + "Classified error as USER error based on missing consumer groups access rights. Query ID: {} Exception: {}", + queryId, + e); + } + + return type; + } + +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/MissingTopicClassifier.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/MissingTopicClassifier.java index 1aab91d5677e..ec9868553eef 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/MissingTopicClassifier.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/MissingTopicClassifier.java @@ -22,9 +22,7 @@ import org.slf4j.LoggerFactory; /** - * {@code MissingTopicClassifier} classifies errors by querying the broker - * to check that all topics that the query relies on being accessible exist - * and are accessible + * {@code MissingTopicClassifier} classifies missing source topic exceptions as user error */ public class MissingTopicClassifier implements QueryErrorClassifier { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java index f3b6bfd86869..c2a22a9b3ddf 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java @@ -227,7 +227,8 @@ public PersistentQueryMetadata buildPersistentQuery( final QueryErrorClassifier topicClassifier = new MissingTopicClassifier(applicationId); final QueryErrorClassifier authorizationClassifier = - new TopicAuthorizationClassifier(applicationId); + new TopicAuthorizationClassifier(applicationId) + .and(new GroupAuthorizationClassifier(applicationId)); final QueryErrorClassifier classifier = buildConfiguredClassifiers(ksqlConfig, applicationId) .map(topicClassifier::and) .orElse(topicClassifier) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java index 6a8b32e519fc..aeb065552aea 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java @@ -18,12 +18,12 @@ import io.confluent.ksql.query.QueryError.Type; import java.util.Objects; import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.streams.errors.StreamsException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * {@code TopicAuthorizationClassifier} classifies errors by querying the broker - * to check that all used topics are being accessible. + * {@code TopicAuthorizationClassifier} classifies topic ACL exceptions as user error */ public class TopicAuthorizationClassifier implements QueryErrorClassifier { @@ -41,7 +41,7 @@ public Type classify(final Throwable e) { if (type == Type.USER) { LOG.info( - "Classified error as USER error based on missing access rights. Query ID: {} Exception: {}", + "Classified error as USER error based on missing topic access rights. Query ID: {} Exception: {}", queryId, e); } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java index 09c8f28b8309..81e68f6b07ac 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java @@ -34,6 +34,8 @@ import static org.apache.kafka.common.resource.ResourceType.CLUSTER; import static org.apache.kafka.common.resource.ResourceType.GROUP; import static org.apache.kafka.common.resource.ResourceType.TOPIC; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; @@ -44,6 +46,8 @@ import io.confluent.ksql.engine.KsqlEngineTestUtil; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogContext; +import io.confluent.ksql.query.QueryError; +import io.confluent.ksql.query.QueryError.Type; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.query.id.SequentialQueryIdGenerator; import io.confluent.ksql.services.DisabledKsqlClient; @@ -60,6 +64,8 @@ import io.confluent.ksql.util.OrderDataProvider; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; +import java.time.Duration; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -74,8 +80,11 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.streams.errors.MissingSourceTopicException; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -243,6 +252,148 @@ public void shouldWorkWithMinimalPrefixedAcls() { assertCanAccessClusterConfig(prefix); } + @Test + public void shouldClassifyMissingSourceTopicExceptionAsUserError() { + // Given: + final String serviceId = "my-service-id_"; // Defaults to "default_" + final String prefix = "_confluent-ksql-" + serviceId; + + final Map ksqlConfig = getKsqlConfig(NORMAL_USER); + ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, serviceId); + + givenAllowAcl(NORMAL_USER, + resource(CLUSTER, "kafka-cluster"), + ops(DESCRIBE_CONFIGS)); + + givenAllowAcl(NORMAL_USER, + resource(TOPIC, INPUT_TOPIC), + ops(READ)); + + givenAllowAcl(NORMAL_USER, + resource(TOPIC, outputTopic), + ops(CREATE /* as the topic doesn't exist yet*/, WRITE)); + + givenAllowAcl(NORMAL_USER, + prefixedResource(TOPIC, prefix), + ops(ALL)); + + givenAllowAcl(NORMAL_USER, + prefixedResource(GROUP, prefix), + ops(ALL)); + + givenTestSetupWithConfig(ksqlConfig); + + topicClient.deleteTopics(Collections.singleton(INPUT_TOPIC)); + + // Then: + assertQueryFailsWithUserError( + String.format( + "CREATE STREAM %s WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM %s;", + outputTopic, + INPUT_STREAM + ), + String.format( + "%s: One or more source topics were missing during rebalance", + MissingSourceTopicException.class.getName() + ) + ); + } + + @Test + public void shouldClassifyTopicAuthorizationExceptionAsUserError() { + // Given: + final String serviceId = "my-service-id_"; // Defaults to "default_" + final String prefix = "_confluent-ksql-" + serviceId; + + final Map ksqlConfig = getKsqlConfig(NORMAL_USER); + ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, serviceId); + + givenAllowAcl(NORMAL_USER, + resource(CLUSTER, "kafka-cluster"), + ops(DESCRIBE_CONFIGS)); + + givenAllowAcl(NORMAL_USER, + resource(TOPIC, INPUT_TOPIC), + ops(READ)); + + givenAllowAcl(NORMAL_USER, + prefixedResource(TOPIC, prefix), + ops(ALL)); + + givenAllowAcl(NORMAL_USER, + resource(TOPIC, outputTopic), + ops(CREATE /* as the topic doesn't exist yet*/, WRITE)); + + givenAllowAcl(NORMAL_USER, + prefixedResource(GROUP, prefix), + ops(ALL)); + + givenTestSetupWithConfig(ksqlConfig); + + TEST_HARNESS.getKafkaCluster().addUserAcl( + NORMAL_USER.username, + AclPermissionType.DENY, + resource(TOPIC, INPUT_TOPIC), + ops(READ) + ); + + // Then: + assertQueryFailsWithUserError( + String.format( + "CREATE STREAM %s WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM %s;", + outputTopic, + INPUT_STREAM + ), + String.format( + "%s: Not authorized to access topics: [%s]", + TopicAuthorizationException.class.getName(), + INPUT_TOPIC + ) + ); + } + + @Test + public void shouldClassifyGroupAuthorizationExceptionAsUserError() { + // Given: + final String serviceId = "my-service-id_"; // Defaults to "default_" + final String prefix = "_confluent-ksql-" + serviceId; + + final Map ksqlConfig = getKsqlConfig(NORMAL_USER); + ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, serviceId); + + givenAllowAcl(NORMAL_USER, + resource(CLUSTER, "kafka-cluster"), + ops(DESCRIBE_CONFIGS)); + + givenAllowAcl(NORMAL_USER, + resource(TOPIC, INPUT_TOPIC), + ops(READ)); + + givenAllowAcl(NORMAL_USER, + resource(TOPIC, outputTopic), + ops(CREATE /* as the topic doesn't exist yet*/, WRITE)); + + givenAllowAcl(NORMAL_USER, + prefixedResource(TOPIC, prefix), + ops(ALL)); + + givenTestSetupWithConfig(ksqlConfig); + + // Then: + assertQueryFailsWithUserError( + String.format( + "CREATE STREAM %s WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM %s;", + outputTopic, + INPUT_STREAM + ), + String.format( + "%s: Not authorized to access group: %squery_", + GroupAuthorizationException.class.getName(), + prefix + ) + "%s" + ); + } + // Requires correctly configured schema-registry running //@Test @SuppressWarnings("unused") @@ -321,6 +472,29 @@ private void assertCanRunKsqlQuery( TEST_HARNESS.verifyAvailableRecords(outputTopic, greaterThan(0)); } + private void assertQueryFailsWithUserError( + final String query, + final String errorMsg + ) { + final QueryMetadata queryMetadata = KsqlEngineTestUtil + .execute(serviceContext, ksqlEngine, query, ksqlConfig, Collections.emptyMap()).get(0); + + queryMetadata.start(); + assertThatEventually( + "", + () -> queryMetadata.getQueryErrors().size() > 0, + is(true) + ); + + for (final QueryError error : queryMetadata.getQueryErrors()) { + assertThat(error.getType(), is(Type.USER)); + assertThat( + error.getErrorMessage().split("\n")[0], + is(String.format(errorMsg, queryMetadata.getQueryId())) + ); + } + } + private static Map getBaseKsqlConfig() { final Map configs = new HashMap<>(KsqlConfigTestUtil.baseTestConfig()); configs.put( @@ -382,6 +556,6 @@ private void executePersistentQuery(final String queryString, .execute(serviceContext, ksqlEngine, query, ksqlConfig, Collections.emptyMap()).get(0); queryMetadata.start(); - queryId = ((PersistentQueryMetadata) queryMetadata).getQueryId(); + queryId = queryMetadata.getQueryId(); } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/GroupAuthorizationClassifierTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/GroupAuthorizationClassifierTest.java new file mode 100644 index 000000000000..e542a1967f47 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/GroupAuthorizationClassifierTest.java @@ -0,0 +1,55 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.query; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import io.confluent.ksql.query.QueryError.Type; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.streams.errors.StreamsException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class GroupAuthorizationClassifierTest { + + @Test + public void shouldClassifyTopicAuthorizationExceptionAsUserError() { + // Given: + final Exception e = new StreamsException(new GroupAuthorizationException("foo")); + + // When: + final Type type = new GroupAuthorizationClassifier("").classify(e); + + // Then: + assertThat(type, is(Type.USER)); + } + + @Test + public void shouldClassifyNoGroupAuthorizationExceptionAsUnknownError() { + // Given: + final Exception e = new Exception("foo"); + + // When: + final Type type = new GroupAuthorizationClassifier("").classify(e); + + // Then: + assertThat(type, is(Type.UNKNOWN)); + } + +} \ No newline at end of file diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java index 443bd71654d0..f8a6590fd9e4 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java @@ -21,6 +21,7 @@ import io.confluent.ksql.query.QueryError.Type; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.streams.errors.MissingSourceTopicException; +import org.apache.kafka.streams.errors.StreamsException; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; @@ -31,7 +32,7 @@ public class TopicAuthorizationClassifierTest { @Test public void shouldClassifyTopicAuthorizationExceptionAsUserError() { // Given: - final Exception e = new TopicAuthorizationException("foo"); + final Exception e = new StreamsException(new TopicAuthorizationException("foo")); // When: final Type type = new TopicAuthorizationClassifier("").classify(e); From 4adaf9796eeb17ca6ef5086cc61fade378be751e Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 22 Feb 2021 16:47:19 -0800 Subject: [PATCH 04/11] Minor cleanup --- .../java/io/confluent/ksql/query/QueryExecutor.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java index c2a22a9b3ddf..100a0b0113b0 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java @@ -225,14 +225,12 @@ public PersistentQueryMetadata buildPersistentQuery( applicationId )); - final QueryErrorClassifier topicClassifier = new MissingTopicClassifier(applicationId); - final QueryErrorClassifier authorizationClassifier = - new TopicAuthorizationClassifier(applicationId) - .and(new GroupAuthorizationClassifier(applicationId)); + final QueryErrorClassifier userErrorClassifiers = new MissingTopicClassifier(applicationId) + .and(new TopicAuthorizationClassifier(applicationId)) + .and(new GroupAuthorizationClassifier(applicationId)); final QueryErrorClassifier classifier = buildConfiguredClassifiers(ksqlConfig, applicationId) - .map(topicClassifier::and) - .orElse(topicClassifier) - .and(authorizationClassifier); + .map(userErrorClassifiers::and) + .orElse(userErrorClassifiers); return new PersistentQueryMetadata( statementText, From 4c801e0c9c23df6aa143478f187493a9d7f919cd Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 22 Feb 2021 16:50:15 -0800 Subject: [PATCH 05/11] Clarify test setup --- .../io/confluent/ksql/integration/SecureIntegrationTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java index 81e68f6b07ac..f2e3f6fac62a 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java @@ -377,6 +377,8 @@ public void shouldClassifyGroupAuthorizationExceptionAsUserError() { prefixedResource(TOPIC, prefix), ops(ALL)); + // Skip setting consumer group permissions + givenTestSetupWithConfig(ksqlConfig); // Then: From 2cb2e1a483bacdc0e8aff0c2507eecfca3e091c6 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 23 Feb 2021 13:15:31 -0800 Subject: [PATCH 06/11] Add tx authorization handling --- .../query/GroupAuthorizationClassifier.java | 1 - .../confluent/ksql/query/QueryExecutor.java | 3 +- .../query/TopicAuthorizationClassifier.java | 1 - .../TransactionAuthorizationClassifier.java | 57 +++++++++++++++ .../integration/SecureIntegrationTest.java | 69 ++++++++++++++++--- .../GroupAuthorizationClassifierTest.java | 2 +- .../TopicAuthorizationClassifierTest.java | 2 +- ...ransactionAuthorizationClassifierTest.java | 56 +++++++++++++++ 8 files changed, 178 insertions(+), 13 deletions(-) create mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/query/TransactionAuthorizationClassifier.java create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/query/TransactionAuthorizationClassifierTest.java diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/GroupAuthorizationClassifier.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/GroupAuthorizationClassifier.java index 1a036a83b8a2..a60d453fe3ba 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/GroupAuthorizationClassifier.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/GroupAuthorizationClassifier.java @@ -18,7 +18,6 @@ import io.confluent.ksql.query.QueryError.Type; import java.util.Objects; import org.apache.kafka.common.errors.GroupAuthorizationException; -import org.apache.kafka.streams.errors.StreamsException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java index 100a0b0113b0..5c484d8ab706 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java @@ -227,7 +227,8 @@ public PersistentQueryMetadata buildPersistentQuery( final QueryErrorClassifier userErrorClassifiers = new MissingTopicClassifier(applicationId) .and(new TopicAuthorizationClassifier(applicationId)) - .and(new GroupAuthorizationClassifier(applicationId)); + .and(new GroupAuthorizationClassifier(applicationId)) + .and(new TransactionAuthorizationClassifier(applicationId)); final QueryErrorClassifier classifier = buildConfiguredClassifiers(ksqlConfig, applicationId) .map(userErrorClassifiers::and) .orElse(userErrorClassifiers); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java index aeb065552aea..3a09a6589365 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java @@ -18,7 +18,6 @@ import io.confluent.ksql.query.QueryError.Type; import java.util.Objects; import org.apache.kafka.common.errors.TopicAuthorizationException; -import org.apache.kafka.streams.errors.StreamsException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TransactionAuthorizationClassifier.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/TransactionAuthorizationClassifier.java new file mode 100644 index 000000000000..d3f0fb953ae2 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/TransactionAuthorizationClassifier.java @@ -0,0 +1,57 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.query; + +import io.confluent.ksql.query.QueryError.Type; +import java.util.Objects; +import org.apache.kafka.common.errors.ClusterAuthorizationException; +import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; +import org.apache.kafka.streams.errors.StreamsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@code TransactionAuthorizationClassifier} classifies missing cluster ACLs as user error + */ +public class TransactionAuthorizationClassifier implements QueryErrorClassifier { + + private static final Logger LOG = LoggerFactory.getLogger(TransactionAuthorizationClassifier.class); + + private final String queryId; + + public TransactionAuthorizationClassifier(final String queryId) { + this.queryId = Objects.requireNonNull(queryId, "queryId"); + } + + @Override + public Type classify(final Throwable e) { + final Type type = + e instanceof StreamsException + && e.getCause() instanceof TransactionalIdAuthorizationException + ? Type.USER + : Type.UNKNOWN; + + if (type == Type.USER) { + LOG.info( + "Classified error as USER error based on missing transactional ID access rights. Query ID: {} Exception: {}", + queryId, + e); + } + + return type; + } + +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java index f2e3f6fac62a..6957e145dba7 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java @@ -84,7 +84,9 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.MissingSourceTopicException; +import org.apache.kafka.streams.errors.StreamsException; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -288,7 +290,7 @@ public void shouldClassifyMissingSourceTopicExceptionAsUserError() { // Then: assertQueryFailsWithUserError( String.format( - "CREATE STREAM %s WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM %s;", + "CREATE STREAM %s AS SELECT * FROM %s;", outputTopic, INPUT_STREAM ), @@ -340,7 +342,7 @@ public void shouldClassifyTopicAuthorizationExceptionAsUserError() { // Then: assertQueryFailsWithUserError( String.format( - "CREATE STREAM %s WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM %s;", + "CREATE STREAM %s AS SELECT * FROM %s;", outputTopic, INPUT_STREAM ), @@ -384,7 +386,7 @@ public void shouldClassifyGroupAuthorizationExceptionAsUserError() { // Then: assertQueryFailsWithUserError( String.format( - "CREATE STREAM %s WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM %s;", + "CREATE STREAM %s AS SELECT * FROM %s;", outputTopic, INPUT_STREAM ), @@ -396,6 +398,52 @@ public void shouldClassifyGroupAuthorizationExceptionAsUserError() { ); } + @Test + public void shouldClassifyTransactionIdAuthorizationExceptionAsUserError() { + // Given: + final String serviceId = "my-service-id_"; // Defaults to "default_" + final String prefix = "_confluent-ksql-" + serviceId; + + final Map ksqlConfig = getKsqlConfig(NORMAL_USER); + ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, serviceId); + ksqlConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA); + + givenAllowAcl(NORMAL_USER, + resource(CLUSTER, "kafka-cluster"), + ops(DESCRIBE_CONFIGS)); + + givenAllowAcl(NORMAL_USER, + resource(TOPIC, INPUT_TOPIC), + ops(READ)); + + givenAllowAcl(NORMAL_USER, + resource(TOPIC, outputTopic), + ops(CREATE /* as the topic doesn't exist yet*/, WRITE)); + + givenAllowAcl(NORMAL_USER, + prefixedResource(TOPIC, prefix), + ops(ALL)); + + givenAllowAcl(NORMAL_USER, + prefixedResource(GROUP, prefix), + ops(ALL)); + + givenTestSetupWithConfig(ksqlConfig); + + // Then: + assertQueryFailsWithUserError( + String.format( + "CREATE STREAM %s AS SELECT * FROM %s;", + outputTopic, + INPUT_STREAM + ), + String.format( + "%s: Error encountered trying to initialize transactions [stream-thread [Time-limited test]]", + StreamsException.class.getName() + ) + ); + } + // Requires correctly configured schema-registry running //@Test @SuppressWarnings("unused") @@ -442,14 +490,19 @@ private void givenTestSetupWithConfig(final Map ksqlConfigs) { } private void assertCanRunSimpleKsqlQuery() { - assertCanRunKsqlQuery("CREATE STREAM %s AS SELECT * FROM %s;", - outputTopic, INPUT_STREAM); + assertCanRunKsqlQuery( + "CREATE STREAM %s AS SELECT * FROM %s;", + outputTopic, + INPUT_STREAM + ); } private void assertCanRunRepartitioningKsqlQuery() { - assertCanRunKsqlQuery("CREATE TABLE %s AS SELECT itemid, count(*) " - + "FROM %s WINDOW TUMBLING (size 5 second) GROUP BY itemid;", - outputTopic, INPUT_STREAM); + assertCanRunKsqlQuery( + "CREATE TABLE %s AS SELECT itemid, count(*) FROM %s GROUP BY itemid;", + outputTopic, + INPUT_STREAM + ); } private void assertCanAccessClusterConfig(final String resourcePrefix) { diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/GroupAuthorizationClassifierTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/GroupAuthorizationClassifierTest.java index e542a1967f47..6996cde9e9e0 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/query/GroupAuthorizationClassifierTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/GroupAuthorizationClassifierTest.java @@ -31,7 +31,7 @@ public class GroupAuthorizationClassifierTest { @Test public void shouldClassifyTopicAuthorizationExceptionAsUserError() { // Given: - final Exception e = new StreamsException(new GroupAuthorizationException("foo")); + final Exception e = new GroupAuthorizationException("foo"); // When: final Type type = new GroupAuthorizationClassifier("").classify(e); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java index f8a6590fd9e4..8644d1907210 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java @@ -32,7 +32,7 @@ public class TopicAuthorizationClassifierTest { @Test public void shouldClassifyTopicAuthorizationExceptionAsUserError() { // Given: - final Exception e = new StreamsException(new TopicAuthorizationException("foo")); + final Exception e = new TopicAuthorizationException("foo"); // When: final Type type = new TopicAuthorizationClassifier("").classify(e); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/TransactionAuthorizationClassifierTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/TransactionAuthorizationClassifierTest.java new file mode 100644 index 000000000000..584962b3d0a6 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/TransactionAuthorizationClassifierTest.java @@ -0,0 +1,56 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.query; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import io.confluent.ksql.query.QueryError.Type; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; +import org.apache.kafka.streams.errors.StreamsException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TransactionAuthorizationClassifierTest { + + @Test + public void shouldClassifyTransactionalIdAuthorizationExceptionAsUserError() { + // Given: + final Exception e = new StreamsException(new TransactionalIdAuthorizationException("foo")); + + // When: + final Type type = new TransactionAuthorizationClassifier("").classify(e); + + // Then: + assertThat(type, is(Type.USER)); + } + + @Test + public void shouldClassifyNoTransactionalIdAuthorizationExceptionAsUnknownError() { + // Given: + final Exception e = new Exception("foo"); + + // When: + final Type type = new TransactionAuthorizationClassifier("").classify(e); + + // Then: + assertThat(type, is(Type.UNKNOWN)); + } + +} \ No newline at end of file From 08b1f72f584de54325230749dbd5aa9ca665c605 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 23 Feb 2021 15:40:12 -0800 Subject: [PATCH 07/11] fix checkstyle --- .../confluent/ksql/query/GroupAuthorizationClassifier.java | 3 ++- .../confluent/ksql/query/TopicAuthorizationClassifier.java | 3 ++- .../ksql/query/TransactionAuthorizationClassifier.java | 7 ++++--- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/GroupAuthorizationClassifier.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/GroupAuthorizationClassifier.java index a60d453fe3ba..43bc7a34b6a0 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/GroupAuthorizationClassifier.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/GroupAuthorizationClassifier.java @@ -40,7 +40,8 @@ public Type classify(final Throwable e) { if (type == Type.USER) { LOG.info( - "Classified error as USER error based on missing consumer groups access rights. Query ID: {} Exception: {}", + "Classified error as USER error based on missing consumer groups access rights." + + " Query ID: {} Exception: {}", queryId, e); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java index 3a09a6589365..22f3d8e2b8ac 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java @@ -40,7 +40,8 @@ public Type classify(final Throwable e) { if (type == Type.USER) { LOG.info( - "Classified error as USER error based on missing topic access rights. Query ID: {} Exception: {}", + "Classified error as USER error based on missing topic access rights." + + " Query ID: {} Exception: {}", queryId, e); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TransactionAuthorizationClassifier.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/TransactionAuthorizationClassifier.java index d3f0fb953ae2..a42e0966c71f 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TransactionAuthorizationClassifier.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/TransactionAuthorizationClassifier.java @@ -17,7 +17,6 @@ import io.confluent.ksql.query.QueryError.Type; import java.util.Objects; -import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; import org.apache.kafka.streams.errors.StreamsException; import org.slf4j.Logger; @@ -28,7 +27,8 @@ */ public class TransactionAuthorizationClassifier implements QueryErrorClassifier { - private static final Logger LOG = LoggerFactory.getLogger(TransactionAuthorizationClassifier.class); + private static final Logger LOG = + LoggerFactory.getLogger(TransactionAuthorizationClassifier.class); private final String queryId; @@ -46,7 +46,8 @@ public Type classify(final Throwable e) { if (type == Type.USER) { LOG.info( - "Classified error as USER error based on missing transactional ID access rights. Query ID: {} Exception: {}", + "Classified error as USER error based on missing transactional ID access rights." + + " Query ID: {} Exception: {}", queryId, e); } From d0fdb9b6380812decd8c7b8d3c93dea3128b4df4 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 23 Feb 2021 16:07:10 -0800 Subject: [PATCH 08/11] Update ksqldb-engine/src/test/java/io/confluent/ksql/query/GroupAuthorizationClassifierTest.java Co-authored-by: Victoria Xia --- .../ksql/query/GroupAuthorizationClassifierTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/GroupAuthorizationClassifierTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/GroupAuthorizationClassifierTest.java index 6996cde9e9e0..9fe57a8418ff 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/query/GroupAuthorizationClassifierTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/GroupAuthorizationClassifierTest.java @@ -29,7 +29,7 @@ public class GroupAuthorizationClassifierTest { @Test - public void shouldClassifyTopicAuthorizationExceptionAsUserError() { + public void shouldClassifyGroupAuthorizationExceptionAsUserError() { // Given: final Exception e = new GroupAuthorizationException("foo"); @@ -52,4 +52,4 @@ public void shouldClassifyNoGroupAuthorizationExceptionAsUnknownError() { assertThat(type, is(Type.UNKNOWN)); } -} \ No newline at end of file +} From a065ecf21fc34592444e9e3ce9378bd8419abe3e Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 23 Feb 2021 17:36:57 -0800 Subject: [PATCH 09/11] Github comments --- .../integration/SecureIntegrationTest.java | 174 ++++++------------ 1 file changed, 52 insertions(+), 122 deletions(-) diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java index 6957e145dba7..b72914fee879 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java @@ -109,6 +109,9 @@ public class SecureIntegrationTest { private static final Credentials SUPER_USER = VALID_USER1; private static final Credentials NORMAL_USER = VALID_USER2; private static final AtomicInteger COUNTER = new AtomicInteger(0); + private final String SERVICE_ID = "my-service-id_"; + private final String QUERY_ID_PREFIX = "_confluent-ksql-" + SERVICE_ID; + public static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness .builder() @@ -221,71 +224,36 @@ public void shouldRunQueryAgainstKafkaClusterOverSaslSsl() { @Test public void shouldWorkWithMinimalPrefixedAcls() { // Given: - final String serviceId = "my-service-id_"; // Defaults to "default_" - final String prefix = "_confluent-ksql-" + serviceId; - final Map ksqlConfig = getKsqlConfig(NORMAL_USER); - ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, serviceId); - - givenAllowAcl(NORMAL_USER, - resource(CLUSTER, "kafka-cluster"), - ops(DESCRIBE_CONFIGS)); - - givenAllowAcl(NORMAL_USER, - resource(TOPIC, INPUT_TOPIC), - ops(READ)); - - givenAllowAcl(NORMAL_USER, - resource(TOPIC, outputTopic), - ops(CREATE /* as the topic doesn't exist yet*/, WRITE)); - - givenAllowAcl(NORMAL_USER, - prefixedResource(TOPIC, prefix), - ops(ALL)); + ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, SERVICE_ID); + givenTestSetupWithAclsForQuery(); givenAllowAcl(NORMAL_USER, - prefixedResource(GROUP, prefix), - ops(ALL)); - + resource(CLUSTER, "kafka-cluster"), + ops(DESCRIBE_CONFIGS)); givenTestSetupWithConfig(ksqlConfig); // Then: assertCanRunRepartitioningKsqlQuery(); - assertCanAccessClusterConfig(prefix); + assertCanAccessClusterConfig(); } @Test public void shouldClassifyMissingSourceTopicExceptionAsUserError() { // Given: - final String serviceId = "my-service-id_"; // Defaults to "default_" - final String prefix = "_confluent-ksql-" + serviceId; - final Map ksqlConfig = getKsqlConfig(NORMAL_USER); - ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, serviceId); - - givenAllowAcl(NORMAL_USER, - resource(CLUSTER, "kafka-cluster"), - ops(DESCRIBE_CONFIGS)); - - givenAllowAcl(NORMAL_USER, - resource(TOPIC, INPUT_TOPIC), - ops(READ)); - - givenAllowAcl(NORMAL_USER, - resource(TOPIC, outputTopic), - ops(CREATE /* as the topic doesn't exist yet*/, WRITE)); - - givenAllowAcl(NORMAL_USER, - prefixedResource(TOPIC, prefix), - ops(ALL)); - - givenAllowAcl(NORMAL_USER, - prefixedResource(GROUP, prefix), - ops(ALL)); + ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, SERVICE_ID); + givenTestSetupWithAclsForQuery(); givenTestSetupWithConfig(ksqlConfig); + // When: topicClient.deleteTopics(Collections.singleton(INPUT_TOPIC)); + assertThatEventually( + "Wait for async topic deleting", + () -> topicClient.isTopicExists(outputTopic), + is(false) + ); // Then: assertQueryFailsWithUserError( @@ -304,34 +272,13 @@ public void shouldClassifyMissingSourceTopicExceptionAsUserError() { @Test public void shouldClassifyTopicAuthorizationExceptionAsUserError() { // Given: - final String serviceId = "my-service-id_"; // Defaults to "default_" - final String prefix = "_confluent-ksql-" + serviceId; - final Map ksqlConfig = getKsqlConfig(NORMAL_USER); - ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, serviceId); - - givenAllowAcl(NORMAL_USER, - resource(CLUSTER, "kafka-cluster"), - ops(DESCRIBE_CONFIGS)); - - givenAllowAcl(NORMAL_USER, - resource(TOPIC, INPUT_TOPIC), - ops(READ)); - - givenAllowAcl(NORMAL_USER, - prefixedResource(TOPIC, prefix), - ops(ALL)); - - givenAllowAcl(NORMAL_USER, - resource(TOPIC, outputTopic), - ops(CREATE /* as the topic doesn't exist yet*/, WRITE)); - - givenAllowAcl(NORMAL_USER, - prefixedResource(GROUP, prefix), - ops(ALL)); + ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, SERVICE_ID); + givenTestSetupWithAclsForQuery(); givenTestSetupWithConfig(ksqlConfig); + // When: TEST_HARNESS.getKafkaCluster().addUserAcl( NORMAL_USER.username, AclPermissionType.DENY, @@ -357,32 +304,19 @@ public void shouldClassifyTopicAuthorizationExceptionAsUserError() { @Test public void shouldClassifyGroupAuthorizationExceptionAsUserError() { // Given: - final String serviceId = "my-service-id_"; // Defaults to "default_" - final String prefix = "_confluent-ksql-" + serviceId; - final Map ksqlConfig = getKsqlConfig(NORMAL_USER); - ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, serviceId); - - givenAllowAcl(NORMAL_USER, - resource(CLUSTER, "kafka-cluster"), - ops(DESCRIBE_CONFIGS)); - - givenAllowAcl(NORMAL_USER, - resource(TOPIC, INPUT_TOPIC), - ops(READ)); - - givenAllowAcl(NORMAL_USER, - resource(TOPIC, outputTopic), - ops(CREATE /* as the topic doesn't exist yet*/, WRITE)); - - givenAllowAcl(NORMAL_USER, - prefixedResource(TOPIC, prefix), - ops(ALL)); - - // Skip setting consumer group permissions + ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, SERVICE_ID); + givenTestSetupWithAclsForQuery(); givenTestSetupWithConfig(ksqlConfig); + TEST_HARNESS.getKafkaCluster().addUserAcl( + NORMAL_USER.username, + AclPermissionType.DENY, + prefixedResource(GROUP, QUERY_ID_PREFIX), + ops(ALL) + ); + // Then: assertQueryFailsWithUserError( String.format( @@ -393,7 +327,7 @@ public void shouldClassifyGroupAuthorizationExceptionAsUserError() { String.format( "%s: Not authorized to access group: %squery_", GroupAuthorizationException.class.getName(), - prefix + QUERY_ID_PREFIX ) + "%s" ); } @@ -401,33 +335,11 @@ public void shouldClassifyGroupAuthorizationExceptionAsUserError() { @Test public void shouldClassifyTransactionIdAuthorizationExceptionAsUserError() { // Given: - final String serviceId = "my-service-id_"; // Defaults to "default_" - final String prefix = "_confluent-ksql-" + serviceId; - final Map ksqlConfig = getKsqlConfig(NORMAL_USER); - ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, serviceId); + ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, SERVICE_ID); ksqlConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA); - givenAllowAcl(NORMAL_USER, - resource(CLUSTER, "kafka-cluster"), - ops(DESCRIBE_CONFIGS)); - - givenAllowAcl(NORMAL_USER, - resource(TOPIC, INPUT_TOPIC), - ops(READ)); - - givenAllowAcl(NORMAL_USER, - resource(TOPIC, outputTopic), - ops(CREATE /* as the topic doesn't exist yet*/, WRITE)); - - givenAllowAcl(NORMAL_USER, - prefixedResource(TOPIC, prefix), - ops(ALL)); - - givenAllowAcl(NORMAL_USER, - prefixedResource(GROUP, prefix), - ops(ALL)); - + givenTestSetupWithAclsForQuery(); // does not authorize TX, but we enabled EOS above givenTestSetupWithConfig(ksqlConfig); // Then: @@ -469,6 +381,24 @@ public void shouldRunQueryAgainstSecureSchemaRegistry() { } } + private void givenTestSetupWithAclsForQuery() { + givenAllowAcl(NORMAL_USER, + resource(TOPIC, INPUT_TOPIC), + ops(READ)); + + givenAllowAcl(NORMAL_USER, + resource(TOPIC, outputTopic), + ops(CREATE /* as the topic doesn't exist yet*/, WRITE)); + + givenAllowAcl(NORMAL_USER, + prefixedResource(TOPIC, QUERY_ID_PREFIX), + ops(ALL)); + + givenAllowAcl(NORMAL_USER, + prefixedResource(GROUP, QUERY_ID_PREFIX), + ops(ALL)); + } + private static void givenAllowAcl(final Credentials credentials, final ResourcePattern resource, final Set ops) { @@ -505,11 +435,11 @@ private void assertCanRunRepartitioningKsqlQuery() { ); } - private void assertCanAccessClusterConfig(final String resourcePrefix) { + private void assertCanAccessClusterConfig() { // Creating topic with default replicas causes topic client to query cluster config to get // default replica count: serviceContext.getTopicClient() - .createTopic(resourcePrefix + "-foo", 1, TopicProperties.DEFAULT_REPLICAS); + .createTopic(QUERY_ID_PREFIX + "-foo", 1, TopicProperties.DEFAULT_REPLICAS); } private void assertCanRunKsqlQuery( @@ -536,7 +466,7 @@ private void assertQueryFailsWithUserError( queryMetadata.start(); assertThatEventually( - "", + "Wait for query to fail", () -> queryMetadata.getQueryErrors().size() > 0, is(true) ); From 1bfee24a02f398610aa3f4f865593f61ac290df2 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 23 Feb 2021 19:25:07 -0800 Subject: [PATCH 10/11] Fix findbugs --- .../io/confluent/ksql/integration/SecureIntegrationTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java index b72914fee879..2c98d769b0d1 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java @@ -109,8 +109,8 @@ public class SecureIntegrationTest { private static final Credentials SUPER_USER = VALID_USER1; private static final Credentials NORMAL_USER = VALID_USER2; private static final AtomicInteger COUNTER = new AtomicInteger(0); - private final String SERVICE_ID = "my-service-id_"; - private final String QUERY_ID_PREFIX = "_confluent-ksql-" + SERVICE_ID; + private static final String SERVICE_ID = "my-service-id_"; + private static final String QUERY_ID_PREFIX = "_confluent-ksql-" + SERVICE_ID; public static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness From 7acea0326056f90f50c7d33a645b502567ed05e1 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 24 Feb 2021 10:53:25 -0800 Subject: [PATCH 11/11] Unify error classification based on super-type --- ...fier.java => AuthorizationClassifier.java} | 17 +++--- .../query/GroupAuthorizationClassifier.java | 52 ----------------- .../confluent/ksql/query/QueryExecutor.java | 4 +- .../query/TopicAuthorizationClassifier.java | 52 ----------------- .../integration/SecureIntegrationTest.java | 1 - ....java => AuthorizationClassifierTest.java} | 35 ++++++++++-- .../GroupAuthorizationClassifierTest.java | 55 ------------------ .../TopicAuthorizationClassifierTest.java | 56 ------------------- 8 files changed, 40 insertions(+), 232 deletions(-) rename ksqldb-engine/src/main/java/io/confluent/ksql/query/{TransactionAuthorizationClassifier.java => AuthorizationClassifier.java} (67%) delete mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/query/GroupAuthorizationClassifier.java delete mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java rename ksqldb-engine/src/test/java/io/confluent/ksql/query/{TransactionAuthorizationClassifierTest.java => AuthorizationClassifierTest.java} (59%) delete mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/query/GroupAuthorizationClassifierTest.java delete mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TransactionAuthorizationClassifier.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/AuthorizationClassifier.java similarity index 67% rename from ksqldb-engine/src/main/java/io/confluent/ksql/query/TransactionAuthorizationClassifier.java rename to ksqldb-engine/src/main/java/io/confluent/ksql/query/AuthorizationClassifier.java index a42e0966c71f..6180c33d674e 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TransactionAuthorizationClassifier.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/AuthorizationClassifier.java @@ -17,36 +17,37 @@ import io.confluent.ksql.query.QueryError.Type; import java.util.Objects; -import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; +import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.streams.errors.StreamsException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * {@code TransactionAuthorizationClassifier} classifies missing cluster ACLs as user error + * {@code AuthorizationClassifier} classifies missing ACLs as user error */ -public class TransactionAuthorizationClassifier implements QueryErrorClassifier { +public class AuthorizationClassifier implements QueryErrorClassifier { private static final Logger LOG = - LoggerFactory.getLogger(TransactionAuthorizationClassifier.class); + LoggerFactory.getLogger(AuthorizationClassifier.class); private final String queryId; - public TransactionAuthorizationClassifier(final String queryId) { + public AuthorizationClassifier(final String queryId) { this.queryId = Objects.requireNonNull(queryId, "queryId"); } @Override public Type classify(final Throwable e) { final Type type = - e instanceof StreamsException - && e.getCause() instanceof TransactionalIdAuthorizationException + e instanceof AuthorizationException + || e instanceof StreamsException + && e.getCause() instanceof AuthorizationException ? Type.USER : Type.UNKNOWN; if (type == Type.USER) { LOG.info( - "Classified error as USER error based on missing transactional ID access rights." + "Classified error as USER error based on missing access rights." + " Query ID: {} Exception: {}", queryId, e); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/GroupAuthorizationClassifier.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/GroupAuthorizationClassifier.java deleted file mode 100644 index 43bc7a34b6a0..000000000000 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/GroupAuthorizationClassifier.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2021 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"; you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.query; - -import io.confluent.ksql.query.QueryError.Type; -import java.util.Objects; -import org.apache.kafka.common.errors.GroupAuthorizationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@code GroupAuthorizationClassifier} classifies missing consumer group ACLs as user error - */ -public class GroupAuthorizationClassifier implements QueryErrorClassifier { - - private static final Logger LOG = LoggerFactory.getLogger(GroupAuthorizationClassifier.class); - - private final String queryId; - - public GroupAuthorizationClassifier(final String queryId) { - this.queryId = Objects.requireNonNull(queryId, "queryId"); - } - - @Override - public Type classify(final Throwable e) { - final Type type = e instanceof GroupAuthorizationException ? Type.USER : Type.UNKNOWN; - - if (type == Type.USER) { - LOG.info( - "Classified error as USER error based on missing consumer groups access rights." - + " Query ID: {} Exception: {}", - queryId, - e); - } - - return type; - } - -} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java index 5c484d8ab706..28ff2aa309ea 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java @@ -226,9 +226,7 @@ public PersistentQueryMetadata buildPersistentQuery( )); final QueryErrorClassifier userErrorClassifiers = new MissingTopicClassifier(applicationId) - .and(new TopicAuthorizationClassifier(applicationId)) - .and(new GroupAuthorizationClassifier(applicationId)) - .and(new TransactionAuthorizationClassifier(applicationId)); + .and(new AuthorizationClassifier(applicationId)); final QueryErrorClassifier classifier = buildConfiguredClassifiers(ksqlConfig, applicationId) .map(userErrorClassifiers::and) .orElse(userErrorClassifiers); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java deleted file mode 100644 index 22f3d8e2b8ac..000000000000 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TopicAuthorizationClassifier.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2021 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"; you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.query; - -import io.confluent.ksql.query.QueryError.Type; -import java.util.Objects; -import org.apache.kafka.common.errors.TopicAuthorizationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@code TopicAuthorizationClassifier} classifies topic ACL exceptions as user error - */ -public class TopicAuthorizationClassifier implements QueryErrorClassifier { - - private static final Logger LOG = LoggerFactory.getLogger(TopicAuthorizationClassifier.class); - - private final String queryId; - - public TopicAuthorizationClassifier(final String queryId) { - this.queryId = Objects.requireNonNull(queryId, "queryId"); - } - - @Override - public Type classify(final Throwable e) { - final Type type = e instanceof TopicAuthorizationException ? Type.USER : Type.UNKNOWN; - - if (type == Type.USER) { - LOG.info( - "Classified error as USER error based on missing topic access rights." - + " Query ID: {} Exception: {}", - queryId, - e); - } - - return type; - } - -} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java index 2c98d769b0d1..cb40459c5011 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java @@ -112,7 +112,6 @@ public class SecureIntegrationTest { private static final String SERVICE_ID = "my-service-id_"; private static final String QUERY_ID_PREFIX = "_confluent-ksql-" + SERVICE_ID; - public static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness .builder() .withKafkaCluster( diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/TransactionAuthorizationClassifierTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/AuthorizationClassifierTest.java similarity index 59% rename from ksqldb-engine/src/test/java/io/confluent/ksql/query/TransactionAuthorizationClassifierTest.java rename to ksqldb-engine/src/test/java/io/confluent/ksql/query/AuthorizationClassifierTest.java index 584962b3d0a6..80e355dafe0f 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/query/TransactionAuthorizationClassifierTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/AuthorizationClassifierTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.is; import io.confluent.ksql.query.QueryError.Type; +import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; import org.apache.kafka.streams.errors.StreamsException; @@ -27,15 +28,39 @@ import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) -public class TransactionAuthorizationClassifierTest { +public class AuthorizationClassifierTest { @Test - public void shouldClassifyTransactionalIdAuthorizationExceptionAsUserError() { + public void shouldClassifyWrappedAuthorizationExceptionAsUserError() { // Given: - final Exception e = new StreamsException(new TransactionalIdAuthorizationException("foo")); + final Exception e = new StreamsException(new AuthorizationException("foo")); // When: - final Type type = new TransactionAuthorizationClassifier("").classify(e); + final Type type = new AuthorizationClassifier("").classify(e); + + // Then: + assertThat(type, is(Type.USER)); + } + + @Test + public void shouldClassifyAuthorizationExceptionAsUserError() { + // Given: + final Exception e = new AuthorizationException("foo"); + + // When: + final Type type = new AuthorizationClassifier("").classify(e); + + // Then: + assertThat(type, is(Type.USER)); + } + + @Test + public void shouldClassifySubTypeOfAuthorizationExceptionAsUserError() { + // Given: + final Exception e = new TopicAuthorizationException("foo"); + + // When: + final Type type = new AuthorizationClassifier("").classify(e); // Then: assertThat(type, is(Type.USER)); @@ -47,7 +72,7 @@ public void shouldClassifyNoTransactionalIdAuthorizationExceptionAsUnknownError( final Exception e = new Exception("foo"); // When: - final Type type = new TransactionAuthorizationClassifier("").classify(e); + final Type type = new AuthorizationClassifier("").classify(e); // Then: assertThat(type, is(Type.UNKNOWN)); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/GroupAuthorizationClassifierTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/GroupAuthorizationClassifierTest.java deleted file mode 100644 index 9fe57a8418ff..000000000000 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/query/GroupAuthorizationClassifierTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2021 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"; you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.query; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; - -import io.confluent.ksql.query.QueryError.Type; -import org.apache.kafka.common.errors.GroupAuthorizationException; -import org.apache.kafka.streams.errors.StreamsException; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.junit.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class GroupAuthorizationClassifierTest { - - @Test - public void shouldClassifyGroupAuthorizationExceptionAsUserError() { - // Given: - final Exception e = new GroupAuthorizationException("foo"); - - // When: - final Type type = new GroupAuthorizationClassifier("").classify(e); - - // Then: - assertThat(type, is(Type.USER)); - } - - @Test - public void shouldClassifyNoGroupAuthorizationExceptionAsUnknownError() { - // Given: - final Exception e = new Exception("foo"); - - // When: - final Type type = new GroupAuthorizationClassifier("").classify(e); - - // Then: - assertThat(type, is(Type.UNKNOWN)); - } - -} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java deleted file mode 100644 index 8644d1907210..000000000000 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/query/TopicAuthorizationClassifierTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2021 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"; you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.query; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; - -import io.confluent.ksql.query.QueryError.Type; -import org.apache.kafka.common.errors.TopicAuthorizationException; -import org.apache.kafka.streams.errors.MissingSourceTopicException; -import org.apache.kafka.streams.errors.StreamsException; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.junit.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class TopicAuthorizationClassifierTest { - - @Test - public void shouldClassifyTopicAuthorizationExceptionAsUserError() { - // Given: - final Exception e = new TopicAuthorizationException("foo"); - - // When: - final Type type = new TopicAuthorizationClassifier("").classify(e); - - // Then: - assertThat(type, is(Type.USER)); - } - - @Test - public void shouldClassifyNoTopicAuthorizationExceptionAsUnknownError() { - // Given: - final Exception e = new Exception("foo"); - - // When: - final Type type = new TopicAuthorizationClassifier("").classify(e); - - // Then: - assertThat(type, is(Type.UNKNOWN)); - } - -} \ No newline at end of file