Skip to content

Commit

Permalink
feat: add consumerGroupId to QueryDescription (#8073)
Browse files Browse the repository at this point in the history
* feat: add consumerGroupId to QueryDescription
  • Loading branch information
wcarlson5 authored Sep 2, 2021
1 parent 38b9e76 commit cce585b
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1021,7 +1021,7 @@ public void shouldFailToExplainQueryViaExecuteStatement() {
new QueryDescription(new QueryId("id"), "sql", Optional.empty(),
Collections.emptyList(), Collections.emptySet(), Collections.emptySet(), "topology",
"executionPlan", Collections.emptyMap(), Collections.emptyMap(),
KsqlQueryType.PERSISTENT, Collections.emptyList(), Collections.emptySet()));
KsqlQueryType.PERSISTENT, Collections.emptyList(), Collections.emptySet(), "consumerGroupId"));
testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));

// When
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,8 @@ public void shouldPrintExplainQueryWithError() {
Collections.emptySet(),
Optional.empty()
)
)
),
"consumerGroupId"
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"topicOffsets" : [ ],
"timeCurrentIdlingStarted" : null
} ],
"consumerGroupId" : "consumerGroupId",
"state" : "ERROR"
},
"warnings" : [ ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ private static QueryDescription create(
ksqlHostQueryStatus,
queryMetadata.getQueryType(),
queryMetadata.getQueryErrors(),
queryMetadata.getTaskMetadata()
queryMetadata.getTaskMetadata(),
queryMetadata.getQueryApplicationId()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ public void shouldFailOnNonQueryExplain() {
@SuppressWarnings("SameParameterValue")
public static PersistentQueryMetadata givenPersistentQuery(final String id) {
final PersistentQueryMetadata metadata = mock(PersistentQueryMetadata.class);
when(metadata.getQueryApplicationId()).thenReturn("consumer-group-id");
when(metadata.getQueryId()).thenReturn(new QueryId(id));
when(metadata.getSinkName()).thenReturn(Optional.of(SourceName.of(id)));
when(metadata.getLogicalSchema()).thenReturn(TemporaryEngine.SCHEMA);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ public static void mockQuery(
when(metadata.getState()).thenReturn(state);
when(metadata.getTopologyDescription()).thenReturn("topology");
when(metadata.getExecutionPlan()).thenReturn("plan");
when(metadata.getQueryApplicationId()).thenReturn("consumer-group-id");

final StreamsMetadata localStreamsMetadata = mock(StreamsMetadata.class);
when(localStreamsMetadata.hostInfo()).thenReturn(LOCAL_HOST);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class QueryDescription {
private final KsqlQueryType queryType;
private final List<QueryError> queryErrors;
private final Set<StreamsTaskMetadata> tasksMetadata;
private final String consumerGroupId;

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
@SuppressWarnings("WeakerAccess") // Invoked via reflection
Expand All @@ -71,7 +72,8 @@ public QueryDescription(
ksqlHostQueryStatus,
@JsonProperty("queryType") final KsqlQueryType queryType,
@JsonProperty("queryErrors") final List<QueryError> queryErrors,
@JsonProperty("tasksMetadata") final Set<StreamsTaskMetadata> tasksMetadata
@JsonProperty("tasksMetadata") final Set<StreamsTaskMetadata> tasksMetadata,
@JsonProperty("consumerGroupId") final String getConsumerGroupId
) {
this.id = Objects.requireNonNull(id, "id");
this.statementText = Objects.requireNonNull(statementText, "statementText");
Expand All @@ -88,12 +90,17 @@ public QueryDescription(
this.queryType = Objects.requireNonNull(queryType, "queryType");
this.queryErrors = new ArrayList<>(Objects.requireNonNull(queryErrors, "queryErrors"));
this.tasksMetadata = new HashSet<>(Objects.requireNonNull(tasksMetadata));
this.consumerGroupId = Objects.requireNonNull(getConsumerGroupId, "consumerGroupId");
}

public QueryId getId() {
return id;
}

public String getConsumerGroupId() {
return consumerGroupId;
}

public String getStatementText() {
return statementText;
}
Expand Down Expand Up @@ -195,7 +202,8 @@ public boolean equals(final Object o) {
&& Objects.equals(ksqlHostQueryStatus, that.ksqlHostQueryStatus)
&& Objects.equals(queryType, that.queryType)
&& Objects.equals(queryErrors, that.queryErrors)
&& Objects.equals(tasksMetadata, that.tasksMetadata);
&& Objects.equals(tasksMetadata, that.tasksMetadata)
&& Objects.equals(consumerGroupId, that.consumerGroupId);
}

@Override
Expand All @@ -213,7 +221,8 @@ public int hashCode() {
ksqlHostQueryStatus,
queryType,
queryErrors,
tasksMetadata
tasksMetadata,
consumerGroupId
);
}
}

0 comments on commit cce585b

Please sign in to comment.