Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: switch AdminClient to be sandbox proxy #3351

Merged
merged 1 commit into from
Sep 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,78 +15,10 @@

package io.confluent.ksql.services;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsOptions;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.AlterReplicaLogDirsOptions;
import org.apache.kafka.clients.admin.AlterReplicaLogDirsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.CreateAclsOptions;
import org.apache.kafka.clients.admin.CreateAclsResult;
import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
import org.apache.kafka.clients.admin.CreatePartitionsOptions;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteAclsOptions;
import org.apache.kafka.clients.admin.DeleteAclsResult;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
import org.apache.kafka.clients.admin.DeleteRecordsOptions;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.DeleteTopicsOptions;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeAclsOptions;
import org.apache.kafka.clients.admin.DescribeAclsResult;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
import org.apache.kafka.clients.admin.DescribeLogDirsOptions;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsOptions;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ElectLeadersOptions;
import org.apache.kafka.clients.admin.ElectLeadersResult;
import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.admin.ListPartitionReassignmentsOptions;
import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
import static io.confluent.ksql.util.LimitedProxyBuilder.anyParams;

import io.confluent.ksql.util.LimitedProxyBuilder;
import org.apache.kafka.clients.admin.Admin;

/**
* An admin client to use while trying out operations.
Expand All @@ -96,231 +28,14 @@
* <p>Most operations result in a {@code UnsupportedOperationException} being thrown as they are
* not called.
*/
class SandboxedAdminClient extends AdminClient {

SandboxedAdminClient() {
}

@Override
public void close(final Duration duration) {
// No op
}

@Override
public CreateTopicsResult createTopics(
final Collection<NewTopic> newTopics,
final CreateTopicsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DeleteTopicsResult deleteTopics(
final Collection<String> topics,
final DeleteTopicsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public ListTopicsResult listTopics(final ListTopicsOptions options) {
throw new UnsupportedOperationException();
}

@Override
public DescribeTopicsResult describeTopics(
final Collection<String> topics,
final DescribeTopicsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DescribeClusterResult describeCluster(final DescribeClusterOptions options) {
throw new UnsupportedOperationException();
}

@Override
public DescribeAclsResult describeAcls(
final AclBindingFilter filter,
final DescribeAclsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public CreateAclsResult createAcls(
final Collection<AclBinding> acls,
final CreateAclsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DeleteAclsResult deleteAcls(
final Collection<AclBindingFilter> acls,
final DeleteAclsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DescribeConfigsResult describeConfigs(
final Collection<ConfigResource> configs,
final DescribeConfigsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public AlterConfigsResult incrementalAlterConfigs(
final Map<ConfigResource, Collection<AlterConfigOp>> configs,
final AlterConfigsOptions options) {
throw new UnsupportedOperationException();
}

@SuppressWarnings({"deprecation", "RedundantSuppression"})
@Override
public AlterConfigsResult alterConfigs(
final Map<ConfigResource, Config> configs,
final AlterConfigsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public AlterReplicaLogDirsResult alterReplicaLogDirs(
final Map<TopicPartitionReplica, String> replicaAssignment,
final AlterReplicaLogDirsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DescribeLogDirsResult describeLogDirs(
final Collection<Integer> brokers,
final DescribeLogDirsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DescribeReplicaLogDirsResult describeReplicaLogDirs(
final Collection<TopicPartitionReplica> replicas,
final DescribeReplicaLogDirsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public CreatePartitionsResult createPartitions(
final Map<String, NewPartitions> newPartitions,
final CreatePartitionsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DeleteRecordsResult deleteRecords(
final Map<TopicPartition, RecordsToDelete> recordsToDelete,
final DeleteRecordsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public CreateDelegationTokenResult createDelegationToken(
final CreateDelegationTokenOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public RenewDelegationTokenResult renewDelegationToken(
final byte[] hmac,
final RenewDelegationTokenOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public ExpireDelegationTokenResult expireDelegationToken(
final byte[] hmac,
final ExpireDelegationTokenOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DescribeDelegationTokenResult describeDelegationToken(
final DescribeDelegationTokenOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DescribeConsumerGroupsResult describeConsumerGroups(
final Collection<String> groupIds,
final DescribeConsumerGroupsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public ListConsumerGroupsResult listConsumerGroups(final ListConsumerGroupsOptions options) {
throw new UnsupportedOperationException();
}

@Override
public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(
final String groupId,
final ListConsumerGroupOffsetsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DeleteConsumerGroupsResult deleteConsumerGroups(
final Collection<String> groupIds,
final DeleteConsumerGroupsOptions options
) {
throw new UnsupportedOperationException();
}

@SuppressWarnings("deprecation")
@Override
public org.apache.kafka.clients.admin.ElectPreferredLeadersResult electPreferredLeaders(
final Collection<TopicPartition> partitions,
final org.apache.kafka.clients.admin.ElectPreferredLeadersOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public ElectLeadersResult electLeaders(
final ElectionType electionType,
final Set<TopicPartition> set,
final ElectLeadersOptions electLeadersOptions
) {
throw new UnsupportedOperationException();
}

@Override
public AlterPartitionReassignmentsResult alterPartitionReassignments(
final Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
final AlterPartitionReassignmentsOptions options) {
throw new UnsupportedOperationException();
}
final class SandboxedAdminClient {

@Override
public ListPartitionReassignmentsResult listPartitionReassignments(
final Optional<Set<TopicPartition>> partitions,
final ListPartitionReassignmentsOptions options) {
throw new UnsupportedOperationException();
static Admin createProxy() {
return LimitedProxyBuilder.forClass(Admin.class)
.swallow("close", anyParams())
.build();
}

@Override
public Map<MetricName, ? extends Metric> metrics() {
throw new UnsupportedOperationException();
private SandboxedAdminClient() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class SandboxedKafkaClientSupplier implements KafkaClientSupplier {

@Override
public Admin getAdmin(final Map<String, Object> config) {
return new SandboxedAdminClient();
return SandboxedAdminClient.createProxy();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ElectLeadersOptions;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -51,16 +50,16 @@ public static Collection<TestCase<Admin>> getMethodsToTest() {
.build();
}

private final TestCase<AdminClient> testCase;
private AdminClient sandboxedAdminClient;
private final TestCase<Admin> testCase;
private Admin sandboxedAdminClient;

public UnsupportedMethods(final TestCase<AdminClient> testCase) {
public UnsupportedMethods(final TestCase<Admin> testCase) {
this.testCase = Objects.requireNonNull(testCase, "testCase");
}

@Before
public void setUp() {
sandboxedAdminClient = new SandboxedAdminClient();
sandboxedAdminClient = SandboxedAdminClient.createProxy();
}

@Test(expected = UnsupportedOperationException.class)
Expand All @@ -71,11 +70,11 @@ public void shouldThrowOnUnsupportedOperation() throws Throwable {

public static class SupportedMethods {

private AdminClient sandboxedAdminClient;
private Admin sandboxedAdminClient;

@Before
public void setUp() {
sandboxedAdminClient = new SandboxedAdminClient();
sandboxedAdminClient = SandboxedAdminClient.createProxy();
}

@SuppressWarnings("deprecation")
Expand Down
Loading