From 6747d5cd1263facec4ee947f764b24ebe1ae9873 Mon Sep 17 00:00:00 2001
From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com>
Date: Fri, 13 Sep 2019 15:17:24 +0100
Subject: [PATCH] fix: switch AdminClient to be sandbox proxy (#3351)
Now that Kafka has an `Admin` interface we can switch our `SandboxedAdminClient` from extending the abstract `AdminClient` to being a proxy. This decouples us from most upstream changes to the API.
---
.../ksql/services/SandboxedAdminClient.java | 305 +-----------------
.../SandboxedKafkaClientSupplier.java | 2 +-
.../services/SandboxedAdminClientTest.java | 13 +-
.../SandboxedKafkaClientSupplierTest.java | 9 +-
4 files changed, 22 insertions(+), 307 deletions(-)
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxedAdminClient.java b/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxedAdminClient.java
index abf9d91d9935..3e1f4c853c5b 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxedAdminClient.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxedAdminClient.java
@@ -15,78 +15,10 @@
package io.confluent.ksql.services;
-import java.time.Duration;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.AlterConfigOp;
-import org.apache.kafka.clients.admin.AlterConfigsOptions;
-import org.apache.kafka.clients.admin.AlterConfigsResult;
-import org.apache.kafka.clients.admin.AlterPartitionReassignmentsOptions;
-import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
-import org.apache.kafka.clients.admin.AlterReplicaLogDirsOptions;
-import org.apache.kafka.clients.admin.AlterReplicaLogDirsResult;
-import org.apache.kafka.clients.admin.Config;
-import org.apache.kafka.clients.admin.CreateAclsOptions;
-import org.apache.kafka.clients.admin.CreateAclsResult;
-import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
-import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
-import org.apache.kafka.clients.admin.CreatePartitionsOptions;
-import org.apache.kafka.clients.admin.CreatePartitionsResult;
-import org.apache.kafka.clients.admin.CreateTopicsOptions;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.DeleteAclsOptions;
-import org.apache.kafka.clients.admin.DeleteAclsResult;
-import org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions;
-import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
-import org.apache.kafka.clients.admin.DeleteRecordsOptions;
-import org.apache.kafka.clients.admin.DeleteRecordsResult;
-import org.apache.kafka.clients.admin.DeleteTopicsOptions;
-import org.apache.kafka.clients.admin.DeleteTopicsResult;
-import org.apache.kafka.clients.admin.DescribeAclsOptions;
-import org.apache.kafka.clients.admin.DescribeAclsResult;
-import org.apache.kafka.clients.admin.DescribeClusterOptions;
-import org.apache.kafka.clients.admin.DescribeClusterResult;
-import org.apache.kafka.clients.admin.DescribeConfigsOptions;
-import org.apache.kafka.clients.admin.DescribeConfigsResult;
-import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
-import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
-import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
-import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
-import org.apache.kafka.clients.admin.DescribeLogDirsOptions;
-import org.apache.kafka.clients.admin.DescribeLogDirsResult;
-import org.apache.kafka.clients.admin.DescribeReplicaLogDirsOptions;
-import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
-import org.apache.kafka.clients.admin.DescribeTopicsOptions;
-import org.apache.kafka.clients.admin.DescribeTopicsResult;
-import org.apache.kafka.clients.admin.ElectLeadersOptions;
-import org.apache.kafka.clients.admin.ElectLeadersResult;
-import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
-import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
-import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
-import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
-import org.apache.kafka.clients.admin.ListPartitionReassignmentsOptions;
-import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult;
-import org.apache.kafka.clients.admin.ListTopicsOptions;
-import org.apache.kafka.clients.admin.ListTopicsResult;
-import org.apache.kafka.clients.admin.NewPartitionReassignment;
-import org.apache.kafka.clients.admin.NewPartitions;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.admin.RecordsToDelete;
-import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
-import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
-import org.apache.kafka.common.ElectionType;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.TopicPartitionReplica;
-import org.apache.kafka.common.acl.AclBinding;
-import org.apache.kafka.common.acl.AclBindingFilter;
-import org.apache.kafka.common.config.ConfigResource;
+import static io.confluent.ksql.util.LimitedProxyBuilder.anyParams;
+
+import io.confluent.ksql.util.LimitedProxyBuilder;
+import org.apache.kafka.clients.admin.Admin;
/**
* An admin client to use while trying out operations.
@@ -96,231 +28,14 @@
*
Most operations result in a {@code UnsupportedOperationException} being thrown as they are
* not called.
*/
-class SandboxedAdminClient extends AdminClient {
-
- SandboxedAdminClient() {
- }
-
- @Override
- public void close(final Duration duration) {
- // No op
- }
-
- @Override
- public CreateTopicsResult createTopics(
- final Collection newTopics,
- final CreateTopicsOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DeleteTopicsResult deleteTopics(
- final Collection topics,
- final DeleteTopicsOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ListTopicsResult listTopics(final ListTopicsOptions options) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DescribeTopicsResult describeTopics(
- final Collection topics,
- final DescribeTopicsOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DescribeClusterResult describeCluster(final DescribeClusterOptions options) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DescribeAclsResult describeAcls(
- final AclBindingFilter filter,
- final DescribeAclsOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public CreateAclsResult createAcls(
- final Collection acls,
- final CreateAclsOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DeleteAclsResult deleteAcls(
- final Collection acls,
- final DeleteAclsOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DescribeConfigsResult describeConfigs(
- final Collection configs,
- final DescribeConfigsOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public AlterConfigsResult incrementalAlterConfigs(
- final Map> configs,
- final AlterConfigsOptions options) {
- throw new UnsupportedOperationException();
- }
-
- @SuppressWarnings({"deprecation", "RedundantSuppression"})
- @Override
- public AlterConfigsResult alterConfigs(
- final Map configs,
- final AlterConfigsOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public AlterReplicaLogDirsResult alterReplicaLogDirs(
- final Map replicaAssignment,
- final AlterReplicaLogDirsOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DescribeLogDirsResult describeLogDirs(
- final Collection brokers,
- final DescribeLogDirsOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DescribeReplicaLogDirsResult describeReplicaLogDirs(
- final Collection replicas,
- final DescribeReplicaLogDirsOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public CreatePartitionsResult createPartitions(
- final Map newPartitions,
- final CreatePartitionsOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DeleteRecordsResult deleteRecords(
- final Map recordsToDelete,
- final DeleteRecordsOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public CreateDelegationTokenResult createDelegationToken(
- final CreateDelegationTokenOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public RenewDelegationTokenResult renewDelegationToken(
- final byte[] hmac,
- final RenewDelegationTokenOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ExpireDelegationTokenResult expireDelegationToken(
- final byte[] hmac,
- final ExpireDelegationTokenOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DescribeDelegationTokenResult describeDelegationToken(
- final DescribeDelegationTokenOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DescribeConsumerGroupsResult describeConsumerGroups(
- final Collection groupIds,
- final DescribeConsumerGroupsOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ListConsumerGroupsResult listConsumerGroups(final ListConsumerGroupsOptions options) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(
- final String groupId,
- final ListConsumerGroupOffsetsOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DeleteConsumerGroupsResult deleteConsumerGroups(
- final Collection groupIds,
- final DeleteConsumerGroupsOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public org.apache.kafka.clients.admin.ElectPreferredLeadersResult electPreferredLeaders(
- final Collection partitions,
- final org.apache.kafka.clients.admin.ElectPreferredLeadersOptions options
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ElectLeadersResult electLeaders(
- final ElectionType electionType,
- final Set set,
- final ElectLeadersOptions electLeadersOptions
- ) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public AlterPartitionReassignmentsResult alterPartitionReassignments(
- final Map> reassignments,
- final AlterPartitionReassignmentsOptions options) {
- throw new UnsupportedOperationException();
- }
+final class SandboxedAdminClient {
- @Override
- public ListPartitionReassignmentsResult listPartitionReassignments(
- final Optional> partitions,
- final ListPartitionReassignmentsOptions options) {
- throw new UnsupportedOperationException();
+ static Admin createProxy() {
+ return LimitedProxyBuilder.forClass(Admin.class)
+ .swallow("close", anyParams())
+ .build();
}
- @Override
- public Map metrics() {
- throw new UnsupportedOperationException();
+ private SandboxedAdminClient() {
}
}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaClientSupplier.java b/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaClientSupplier.java
index 8fc079119cf7..4f0a4b4e0728 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaClientSupplier.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaClientSupplier.java
@@ -36,7 +36,7 @@ class SandboxedKafkaClientSupplier implements KafkaClientSupplier {
@Override
public Admin getAdmin(final Map config) {
- return new SandboxedAdminClient();
+ return SandboxedAdminClient.createProxy();
}
@Override
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedAdminClientTest.java b/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedAdminClientTest.java
index 151118f1fcd3..35ec457a9133 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedAdminClientTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedAdminClientTest.java
@@ -23,7 +23,6 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ElectLeadersOptions;
import org.junit.Before;
import org.junit.Test;
@@ -51,16 +50,16 @@ public static Collection> getMethodsToTest() {
.build();
}
- private final TestCase testCase;
- private AdminClient sandboxedAdminClient;
+ private final TestCase testCase;
+ private Admin sandboxedAdminClient;
- public UnsupportedMethods(final TestCase testCase) {
+ public UnsupportedMethods(final TestCase testCase) {
this.testCase = Objects.requireNonNull(testCase, "testCase");
}
@Before
public void setUp() {
- sandboxedAdminClient = new SandboxedAdminClient();
+ sandboxedAdminClient = SandboxedAdminClient.createProxy();
}
@Test(expected = UnsupportedOperationException.class)
@@ -71,11 +70,11 @@ public void shouldThrowOnUnsupportedOperation() throws Throwable {
public static class SupportedMethods {
- private AdminClient sandboxedAdminClient;
+ private Admin sandboxedAdminClient;
@Before
public void setUp() {
- sandboxedAdminClient = new SandboxedAdminClient();
+ sandboxedAdminClient = SandboxedAdminClient.createProxy();
}
@SuppressWarnings("deprecation")
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedKafkaClientSupplierTest.java b/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedKafkaClientSupplierTest.java
index c9b53aa0a1ee..80d68ae141e3 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedKafkaClientSupplierTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedKafkaClientSupplierTest.java
@@ -16,7 +16,6 @@
package io.confluent.ksql.services;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import com.google.common.collect.ImmutableMap;
@@ -26,6 +25,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
+import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.junit.Before;
@@ -82,9 +82,10 @@ public void setUp() {
}
@Test
- public void shouldReturnTryAdminClient() {
- assertThat(sandboxedKafkaClientSupplier.getAdmin(config),
- is(instanceOf(SandboxedAdminClient.class)));
+ public void shouldReturnSandboxAdminClient() {
+ final Admin admin = sandboxedKafkaClientSupplier.getAdmin(config);
+
+ assertThat(Proxy.isProxyClass(admin.getClass()), is(true));
}
@Test