Skip to content

Commit

Permalink
feat(client): support describe source in Java client (#5944)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Aug 18, 2020
1 parent 2567d92 commit a154373
Show file tree
Hide file tree
Showing 12 changed files with 909 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,17 @@ public interface Client {
*/
CompletableFuture<List<QueryInfo>> listQueries();

/**
* Returns metadata about the ksqlDB stream or table of the provided name.
*
* <p>If a non-200 response is received from the server, the {@code CompletableFuture} will be
* failed.
*
* @param sourceName stream or table name
* @return metadata for stream or table
*/
CompletableFuture<SourceDescription> describeSource(String sourceName);

/**
* Closes the underlying HTTP client.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client;

/**
* Represents a field/column of a ksqlDB stream/table.
*/
public interface FieldInfo {

/**
* @return name of this field
*/
String name();

/**
* @return type of this field
*/
ColumnType type();

/**
* @return whether this field is a key field, rather than a value field
*/
boolean isKey();

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ enum QueryType {
String getId();

/**
* Returns the KSQL statement text corresponding to this query. This text may not be exactly the
* Returns the ksqlDB statement text corresponding to this query. This text may not be exactly the
* statement submitted in order to start the query, but submitting this statement will result
* in exactly this query.
*
* @return the KSQL statement text
* @return the ksqlDB statement text
*/
String getSql();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client;

import java.util.List;
import java.util.Optional;

/**
* Metadata for a ksqlDB stream or table.
*/
public interface SourceDescription {

/**
* @return name of this stream/table
*/
String name();

/**
* @return type of this source, i.e., whether this source is a stream or table
*/
String type();

/**
* @return list of fields (key and value) present in this stream/table
*/
List<FieldInfo> fields();

/**
* @return name of the Kafka topic underlying this ksqlDB stream/table
*/
String topic();

/**
* @return key serialization format of the data in this stream/table
*/
String keyFormat();

/**
* @return value serialization format of the data in this stream/table
*/
String valueFormat();

/**
* @return list of ksqlDB queries currently reading from this stream/table
*/
List<QueryInfo> readQueries();

/**
* @return list of ksqlDB queries currently writing to this stream/table
*/
List<QueryInfo> writeQueries();

/**
* @return name of the column configured as the {@code TIMESTAMP} for this stream/table, if any
*/
Optional<String> timestampColumn();

/**
* Returns the type of the window (e.g., "TUMBLING", "HOPPING", "SESSION") associated with this
* source, if this source is a windowed table. Else, empty.
*
* @return type of the window, if applicable
*/
Optional<String> windowType();

/**
* Returns the ksqlDB statement text used to create this stream/table. This text may not be
* exactly the statement submitted in order to create this stream/table, but submitting this
* statement will result in exactly this stream/table being created.
*
* @return the ksqlDB statement text
*/
String sqlStatement();

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.confluent.ksql.api.client.QueryInfo;
import io.confluent.ksql.api.client.QueryInfo.QueryType;
import io.confluent.ksql.api.client.SourceDescription;
import io.confluent.ksql.api.client.StreamInfo;
import io.confluent.ksql.api.client.TableInfo;
import io.confluent.ksql.api.client.TopicInfo;
Expand Down Expand Up @@ -84,6 +85,19 @@ static void handleListQueriesResponse(
}
}

static void handleDescribeSourceResponse(
final JsonObject sourceDescriptionEntity,
final CompletableFuture<SourceDescription> cf
) {
final Optional<SourceDescription> source = getDescribeSourceResponse(sourceDescriptionEntity);
if (source.isPresent()) {
cf.complete(source.get());
} else {
cf.completeExceptionally(new IllegalStateException(
"Unexpected server response format. Response: " + sourceDescriptionEntity));
}
}

static boolean isListStreamsResponse(final JsonObject ksqlEntity) {
return getListStreamsResponse(ksqlEntity).isPresent();
}
Expand All @@ -101,7 +115,7 @@ static boolean isListQueriesResponse(final JsonObject ksqlEntity) {
}

static boolean isDescribeSourceResponse(final JsonObject ksqlEntity) {
return ksqlEntity.getJsonObject("sourceDescription") != null;
return getDescribeSourceResponse(ksqlEntity).isPresent();
}

static boolean isDescribeOrListFunctionResponse(final JsonObject ksqlEntity) {
Expand Down Expand Up @@ -227,43 +241,82 @@ private static Optional<List<TopicInfo>> getListTopicsResponse(
private static Optional<List<QueryInfo>> getListQueriesResponse(final JsonObject queriesEntity) {
try {
final JsonArray queries = queriesEntity.getJsonArray("queries");
return Optional.of(queries.stream()
.map(o -> (JsonObject) o)
.map(o -> {
final QueryType queryType = QueryType.valueOf(o.getString("queryType"));
final JsonArray sinks = o.getJsonArray("sinks");
final JsonArray sinkTopics = o.getJsonArray("sinkKafkaTopics");
return Optional.of(formatQueries(queries));
} catch (Exception e) {
return Optional.empty();
}
}

private static List<QueryInfo> formatQueries(final JsonArray queries) {
return queries.stream()
.map(o -> (JsonObject) o)
.map(o -> {
final QueryType queryType = QueryType.valueOf(o.getString("queryType"));
final JsonArray sinks = o.getJsonArray("sinks");
final JsonArray sinkTopics = o.getJsonArray("sinkKafkaTopics");

final Optional<String> sinkName;
final Optional<String> sinkTopicName;
if (queryType == QueryType.PERSISTENT) {
if (sinks.size() != 1 || sinkTopics.size() != 1) {
throw new IllegalStateException("Persistent queries must have exactly one sink.");
}
sinkName = Optional.of(sinks.getString(0));
sinkTopicName = Optional.of(sinkTopics.getString(0));
} else if (queryType == QueryType.PUSH) {
if (sinks.size() != 0 || sinkTopics.size() != 0) {
throw new IllegalStateException("Push queries must have no sinks.");
}
sinkName = Optional.empty();
sinkTopicName = Optional.empty();
} else {
throw new IllegalStateException("Unexpected query type.");
final Optional<String> sinkName;
final Optional<String> sinkTopicName;
if (queryType == QueryType.PERSISTENT) {
if (sinks.size() != 1 || sinkTopics.size() != 1) {
throw new IllegalStateException("Persistent queries must have exactly one sink.");
}
sinkName = Optional.of(sinks.getString(0));
sinkTopicName = Optional.of(sinkTopics.getString(0));
} else if (queryType == QueryType.PUSH) {
if (sinks.size() != 0 || sinkTopics.size() != 0) {
throw new IllegalStateException("Push queries must have no sinks.");
}
sinkName = Optional.empty();
sinkTopicName = Optional.empty();
} else {
throw new IllegalStateException("Unexpected query type.");
}

return new QueryInfoImpl(
queryType,
o.getString("id"),
o.getString("queryString"),
sinkName,
sinkTopicName);
})
.collect(Collectors.toList())
);
return new QueryInfoImpl(
queryType,
o.getString("id"),
o.getString("queryString"),
sinkName,
sinkTopicName);
})
.collect(Collectors.toList());
}

/**
* Attempts to parse the provided response entity as a {@code SourceDescriptionEntity}.
*
* @param sourceDescriptionEntity response entity
* @return optional containing parsed result if successful, else empty
*/
private static Optional<SourceDescription> getDescribeSourceResponse(
final JsonObject sourceDescriptionEntity
) {
try {
final JsonObject source = sourceDescriptionEntity.getJsonObject("sourceDescription");
return Optional.of(new SourceDescriptionImpl(
source.getString("name"),
source.getString("type"),
source.getJsonArray("fields").stream()
.map(o -> (JsonObject)o)
.map(f -> new FieldInfoImpl(
f.getString("name"),
new ColumnTypeImpl(f.getJsonObject("schema").getString("type")),
"KEY".equals(f.getString("type"))))
.collect(Collectors.toList()),
source.getString("topic"),
source.getString("keyFormat"),
source.getString("valueFormat"),
formatQueries(source.getJsonArray("readQueries")),
formatQueries(source.getJsonArray("writeQueries")),
source.getString("timestamp").isEmpty()
? Optional.empty()
: Optional.of(source.getString("timestamp")),
Optional.ofNullable(source.getString("windowType")),
source.getString("statement")
));
} catch (Exception e) {
return Optional.empty();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.confluent.ksql.api.client.ExecuteStatementResult;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.QueryInfo;
import io.confluent.ksql.api.client.SourceDescription;
import io.confluent.ksql.api.client.StreamInfo;
import io.confluent.ksql.api.client.StreamedQueryResult;
import io.confluent.ksql.api.client.TableInfo;
Expand Down Expand Up @@ -281,6 +282,21 @@ public CompletableFuture<List<QueryInfo>> listQueries() {
return cf;
}

@Override
public CompletableFuture<SourceDescription> describeSource(final String sourceName) {
final CompletableFuture<SourceDescription> cf = new CompletableFuture<>();

makeRequest(
KSQL_ENDPOINT,
new JsonObject().put("ksql", "describe " + sourceName + ";"),
cf,
response -> handleSingleEntityResponse(
response, cf, AdminResponseHandlers::handleDescribeSourceResponse)
);

return cf;
}

@Override
public void close() {
httpClient.close();
Expand Down
Loading

0 comments on commit a154373

Please sign in to comment.