Skip to content

Commit

Permalink
feat(client): support list queries in Java client (#5682)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Jun 25, 2020
1 parent 97f7f3b commit 4d860f8
Show file tree
Hide file tree
Showing 8 changed files with 398 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,16 @@ public interface Client {
*/
CompletableFuture<List<TopicInfo>> listTopics();

/**
* Returns the list of queries currently running on the ksqlDB server.
*
* <p>If a non-200 response is received from the server, the {@code CompletableFuture} will be
* failed.
*
* @return list of queries
*/
CompletableFuture<List<QueryInfo>> listQueries();

/**
* Closes the underlying HTTP client.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client;

import java.util.Optional;

/**
* Metadata for a ksqlDB query.
*/
public interface QueryInfo {

enum QueryType {
PERSISTENT,
PUSH
}

/**
* @return the type of this query
*/
QueryType getQueryType();

/**
* Returns the ID of this query, used for control operations such as terminating the query.
*
* @return the ID of this query
*/
String getId();

/**
* Returns the KSQL statement text corresponding to this query. This text may not be exactly the
* statement submitted in order to start the query, but submitting this statement will result
* in exactly this query.
*
* @return the KSQL statement text
*/
String getSql();

/**
* Returns the name of the sink ksqlDB stream or table that this query writes to, if this query is
* persistent. If this query is a push query, then the returned optional will be empty.
*
* @return the sink ksqlDB stream or table name, if applicable
*/
Optional<String> getSink();

/**
* Returns the name of the Kafka topic that backs the sink ksqlDB stream or table that this query
* writes to, if this query is persistent. If this query is a push query, then the returned
* optional will be empty.
*
* @return the sink Kafka topic name, if applicable
*/
Optional<String> getSinkTopic();

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@

package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.QueryInfo;
import io.confluent.ksql.api.client.QueryInfo.QueryType;
import io.confluent.ksql.api.client.StreamInfo;
import io.confluent.ksql.api.client.TableInfo;
import io.confluent.ksql.api.client.TopicInfo;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -95,4 +98,50 @@ static void handleListTopicsResponse(
}
}

static void handleListQueriesResponse(
final JsonObject queriesEntity,
final CompletableFuture<List<QueryInfo>> cf
) {
try {
final JsonArray queries = queriesEntity.getJsonArray("queries");
cf.complete(queries.stream()
.map(o -> (JsonObject) o)
.map(o -> {
final QueryType queryType = QueryType.valueOf(o.getString("queryType"));
final JsonArray sinks = o.getJsonArray("sinks");
final JsonArray sinkTopics = o.getJsonArray("sinkKafkaTopics");

final Optional<String> sinkName;
final Optional<String> sinkTopicName;
if (queryType == QueryType.PERSISTENT) {
if (sinks.size() != 1 || sinkTopics.size() != 1) {
throw new IllegalStateException("Persistent queries must have exactly one sink.");
}
sinkName = Optional.of(sinks.getString(0));
sinkTopicName = Optional.of(sinkTopics.getString(0));
} else if (queryType == QueryType.PUSH) {
if (sinks.size() != 0 || sinkTopics.size() != 0) {
throw new IllegalStateException("Push queries must have no sinks.");
}
sinkName = Optional.empty();
sinkTopicName = Optional.empty();
} else {
throw new IllegalStateException("Unexpected query type.");
}

return new QueryInfoImpl(
queryType,
o.getString("id"),
o.getString("queryString"),
sinkName,
sinkTopicName);
})
.collect(Collectors.toList())
);
} catch (Exception e) {
cf.completeExceptionally(new IllegalStateException(
"Unexpected server response format. Response: " + queriesEntity));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.QueryInfo;
import io.confluent.ksql.api.client.StreamInfo;
import io.confluent.ksql.api.client.StreamedQueryResult;
import io.confluent.ksql.api.client.TableInfo;
Expand Down Expand Up @@ -204,6 +205,21 @@ public CompletableFuture<List<TopicInfo>> listTopics() {
return cf;
}

@Override
public CompletableFuture<List<QueryInfo>> listQueries() {
final CompletableFuture<List<QueryInfo>> cf = new CompletableFuture<>();

makeRequest(
KSQL_ENDPOINT,
new JsonObject().put("ksql", "list queries;"),
cf,
response -> handleSingleEntityResponse(
response, cf, AdminResponseHandlers::handleListQueriesResponse)
);

return cf;
}

@Override
public void close() {
httpClient.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.QueryInfo;
import java.util.Objects;
import java.util.Optional;

public class QueryInfoImpl implements QueryInfo {

private final QueryType queryType;
private final String id;
private final String sql;
private final Optional<String> sinkName;
private final Optional<String> sinkTopicName;

QueryInfoImpl(
final QueryType queryType,
final String id,
final String sql,
final Optional<String> sinkName,
final Optional<String> sinkTopicName
) {
this.queryType = Objects.requireNonNull(queryType);
this.id = Objects.requireNonNull(id);
this.sql = Objects.requireNonNull(sql);
this.sinkName = Objects.requireNonNull(sinkName);
this.sinkTopicName = Objects.requireNonNull(sinkTopicName);
}

@Override
public QueryType getQueryType() {
return queryType;
}

@Override
public String getId() {
return id;
}

@Override
public String getSql() {
return sql;
}

@Override
public Optional<String> getSink() {
return sinkName;
}

@Override
public Optional<String> getSinkTopic() {
return sinkTopicName;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final QueryInfoImpl queryInfo = (QueryInfoImpl) o;
return queryType == queryInfo.queryType
&& id.equals(queryInfo.id)
&& sql.equals(queryInfo.sql)
&& sinkName.equals(queryInfo.sinkName)
&& sinkTopicName.equals(queryInfo.sinkTopicName);
}

@Override
public int hashCode() {
return Objects.hash(queryType, id, sql, sinkName, sinkTopicName);
}

@Override
public String toString() {
return "QueryInfo{"
+ "queryType=" + queryType
+ ", id='" + id + '\''
+ ", sql='" + sql + '\''
+ ", sinkName=" + sinkName
+ ", sinkTopicName=" + sinkTopicName
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@
import static org.junit.Assert.assertThrows;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.api.BaseApiTest;
import io.confluent.ksql.api.TestQueryPublisher;
import io.confluent.ksql.api.client.QueryInfo.QueryType;
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.confluent.ksql.api.client.exception.KsqlException;
import io.confluent.ksql.api.client.impl.StreamedQueryResultImpl;
Expand All @@ -42,12 +45,18 @@
import io.confluent.ksql.api.server.KsqlApiException;
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.parser.exception.ParseFailedException;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.entity.KafkaTopicInfo;
import io.confluent.ksql.rest.entity.KafkaTopicsList;
import io.confluent.ksql.rest.entity.PushQueryId;
import io.confluent.ksql.rest.entity.Queries;
import io.confluent.ksql.rest.entity.QueryStatusCount;
import io.confluent.ksql.rest.entity.RunningQuery;
import io.confluent.ksql.rest.entity.SourceInfo;
import io.confluent.ksql.rest.entity.StreamsList;
import io.confluent.ksql.rest.entity.TablesList;
import io.confluent.ksql.util.KsqlConstants.KsqlQueryStatus;
import io.confluent.ksql.util.KsqlConstants.KsqlQueryType;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClient;
Expand All @@ -57,6 +66,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -662,6 +672,44 @@ public void shouldHandleErrorFromListTopics() {
assertThat(e.getCause().getMessage(), containsString("Failed to retrieve Kafka Topic names"));
}

@Test
public void shouldListQueries() throws Exception {
// Given
final List<RunningQuery> expectedQueries = new ArrayList<>();
expectedQueries.add(new RunningQuery(
"sql1",
ImmutableSet.of("sink"),
ImmutableSet.of("sink_topic"),
new QueryId("a_persistent_query"),
new QueryStatusCount(ImmutableMap.of(KsqlQueryStatus.RUNNING, 1)),
KsqlQueryType.PERSISTENT));
expectedQueries.add(new RunningQuery(
"sql2",
Collections.emptySet(),
Collections.emptySet(),
new QueryId("a_push_query"),
new QueryStatusCount(),
KsqlQueryType.PUSH));
final Queries entity = new Queries("list queries;", expectedQueries);
testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));

// When
final List<QueryInfo> queries = javaClient.listQueries().get();

// Then
assertThat(queries, hasSize(expectedQueries.size()));
assertThat(queries.get(0).getQueryType(), is(QueryType.PERSISTENT));
assertThat(queries.get(0).getId(), is("a_persistent_query"));
assertThat(queries.get(0).getSql(), is("sql1"));
assertThat(queries.get(0).getSink(), is(Optional.of("sink")));
assertThat(queries.get(0).getSinkTopic(), is(Optional.of("sink_topic")));
assertThat(queries.get(1).getQueryType(), is(QueryType.PUSH));
assertThat(queries.get(1).getId(), is("a_push_query"));
assertThat(queries.get(1).getSql(), is("sql2"));
assertThat(queries.get(1).getSink(), is(Optional.empty()));
assertThat(queries.get(1).getSinkTopic(), is(Optional.empty()));
}

protected Client createJavaClient() {
return Client.create(createJavaClientOptions(), vertx);
}
Expand Down
Loading

0 comments on commit 4d860f8

Please sign in to comment.