feat: classify authorization exception as user error #7061
Merged: mjsax merged 11 commits into confluentinc:master from mjsax:kse-24-update-error-classification on Feb 25, 2021
Changes from all commits (11 commits):
bf8f3c6 feat: classify authorization exception as user error (mjsax)
3bf546f Github comments (mjsax)
b27225a Add handling for GroupAuthorizationException (mjsax)
4adaf97 Minor cleanup (mjsax)
4c801e0 Clarify test setup (mjsax)
2cb2e1a Add tx authorization handling (mjsax)
08b1f72 fix checkstyle (mjsax)
d0fdb9b Update ksqldb-engine/src/test/java/io/confluent/ksql/query/GroupAutho… (mjsax)
a065ecf Github comments (mjsax)
1bfee24 Fix findbugs (mjsax)
7acea03 Unify error classification based on super-type (mjsax)
ksqldb-engine/src/main/java/io/confluent/ksql/query/AuthorizationClassifier.java (59 additions, 0 deletions)
@@ -0,0 +1,59 @@
/*
 * Copyright 2021 Confluent Inc.
 *
 * Licensed under the Confluent Community License (the "License"); you may not use
 * this file except in compliance with the License. You may obtain a copy of the
 * License at
 *
 * http://www.confluent.io/confluent-community-license
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */

package io.confluent.ksql.query;

import io.confluent.ksql.query.QueryError.Type;
import java.util.Objects;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.streams.errors.StreamsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@code AuthorizationClassifier} classifies missing ACLs as user error.
 */
public class AuthorizationClassifier implements QueryErrorClassifier {

  private static final Logger LOG =
      LoggerFactory.getLogger(AuthorizationClassifier.class);

  private final String queryId;

  public AuthorizationClassifier(final String queryId) {
    this.queryId = Objects.requireNonNull(queryId, "queryId");
  }

  @Override
  public Type classify(final Throwable e) {
    final Type type =
        e instanceof AuthorizationException
            || e instanceof StreamsException
                && e.getCause() instanceof AuthorizationException
            ? Type.USER
            : Type.UNKNOWN;

    if (type == Type.USER) {
      LOG.info(
          "Classified error as USER error based on missing access rights."
              + " Query ID: {} Exception: {}",
          queryId,
          e);
    }

    return type;
  }
}
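The condition in classify relies on Java operator precedence: && binds tighter than ||, so it reads "(bare authorization error) OR (streams error AND authorization cause)". The following minimal, self-contained sketch mirrors that rule using hypothetical stand-in exception classes (FakeAuthorizationException, FakeStreamsException), since the Kafka client types are not assumed to be on the classpath here:

```java
// Stand-ins for org.apache.kafka.common.errors.AuthorizationException and
// org.apache.kafka.streams.errors.StreamsException (assumed names, for this sketch only).
class FakeAuthorizationException extends RuntimeException {}

class FakeStreamsException extends RuntimeException {
  FakeStreamsException(final Throwable cause) {
    super(cause);
  }
}

public class ClassifierSketch {
  enum Type { USER, UNKNOWN }

  // Mirrors the classify condition: && binds tighter than ||, so either a bare
  // authorization error or a streams error wrapping one maps to USER.
  static Type classify(final Throwable e) {
    return e instanceof FakeAuthorizationException
        || e instanceof FakeStreamsException
            && e.getCause() instanceof FakeAuthorizationException
        ? Type.USER
        : Type.UNKNOWN;
  }

  public static void main(final String[] args) {
    System.out.println(classify(new FakeAuthorizationException()));                           // USER
    System.out.println(classify(new FakeStreamsException(new FakeAuthorizationException()))); // USER
    System.out.println(classify(new FakeStreamsException(new IllegalStateException())));      // UNKNOWN
    System.out.println(classify(new IllegalStateException()));                                // UNKNOWN
  }
}
```

A StreamsException with a non-authorization cause stays UNKNOWN, which matches the intent of the PR: only missing-ACL failures are reclassified as user errors.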
@@ -34,6 +34,8 @@
import static org.apache.kafka.common.resource.ResourceType.CLUSTER;
import static org.apache.kafka.common.resource.ResourceType.GROUP;
import static org.apache.kafka.common.resource.ResourceType.TOPIC;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

@@ -44,6 +46,8 @@
import io.confluent.ksql.engine.KsqlEngineTestUtil;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.query.QueryError;
import io.confluent.ksql.query.QueryError.Type;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.id.SequentialQueryIdGenerator;
import io.confluent.ksql.services.DisabledKsqlClient;

@@ -60,6 +64,8 @@
import io.confluent.ksql.util.OrderDataProvider;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@@ -74,8 +80,13 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.StreamsException;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;

@@ -98,6 +109,8 @@ public class SecureIntegrationTest {
private static final Credentials SUPER_USER = VALID_USER1;
private static final Credentials NORMAL_USER = VALID_USER2;
private static final AtomicInteger COUNTER = new AtomicInteger(0);
private static final String SERVICE_ID = "my-service-id_";
private static final String QUERY_ID_PREFIX = "_confluent-ksql-" + SERVICE_ID;

public static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness
    .builder()
@@ -210,37 +223,136 @@ public void shouldRunQueryAgainstKafkaClusterOverSaslSsl() {
@Test
public void shouldWorkWithMinimalPrefixedAcls() {
// Given:
final String serviceId = "my-service-id_"; // Defaults to "default_"
final String prefix = "_confluent-ksql-" + serviceId;

final Map<String, Object> ksqlConfig = getKsqlConfig(NORMAL_USER);
ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, serviceId);
ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, SERVICE_ID);

givenTestSetupWithAclsForQuery();
givenAllowAcl(NORMAL_USER,
    resource(CLUSTER, "kafka-cluster"),
    ops(DESCRIBE_CONFIGS));
    resource(CLUSTER, "kafka-cluster"),
    ops(DESCRIBE_CONFIGS));
givenTestSetupWithConfig(ksqlConfig);

givenAllowAcl(NORMAL_USER,
    resource(TOPIC, INPUT_TOPIC),
    ops(READ));
// Then:
assertCanRunRepartitioningKsqlQuery();
assertCanAccessClusterConfig();
}

givenAllowAcl(NORMAL_USER,
    resource(TOPIC, outputTopic),
    ops(CREATE /* as the topic doesn't exist yet*/, WRITE));
@Test
public void shouldClassifyMissingSourceTopicExceptionAsUserError() {
// Given:
final Map<String, Object> ksqlConfig = getKsqlConfig(NORMAL_USER);
ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, SERVICE_ID);

givenAllowAcl(NORMAL_USER,
    prefixedResource(TOPIC, prefix),
    ops(ALL));
givenTestSetupWithAclsForQuery();
givenTestSetupWithConfig(ksqlConfig);

givenAllowAcl(NORMAL_USER,
    prefixedResource(GROUP, prefix),
    ops(ALL));
// When:
topicClient.deleteTopics(Collections.singleton(INPUT_TOPIC));
assertThatEventually(
    "Wait for async topic deleting",
    () -> topicClient.isTopicExists(outputTopic),
    is(false)
);

// Then:
assertQueryFailsWithUserError(
    String.format(
        "CREATE STREAM %s AS SELECT * FROM %s;",
        outputTopic,
        INPUT_STREAM
    ),
    String.format(
        "%s: One or more source topics were missing during rebalance",
        MissingSourceTopicException.class.getName()
    )
);
}

@Test
public void shouldClassifyTopicAuthorizationExceptionAsUserError() {
// Given:
final Map<String, Object> ksqlConfig = getKsqlConfig(NORMAL_USER);
ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, SERVICE_ID);

givenTestSetupWithAclsForQuery();
givenTestSetupWithConfig(ksqlConfig);

// When:
TEST_HARNESS.getKafkaCluster().addUserAcl(
    NORMAL_USER.username,
    AclPermissionType.DENY,
    resource(TOPIC, INPUT_TOPIC),
    ops(READ)
);

// Then:
assertCanRunRepartitioningKsqlQuery();
assertCanAccessClusterConfig(prefix);
assertQueryFailsWithUserError(
    String.format(
        "CREATE STREAM %s AS SELECT * FROM %s;",
        outputTopic,
        INPUT_STREAM
    ),
    String.format(
        "%s: Not authorized to access topics: [%s]",
        TopicAuthorizationException.class.getName(),
        INPUT_TOPIC
    )
);
}

@Test
public void shouldClassifyGroupAuthorizationExceptionAsUserError() {
// Given:
final Map<String, Object> ksqlConfig = getKsqlConfig(NORMAL_USER);
ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, SERVICE_ID);

givenTestSetupWithAclsForQuery();
givenTestSetupWithConfig(ksqlConfig);

TEST_HARNESS.getKafkaCluster().addUserAcl(
    NORMAL_USER.username,
    AclPermissionType.DENY,
    prefixedResource(GROUP, QUERY_ID_PREFIX),
    ops(ALL)
);

// Then:
assertQueryFailsWithUserError(
    String.format(
        "CREATE STREAM %s AS SELECT * FROM %s;",
        outputTopic,
        INPUT_STREAM
    ),
    String.format(
        "%s: Not authorized to access group: %squery_",
        GroupAuthorizationException.class.getName(),
        QUERY_ID_PREFIX
    ) + "%s"
);
}

@Test
public void shouldClassifyTransactionIdAuthorizationExceptionAsUserError() {
// Given:
final Map<String, Object> ksqlConfig = getKsqlConfig(NORMAL_USER);
ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, SERVICE_ID);
ksqlConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);
Review comment: Why does this cause a transaction ID authorization exception? (Can we add a quick comment so readers know that's the purpose of setting this config in this test?)
givenTestSetupWithAclsForQuery(); // does not authorize TX, but we enabled EOS above
givenTestSetupWithConfig(ksqlConfig);

// Then:
assertQueryFailsWithUserError(
    String.format(
        "CREATE STREAM %s AS SELECT * FROM %s;",
        outputTopic,
        INPUT_STREAM
    ),
    String.format(
        "%s: Error encountered trying to initialize transactions [stream-thread [Time-limited test]]",
        StreamsException.class.getName()
    )
);
}

// Requires correctly configured schema-registry running
@@ -268,6 +380,24 @@ public void shouldRunQueryAgainstSecureSchemaRegistry() {
}
}

private void givenTestSetupWithAclsForQuery() {
givenAllowAcl(NORMAL_USER,
    resource(TOPIC, INPUT_TOPIC),
    ops(READ));

givenAllowAcl(NORMAL_USER,
    resource(TOPIC, outputTopic),
    ops(CREATE /* as the topic doesn't exist yet*/, WRITE));

givenAllowAcl(NORMAL_USER,
    prefixedResource(TOPIC, QUERY_ID_PREFIX),
    ops(ALL));

givenAllowAcl(NORMAL_USER,
    prefixedResource(GROUP, QUERY_ID_PREFIX),
    ops(ALL));
}

private static void givenAllowAcl(final Credentials credentials,
    final ResourcePattern resource,
    final Set<AclOperation> ops) {

@@ -289,21 +419,26 @@ private void givenTestSetupWithConfig(final Map<String, Object> ksqlConfigs) {
}

private void assertCanRunSimpleKsqlQuery() {
assertCanRunKsqlQuery("CREATE STREAM %s AS SELECT * FROM %s;",
    outputTopic, INPUT_STREAM);
assertCanRunKsqlQuery(
    "CREATE STREAM %s AS SELECT * FROM %s;",
    outputTopic,
    INPUT_STREAM
);
}

private void assertCanRunRepartitioningKsqlQuery() {
assertCanRunKsqlQuery("CREATE TABLE %s AS SELECT itemid, count(*) "
    + "FROM %s WINDOW TUMBLING (size 5 second) GROUP BY itemid;",
    outputTopic, INPUT_STREAM);
assertCanRunKsqlQuery(
    "CREATE TABLE %s AS SELECT itemid, count(*) FROM %s GROUP BY itemid;",
    outputTopic,
    INPUT_STREAM
);
}

private void assertCanAccessClusterConfig(final String resourcePrefix) {
private void assertCanAccessClusterConfig() {
// Creating topic with default replicas causes topic client to query cluster config to get
// default replica count:
serviceContext.getTopicClient()
    .createTopic(resourcePrefix + "-foo", 1, TopicProperties.DEFAULT_REPLICAS);
    .createTopic(QUERY_ID_PREFIX + "-foo", 1, TopicProperties.DEFAULT_REPLICAS);
}

private void assertCanRunKsqlQuery(
@@ -321,6 +456,29 @@ private void assertCanRunKsqlQuery(
TEST_HARNESS.verifyAvailableRecords(outputTopic, greaterThan(0));
}

private void assertQueryFailsWithUserError(
    final String query,
    final String errorMsg
) {
final QueryMetadata queryMetadata = KsqlEngineTestUtil
    .execute(serviceContext, ksqlEngine, query, ksqlConfig, Collections.emptyMap()).get(0);

queryMetadata.start();
assertThatEventually(
    "Wait for query to fail",
    () -> queryMetadata.getQueryErrors().size() > 0,
    is(true)
);

for (final QueryError error : queryMetadata.getQueryErrors()) {
  assertThat(error.getType(), is(Type.USER));
  assertThat(
      error.getErrorMessage().split("\n")[0],
      is(String.format(errorMsg, queryMetadata.getQueryId()))
  );
}
}

private static Map<String, Object> getBaseKsqlConfig() {
final Map<String, Object> configs = new HashMap<>(KsqlConfigTestUtil.baseTestConfig());
configs.put(

@@ -382,6 +540,6 @@ private void executePersistentQuery(final String queryString,
    .execute(serviceContext, ksqlEngine, query, ksqlConfig, Collections.emptyMap()).get(0);

queryMetadata.start();
queryId = ((PersistentQueryMetadata) queryMetadata).getQueryId();
queryId = queryMetadata.getQueryId();
}
}
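The tests above wait for asynchronous conditions (topic deletion, query failure) with an assertThatEventually helper from ksqlDB's test utilities. As a rough illustration of the pattern, here is a self-contained polling-assertion sketch; the signature, the explicit timeout parameter, and the 100 ms poll interval are assumptions for this sketch, not the real helper's values:

```java
import java.util.function.Supplier;

public class EventuallySketch {

  // Polls `actual` until it equals `expected` or the timeout expires.
  // Timeout and poll interval are illustrative, not the real ksqlDB values.
  static <T> void assertThatEventually(
      final String message,
      final Supplier<T> actual,
      final T expected,
      final long timeoutMs
  ) throws InterruptedException {
    final long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      if (expected.equals(actual.get())) {
        return;
      }
      Thread.sleep(100);
    }
    // One final check so a true condition is never reported as a timeout.
    if (expected.equals(actual.get())) {
      return;
    }
    throw new AssertionError(message + ": last value was " + actual.get());
  }

  public static void main(final String[] args) throws InterruptedException {
    final long start = System.currentTimeMillis();
    // The condition flips to true after ~300 ms, mimicking async topic deletion.
    assertThatEventually(
        "Wait for async topic deleting",
        () -> System.currentTimeMillis() - start > 300,
        true,
        5_000);
  }
}
```

Polling like this trades a bounded wait for determinism: the test fails with a clear message after the timeout instead of hanging forever on a condition that never becomes true.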
Review comment:
Unrelated to this PR but I'm curious: what happens if an internal (changelog or repartition) topic is missing, or if a sink topic is missing? Does Streams throw a different type of exception in these cases?
Review comment:
Atm, for changelog and repartition topics, they would be created. We would fail if the expected config does not match. (This will "change" with https://cwiki.apache.org/confluence/display/KAFKA/KIP-698%3A+Add+Explicit+User+Initialization+of+Broker-side+State+to+Kafka+Streams) -- Of course, it's only verified in a rebalance, but checking the source topic is also only done during a rebalance.
For output topics, Kafka Streams won't do anything, and thus the producer would fill up its write buffer and eventually block. So eventually Streams would crash.
But as ksqlDB checks for output topics, it should be ok?
Review comment:
This is true for internal topics too, right? It seems like right now, for internal topics that are deleted, we'd block for max.block.ms and then throw an error classified as SYSTEM, then retry and recreate. After KIP-698 we'd block and then throw an error we will classify as USER on every retry.
Where do we check?
Review comment:
Yes. Not sure how the error would be classified atm or after KIP-698. But it might be out-of-scope for this PR. I'd like to focus on authorization errors for now.
Doesn't ksqlDB create output topics explicitly if they don't exist?