Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client): support admin operations in Java client #5671

Merged
merged 11 commits into from
Jun 24, 2020
Prev Previous commit
Next Next commit
feat: support list topics
  • Loading branch information
vcrfxia committed Jun 23, 2020

Verified

This commit was signed with the committer’s verified signature.
Ana06 Ana María Martínez Gómez
commit d6ee218dd8ddad625a9a73598dddb65081ddcfc2
Original file line number Diff line number Diff line change
@@ -106,6 +106,8 @@ public interface Client {

CompletableFuture<List<TableInfo>> listTables();

CompletableFuture<List<TopicInfo>> listTopics();

/**
* Closes the underlying HTTP client.
*/
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client;

import java.util.List;

public interface TopicInfo {

String getName();

int getPartitions();

List<Integer> getReplicasPerPartition();

}
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@
import io.confluent.ksql.api.client.StreamInfo;
import io.confluent.ksql.api.client.StreamedQueryResult;
import io.confluent.ksql.api.client.TableInfo;
import io.confluent.ksql.api.client.TopicInfo;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
@@ -182,6 +183,20 @@ public CompletableFuture<List<TableInfo>> listTables() {
return cf;
}

@Override
public CompletableFuture<List<TopicInfo>> listTopics() {
final CompletableFuture<List<TopicInfo>> cf = new CompletableFuture<>();

makeRequest(
"/ksql",
new JsonObject().put("ksql", "list topics;"),
cf,
response -> handleSingleEntityResponse(response, cf, ClientImpl::handleListTopicsResponse)
);

return cf;
}

@Override
public void close() {
httpClient.close();
@@ -313,6 +328,32 @@ private static void handleListTablesResponse(
}
}

@SuppressWarnings("unchecked")
private static void handleListTopicsResponse(
final JsonObject kafkaTopicsListEntity,
final CompletableFuture<List<TopicInfo>> cf
) {
try {
final JsonArray topics = kafkaTopicsListEntity.getJsonArray("topics");
cf.complete(topics.stream()
.map(o -> (JsonObject) o)
.map(o -> {
final List<Integer> replicaInfo = o.getJsonArray("replicaInfo").stream()
.map(v -> (Integer)v)
.collect(Collectors.toList());
return new TopicInfoImpl(
o.getString("name"),
replicaInfo.size(),
replicaInfo);
})
.collect(Collectors.toList())
);
} catch (Exception e) {
cf.completeExceptionally(new IllegalStateException(
"Unexpected server response format. Response: " + kafkaTopicsListEntity));
}
}

private static <T> void handleSingleEntityResponse(
final HttpClientResponse response,
final CompletableFuture<T> cf,
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@
package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.StreamInfo;
import java.util.Objects;

public class StreamInfoImpl implements StreamInfo {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally worth having equals/hashcode/tostring on externally exposed classes so that people can leverage them if they want to

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make sure I understand correctly: you're suggesting we add equals/hashcode/tostring methods to each of the implementation classes, not the interfaces, right? This makes sense to me, though I'm not sure whether it makes more sense for the toString method to say StreamInfoImpl{...} or StreamInfo{...} since the former is an implementation detail, but the latter is a bit misleading.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, good point - I'm satisfied with StreamInfo because I dont' think we expect more than just this impl

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I'm going to merge this PR and then open a follow-up to add equals/hashcode/tostring to all the client interfaces, since none of the new interfaces have them (not just the ones in this PR).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's the follow-up: #5681


@@ -24,9 +25,9 @@ public class StreamInfoImpl implements StreamInfo {
private final String format;

StreamInfoImpl(final String name, final String topicName, final String format) {
this.name = name;
this.topicName = topicName;
this.format = format;
this.name = Objects.requireNonNull(name);
this.topicName = Objects.requireNonNull(topicName);
this.format = Objects.requireNonNull(format);
}

@Override
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@
package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.TableInfo;
import java.util.Objects;

public class TableInfoImpl implements TableInfo {

@@ -30,9 +31,9 @@ public class TableInfoImpl implements TableInfo {
final String format,
final boolean isWindowed
) {
this.name = name;
this.topicName = topicName;
this.format = format;
this.name = Objects.requireNonNull(name);
this.topicName = Objects.requireNonNull(topicName);
this.format = Objects.requireNonNull(format);
this.isWindowed = isWindowed;
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.TopicInfo;
import java.util.List;
import java.util.Objects;

public class TopicInfoImpl implements TopicInfo {

private final String name;
private final int partitions;
private final List<Integer> replicasPerPartition;

TopicInfoImpl(final String name, final int partitions, final List<Integer> replicasPerPartition) {
this.name = Objects.requireNonNull(name);
this.partitions = partitions;
this.replicasPerPartition = Objects.requireNonNull(replicasPerPartition);
}

@Override
public String getName() {
return name;
}

@Override
public int getPartitions() {
return partitions;
}

@Override
public List<Integer> getReplicasPerPartition() {
return replicasPerPartition;
}
}
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThrows;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.api.BaseApiTest;
import io.confluent.ksql.api.TestQueryPublisher;
import io.confluent.ksql.api.client.impl.StreamedQueryResultImpl;
@@ -38,6 +39,8 @@
import io.confluent.ksql.api.client.util.RowUtil;
import io.confluent.ksql.api.server.KsqlApiException;
import io.confluent.ksql.parser.exception.ParseFailedException;
import io.confluent.ksql.rest.entity.KafkaTopicInfo;
import io.confluent.ksql.rest.entity.KafkaTopicsList;
import io.confluent.ksql.rest.entity.PushQueryId;
import io.confluent.ksql.rest.entity.SourceInfo;
import io.confluent.ksql.rest.entity.StreamsList;
@@ -615,6 +618,28 @@ public void shouldListTables() throws Exception {
assertThat(tables.get(1).isWindowed(), is(false));
}

@Test
public void shouldListTopics() throws Exception {
// Given
final List<KafkaTopicInfo> expectedTopics = new ArrayList<>();
expectedTopics.add(new KafkaTopicInfo("topic1", ImmutableList.of(2, 2, 2)));
expectedTopics.add(new KafkaTopicInfo("topic2", ImmutableList.of(1, 1)));
final KafkaTopicsList entity = new KafkaTopicsList("list topics;", expectedTopics);
testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));

// When
final List<TopicInfo> topics = javaClient.listTopics().get();

// Then
assertThat(topics, hasSize(expectedTopics.size()));
assertThat(topics.get(0).getName(), is("topic1"));
assertThat(topics.get(0).getPartitions(), is(3));
assertThat(topics.get(0).getReplicasPerPartition(), is(ImmutableList.of(2, 2, 2)));
assertThat(topics.get(1).getName(), is("topic2"));
assertThat(topics.get(1).getPartitions(), is(2));
assertThat(topics.get(1).getReplicasPerPartition(), is(ImmutableList.of(1, 1)));
}

protected Client createJavaClient() {
return Client.create(createJavaClientOptions(), vertx);
}
Original file line number Diff line number Diff line change
@@ -47,6 +47,7 @@
import io.confluent.ksql.api.client.StreamInfo;
import io.confluent.ksql.api.client.StreamedQueryResult;
import io.confluent.ksql.api.client.TableInfo;
import io.confluent.ksql.api.client.TopicInfo;
import io.confluent.ksql.api.client.util.ClientTestUtil.TestSubscriber;
import io.confluent.ksql.api.client.util.RowUtil;
import io.confluent.ksql.engine.KsqlEngine;
@@ -605,7 +606,6 @@ public void shouldListStreams() throws Exception {
));
}

@SuppressWarnings("unchecked")
@Test
public void shouldListTables() throws Exception {
// When
@@ -615,6 +615,20 @@ public void shouldListTables() throws Exception {
assertThat(tables, contains(tableInfo(AGG_TABLE, AGG_TABLE, "JSON", false)));
}

@SuppressWarnings("unchecked")
@Test
public void shouldListTopics() throws Exception {
// When
final List<TopicInfo> topics = client.listTopics().get();

// Then
assertThat(topics, containsInAnyOrder(
topicInfo(TEST_TOPIC),
topicInfo(EMPTY_TEST_TOPIC),
topicInfo(AGG_TABLE)
));
}

private Client createClient() {
final ClientOptions clientOptions = ClientOptions.create()
.setHost("localhost")
@@ -865,4 +879,31 @@ public void describeTo(final Description description) {
}
};
}

// validates topics have 1 partition and 1 replica
private static Matcher<? super TopicInfo> topicInfo(final String name) {
return new TypeSafeDiagnosingMatcher<TopicInfo>() {
@Override
protected boolean matchesSafely(
final TopicInfo actual,
final Description mismatchDescription) {
if (!name.equals(actual.getName())) {
return false;
}
if (actual.getPartitions() != 1) {
return false;
}
final List<Integer> replicasPerPartition = actual.getReplicasPerPartition();
if (replicasPerPartition.size() != 1 || replicasPerPartition.get(0) != 1) {
return false;
}
return true;
}

@Override
public void describeTo(final Description description) {
description.appendText("name: " + name);
}
};
}
}