From 5ff94b6dc8c4e101063fb09dcfcfd08c131f3383 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Fri, 6 Dec 2019 14:30:42 -0800 Subject: [PATCH] feat: add connector status to LIST CONNECTORS (#4077) --- .../builder/ConnectorListTableBuilder.java | 5 ++- .../ksql/cli/console/ConsoleTest.java | 17 ++++---- .../execution/ListConnectorsExecutor.java | 39 +++++++++++++++---- .../rest/server/ConnectIntegrationTest.java | 3 ++ .../execution/ListConnectorsExecutorTest.java | 19 ++++++++- .../ksql/rest/entity/SimpleConnectorInfo.java | 15 +++++-- 6 files changed, 76 insertions(+), 22 deletions(-) diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ConnectorListTableBuilder.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ConnectorListTableBuilder.java index 1fb90cc75f7a..ea988bb31e46 100644 --- a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ConnectorListTableBuilder.java +++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ConnectorListTableBuilder.java @@ -25,7 +25,7 @@ public class ConnectorListTableBuilder implements TableBuilder { private static final List HEADERS = ImmutableList.of( - "Connector Name", "Type", "Class" + "Connector Name", "Type", "Class", "Status" ); @@ -38,7 +38,8 @@ public Table buildTable(final ConnectorList entity) { .map(info -> ImmutableList.of( info.getName(), ObjectUtils.defaultIfNull(info.getType(), ConnectorType.UNKNOWN).name(), - ObjectUtils.defaultIfNull(info.getClassName(), "")))) + ObjectUtils.defaultIfNull(info.getClassName(), ""), + ObjectUtils.defaultIfNull(info.getState(), "")))) .build(); } } diff --git a/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java b/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java index 0847188f4543..14d2feafbd20 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java @@ -840,8 +840,8 @@ public void shouldPrintConnectorsList() { "statement", ImmutableList.of(), ImmutableList.of( - new SimpleConnectorInfo("foo", ConnectorType.SOURCE, "clazz"), - new SimpleConnectorInfo("bar", null, null) + new SimpleConnectorInfo("foo", ConnectorType.SOURCE, "clazz", "STATUS"), + new SimpleConnectorInfo("bar", null, null, null) )) )); @@ -859,18 +859,19 @@ public void shouldPrintConnectorsList() { + " \"connectors\" : [ {\n" + " \"name\" : \"foo\",\n" + " \"type\" : \"source\",\n" - + " \"className\" : \"clazz\"\n" + + " \"className\" : \"clazz\",\n" + + " \"state\" : \"STATUS\"\n" + " }, {\n" + " \"name\" : \"bar\"\n" + " } ]\n" + "} ]\n")); } else { assertThat(output, is("\n" - + " Connector Name | Type | Class \n" - + "----------------------------------\n" - + " foo | SOURCE | clazz \n" - + " bar | UNKNOWN | \n" - + "----------------------------------\n")); + + " Connector Name | Type | Class | Status \n" + + "-------------------------------------------\n" + + " foo | SOURCE | clazz | STATUS \n" + + " bar | UNKNOWN | | \n" + + "-------------------------------------------\n")); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutor.java index 6eb2ca7cbe8d..a1162e5d1395 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutor.java @@ -31,8 +31,11 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import org.apache.kafka.connect.runtime.AbstractStatus.State; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.AbstractState; import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; public final class ListConnectorsExecutor { @@ -58,15 +61,18 @@ public static Optional execute( final List infos = new ArrayList<>(); final List warnings = new ArrayList<>(); final Scope scope = configuredStatement.getStatement().getScope(); + for (final String name : connectors.datum().get()) { final ConnectResponse response = connectClient.describe(name); + if (response.datum().filter(i -> inScope(i.type(), scope)).isPresent()) { - infos.add(fromConnectorInfoResponse(name, response) - ); + final ConnectResponse status = connectClient.status(name); + infos.add(fromConnectorInfoResponse(name, response, status)); } else if (response.error().isPresent()) { if (scope == Scope.ALL) { - infos.add(new SimpleConnectorInfo(name, ConnectorType.UNKNOWN, null)); + infos.add(new SimpleConnectorInfo(name, ConnectorType.UNKNOWN, null, null)); } + warnings.add( new KsqlWarning( String.format( @@ -96,16 +102,35 @@ private static boolean inScope(final ConnectorType type, final Scope scope) { @SuppressWarnings("OptionalGetWithoutIsPresent") private static SimpleConnectorInfo fromConnectorInfoResponse( final String name, - final ConnectResponse response + final ConnectResponse response, + final ConnectResponse status ) { - if (response.error().isPresent()) { - return new SimpleConnectorInfo(name, null, null); + if (response.error().isPresent() || status.error().isPresent()) { + return new SimpleConnectorInfo(name, null, null, status.datum().get().connector().state()); } final ConnectorInfo info = response.datum().get(); return new SimpleConnectorInfo( name, info.type(), - info.config().get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); + info.config().get(ConnectorConfig.CONNECTOR_CLASS_CONFIG), + summarizeState(status.datum().get()) + ); + } + + private static String summarizeState(final ConnectorStateInfo connectorState) { + if (!connectorState.connector().state().equals(State.RUNNING.name())) { + return connectorState.connector().state(); + } + + final long numRunningTasks = connectorState.tasks() + .stream() + .map(AbstractState::state) + .filter(State.RUNNING.name()::equals) + .count(); + + return String.format("RUNNING (%s/%s tasks RUNNING)", + numRunningTasks, + connectorState.tasks().size()); } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/ConnectIntegrationTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/ConnectIntegrationTest.java index aae2446184e6..1d0dcdde312f 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/ConnectIntegrationTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/ConnectIntegrationTest.java @@ -130,6 +130,9 @@ public void shouldListConnectors() { assertThat( ((ConnectorList) response.getResponse().get(0)).getConnectors().get(0).getName(), is("mock-connector")); + assertThat( + ((ConnectorList) response.getResponse().get(0)).getConnectors().get(0).getState(), + is("RUNNING (1/1 tasks RUNNING)")); } @Test diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutorTest.java index e411208eefee..4891f49906ad 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutorTest.java @@ -38,6 +38,9 @@ import org.apache.http.HttpStatus; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.ConnectorState; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.TaskState; import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; import org.junit.Before; import org.junit.Test; @@ -58,6 +61,16 @@ public class ListConnectorsExecutorTest { ConnectorType.SOURCE ); + private static final ConnectorStateInfo STATUS = new ConnectorStateInfo( + "connector", + new ConnectorState("RUNNING", "foo", "bar"), + ImmutableList.of( + new TaskState(0, "RUNNING", "", ""), + new TaskState(1, "FAILED", "", "") + ), + ConnectorType.SOURCE + ); + @Mock private KsqlExecutionContext engine; @Mock @@ -70,6 +83,8 @@ public void setUp() { when(serviceContext.getConnectClient()).thenReturn(connectClient); when(connectClient.describe("connector")) .thenReturn(ConnectResponse.success(INFO, HttpStatus.SC_OK)); + when(connectClient.status("connector")) + .thenReturn(ConnectResponse.success(STATUS, HttpStatus.SC_OK)); when(connectClient.describe("connector2")) .thenReturn(ConnectResponse.failure("DANGER WILL ROBINSON.", HttpStatus.SC_NOT_FOUND)); } @@ -97,7 +112,7 @@ public void shouldListValidConnector() { "", ImmutableList.of(), ImmutableList.of( - new SimpleConnectorInfo("connector", ConnectorType.SOURCE, CONNECTOR_CLASS) + new SimpleConnectorInfo("connector", ConnectorType.SOURCE, CONNECTOR_CLASS, "RUNNING (1/2 tasks RUNNING)") ) ))); } @@ -153,7 +168,7 @@ public void shouldListInvalidConnectorWithNoInfo() { ImmutableList.of( new KsqlWarning("Could not describe connector connector2: DANGER WILL ROBINSON.")), ImmutableList.of( - new SimpleConnectorInfo("connector2", ConnectorType.UNKNOWN, null) + new SimpleConnectorInfo("connector2", ConnectorType.UNKNOWN, null, null) ) ))); } diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/SimpleConnectorInfo.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/SimpleConnectorInfo.java index bf21b02f691b..f01143740faf 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/SimpleConnectorInfo.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/SimpleConnectorInfo.java @@ -32,16 +32,19 @@ public class SimpleConnectorInfo { private final String name; private final ConnectorType type; private final String className; + private final String state; @JsonCreator public SimpleConnectorInfo( @JsonProperty("name") final String name, @JsonProperty("type") final ConnectorType type, - @JsonProperty("className") final String className + @JsonProperty("className") final String className, + @JsonProperty("state") final String state ) { this.name = Objects.requireNonNull(name, "name"); this.type = type; this.className = className; + this.state = state; } public String getName() { @@ -56,6 +59,10 @@ public String getClassName() { return className; } + public String getState() { + return state; + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -67,12 +74,13 @@ public boolean equals(final Object o) { final SimpleConnectorInfo that = (SimpleConnectorInfo) o; return Objects.equals(name, that.name) && type == that.type - && Objects.equals(className, that.className); + && Objects.equals(className, that.className) + && Objects.equals(state, that.state); } @Override public int hashCode() { - return Objects.hash(name, type, className); + return Objects.hash(name, type, className, state); } @Override @@ -81,6 +89,7 @@ public String toString() { + "name='" + name + '\'' + ", type=" + type + ", className='" + className + '\'' + + ", state=" + state + '}'; } }