Skip to content

Commit

Permalink
fix: show queries now returns the correct Kafka Topic if the query st…
Browse files Browse the repository at this point in the history
…ring contains with clause
  • Loading branch information
stevenpyzhang committed Feb 4, 2020
1 parent d35822d commit df9b17b
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@
public class QueriesTableBuilder implements TableBuilder<Queries> {

private static final List<String> HEADERS =
ImmutableList.of("Query ID", "Status", "Kafka Topic", "Query String");
ImmutableList.of("Query ID", "Status", "Sink Kafka Topic", "Sink Name", "Query String");

@Override
public Table buildTable(final Queries entity) {
final Stream<List<String>> rows = entity.getQueries().stream()
.map(r -> ImmutableList.of(
r.getId().getId(),
r.getState().orElse("N/A"),
String.join(",", r.getSinks()),
String.join(",", r.getSinkKafkaTopics()),
String.join(",", r.getSinkNames()),
r.getQuerySingleLine()
));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public void testPrintQueries() {
final List<RunningQuery> queries = new ArrayList<>();
queries.add(
new RunningQuery(
"select * from t1", Collections.singleton("Test"), new QueryId("0"), Optional.of("Foobar")));
"select * from t1", Collections.singleton("Test"), Collections.singleton("Test topic"), new QueryId("0"), Optional.of("Foobar")));

final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
new Queries("e", queries)
Expand All @@ -304,18 +304,19 @@ public void testPrintQueries() {
+ " \"statementText\" : \"e\",\n"
+ " \"queries\" : [ {\n"
+ " \"queryString\" : \"select * from t1\",\n"
+ " \"sinks\" : [ \"Test\" ],\n"
+ " \"sinkNames\" : [ \"Test\" ],\n"
+ " \"sinkKafkaTopics\" : [ \"Test topic\" ],\n"
+ " \"id\" : \"0\",\n"
+ " \"state\" : \"Foobar\"\n"
+ " } ],\n"
+ " \"warnings\" : [ ]\n"
+ "} ]\n"));
} else {
assertThat(output, is("\n"
+ " Query ID | Status | Kafka Topic | Query String \n"
+ "----------------------------------------------------\n"
+ " 0 | Foobar | Test | select * from t1 \n"
+ "----------------------------------------------------\n"
+ " Query ID | Status | Sink Kafka Topic | Sink Name | Query String \n"
+ "---------------------------------------------------------------------\n"
+ " 0 | Foobar | Test topic | Test | select * from t1 \n"
+ "---------------------------------------------------------------------\n"
+ "For detailed information on a Query run: EXPLAIN <Query ID>;\n"));
}
}
Expand All @@ -337,10 +338,10 @@ public void testPrintSourceDescription() {
);

final List<RunningQuery> readQueries = ImmutableList.of(
new RunningQuery("read query", ImmutableSet.of("sink1"), new QueryId("readId"), Optional.of("Running"))
new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), Optional.of("Running"))
);
final List<RunningQuery> writeQueries = ImmutableList.of(
new RunningQuery("write query", ImmutableSet.of("sink2"), new QueryId("writeId"), Optional.of("Running"))
new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), Optional.of("Running"))
);

final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
Expand Down Expand Up @@ -383,13 +384,15 @@ public void testPrintSourceDescription() {
+ " \"windowType\" : null,\n"
+ " \"readQueries\" : [ {\n"
+ " \"queryString\" : \"read query\",\n"
+ " \"sinks\" : [ \"sink1\" ],\n"
+ " \"sinkNames\" : [ \"sink1\" ],\n"
+ " \"sinkKafkaTopics\" : [ \"sink1 topic\" ],\n"
+ " \"id\" : \"readId\",\n"
+ " \"state\" : \"Running\"\n"
+ " } ],\n"
+ " \"writeQueries\" : [ {\n"
+ " \"queryString\" : \"write query\",\n"
+ " \"sinks\" : [ \"sink2\" ],\n"
+ " \"sinkNames\" : [ \"sink2\" ],\n"
+ " \"sinkKafkaTopics\" : [ \"sink2 topic\" ],\n"
+ " \"id\" : \"writeId\",\n"
+ " \"state\" : \"Running\"\n"
+ " } ],\n"
Expand Down Expand Up @@ -990,10 +993,10 @@ public void testPrintExecuptionPlan() {
public void shouldPrintTopicDescribeExtended() {
// Given:
final List<RunningQuery> readQueries = ImmutableList.of(
new RunningQuery("read query", ImmutableSet.of("sink1"), new QueryId("readId"), Optional.of("Running"))
new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), Optional.of("Running"))
);
final List<RunningQuery> writeQueries = ImmutableList.of(
new RunningQuery("write query", ImmutableSet.of("sink2"), new QueryId("writeId"), Optional.of("Running"))
new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), Optional.of("Running"))
);

final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
Expand Down Expand Up @@ -1035,13 +1038,15 @@ public void shouldPrintTopicDescribeExtended() {
+ " \"windowType\" : null,\n"
+ " \"readQueries\" : [ {\n"
+ " \"queryString\" : \"read query\",\n"
+ " \"sinks\" : [ \"sink1\" ],\n"
+ " \"sinkNames\" : [ \"sink1\" ],\n"
+ " \"sinkKafkaTopics\" : [ \"sink1 topic\" ],\n"
+ " \"id\" : \"readId\",\n"
+ " \"state\" : \"Running\"\n"
+ " } ],\n"
+ " \"writeQueries\" : [ {\n"
+ " \"queryString\" : \"write query\",\n"
+ " \"sinks\" : [ \"sink2\" ],\n"
+ " \"sinkNames\" : [ \"sink2\" ],\n"
+ " \"sinkKafkaTopics\" : [ \"sink2 topic\" ],\n"
+ " \"id\" : \"writeId\",\n"
+ " \"state\" : \"Running\"\n"
+ " } ],\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public static Optional<KsqlEntity> execute(
.map(q -> new RunningQuery(
q.getStatementString(),
ImmutableSet.of(q.getSinkName().name()),
ImmutableSet.of(q.getResultTopic().getKafkaTopicName()),
q.getQueryId(),
Optional.of(q.getState())
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ private static List<RunningQuery> getQueries(
.map(q -> new RunningQuery(
q.getStatementString(),
ImmutableSet.of(q.getSinkName().name()),
ImmutableSet.of(q.getResultTopic().getKafkaTopicName()),
q.getQueryId(),
Optional.of(q.getState())
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void shouldListQueriesBasic() {
new RunningQuery(
metadata.getStatementString(),
ImmutableSet.of(metadata.getSinkName().name()),
ImmutableSet.of(metadata.getResultTopic().getKafkaTopicName()),
metadata.getQueryId(),
Optional.of(metadata.getState())
)));
Expand Down Expand Up @@ -124,6 +125,7 @@ public static PersistentQueryMetadata givenPersistentQuery(final String id) {

final KsqlTopic sinkTopic = mock(KsqlTopic.class);
when(sinkTopic.getKeyFormat()).thenReturn(KeyFormat.nonWindowed(FormatInfo.of(Format.KAFKA)));
when(sinkTopic.getKafkaTopicName()).thenReturn(id);
when(metadata.getResultTopic()).thenReturn(sinkTopic);

return metadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ public void shouldShowColumnsSource() {
ImmutableList.of(new RunningQuery(
metadata.getStatementString(),
ImmutableSet.of(metadata.getSinkName().toString(FormatOptions.noEscape())),
ImmutableSet.of(metadata.getResultTopic().getKafkaTopicName()),
metadata.getQueryId(),
Optional.of(metadata.getState())
)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1958,6 +1958,7 @@ private List<RunningQuery> createRunningQueries(
.map(md -> new RunningQuery(
md.getStatementString(),
ImmutableSet.of(md.getSinkName().toString(FormatOptions.noEscape())),
ImmutableSet.of(md.getResultTopic().getKafkaTopicName()),
md.getQueryId(),
Optional.of(md.getState())
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,22 @@
public class RunningQuery {

private final String queryString;
private final Set<String> sinks;
private final Set<String> sinkKafkaTopics;
private final Set<String> sinkNames;
private final QueryId id;
private final Optional<String> state;

@JsonCreator
public RunningQuery(
@JsonProperty("queryString") final String queryString,
@JsonProperty("sinks") final Set<String> sinks,
@JsonProperty("sinkNames") final Set<String> sinkNames,
@JsonProperty("sinkKafkaTopics") final Set<String> sinkKafkaTopics,
@JsonProperty("id") final QueryId id,
@JsonProperty("state") final Optional<String> state
) {
this.queryString = Objects.requireNonNull(queryString, "queryString");
this.sinks = Objects.requireNonNull(sinks, "sinks");
this.sinkKafkaTopics = Objects.requireNonNull(sinkKafkaTopics, "sinkKafkaTopics");
this.sinkNames = Objects.requireNonNull(sinkNames, "sinkNames");
this.id = Objects.requireNonNull(id, "id");
this.state = Objects.requireNonNull(state, "state");
}
Expand All @@ -54,8 +57,12 @@ public String getQuerySingleLine() {
return queryString.replaceAll(System.lineSeparator(), "");
}

public Set<String> getSinks() {
return sinks;
public Set<String> getSinkNames() {
return sinkNames;
}

public Set<String> getSinkKafkaTopics() {
return sinkKafkaTopics;
}

public QueryId getId() {
Expand All @@ -77,12 +84,13 @@ public boolean equals(final Object o) {
final RunningQuery that = (RunningQuery) o;
return Objects.equals(id, that.id)
&& Objects.equals(queryString, that.queryString)
&& Objects.equals(sinks, that.sinks)
&& Objects.equals(sinkKafkaTopics, that.sinkKafkaTopics)
&& Objects.equals(sinkNames, that.sinkNames)
&& Objects.equals(state, that.state);
}

@Override
public int hashCode() {
return Objects.hash(id, queryString, id, state);
return Objects.hash(id, queryString, sinkKafkaTopics, sinkNames, state);
}
}

0 comments on commit df9b17b

Please sign in to comment.