Skip to content

Commit

Permalink
feat: CLI should fail for unsupported server version (#7097)
Browse files Browse the repository at this point in the history
The ksqlDB CLI requires server version 6.0.0 and we should fail early with a clear error message if an older server version is detected.
  • Loading branch information
mjsax authored Mar 11, 2021
1 parent cc5cd81 commit a0745b9
Show file tree
Hide file tree
Showing 6 changed files with 439 additions and 63 deletions.
57 changes: 51 additions & 6 deletions ksqldb-cli/src/main/java/io/confluent/ksql/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.KsqlRestClientException;
import io.confluent.ksql.rest.client.KsqlUnsupportedServerException;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.client.StreamPublisher;
import io.confluent.ksql.rest.entity.CommandStatus;
Expand All @@ -53,6 +54,8 @@
import io.confluent.ksql.util.HandlerMaps.ClassHandlerMap2;
import io.confluent.ksql.util.HandlerMaps.Handler2;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlVersion;
import io.confluent.ksql.util.KsqlVersion.VersionType;
import io.confluent.ksql.util.ParserUtil;
import io.confluent.ksql.util.WelcomeMsgUtils;
import io.vertx.core.Context;
Expand Down Expand Up @@ -81,11 +84,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("checkstyle:ClassDataAbstractionCoupling")
public class Cli implements KsqlRequestExecutor, Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(Cli.class);

private static final int MAX_RETRIES = 10;
private static final String UNKNOWN_VERSION = "<unknown>";
private static final String NO_WARNING = "";

private static final KsqlParser KSQL_PARSER = new DefaultKsqlParser();

Expand Down Expand Up @@ -282,13 +288,15 @@ private void displayWelcomeMessage() {
final ServerInfo serverInfo = restClient.getServerInfo().getResponse();
serverVersion = serverInfo.getVersion();
serverStatus = serverInfo.getServerStatus() == null
? "<unknown>" : serverInfo.getServerStatus();
? UNKNOWN_VERSION : serverInfo.getServerStatus();
} catch (final Exception exception) {
serverVersion = "<unknown>";
serverStatus = "<unknown>";
serverVersion = UNKNOWN_VERSION;
serverStatus = UNKNOWN_VERSION;
}
final String cliVersion = AppInfo.getVersion();

final String versionMismatchWarning = checkServerCompatibility(cliVersion, serverVersion);

final String helpReminderMessage =
"Having trouble? "
+ "Type 'help' (case-insensitive) for a rundown of how things work!";
Expand All @@ -305,10 +313,11 @@ private void displayWelcomeMessage() {
WelcomeMsgUtils.displayWelcomeMessage(consoleWidth, writer);

writer.printf(
"CLI v%s, Server v%s located at %s%n",
"CLI v%s, Server v%s located at %s%n%s",
cliVersion,
serverVersion,
restClient.getServerAddress()
restClient.getServerAddress(),
versionMismatchWarning
);
writer.println("Server Status: " + serverStatus);
writer.println();
Expand All @@ -317,6 +326,42 @@ private void displayWelcomeMessage() {
terminal.flush();
}

private static String checkServerCompatibility(
final String cliVersionNumber,
final String serverVersionNumber) {

final KsqlVersion cliVersion;
try {
cliVersion = new KsqlVersion(cliVersionNumber);
} catch (final IllegalArgumentException exception) {
return "\nWARNING: Could not identify CLI version.\n"
+ " Non-matching CLI and server versions may lead to unexpected errors.\n\n";
}

final KsqlVersion serverVersion;
try {
serverVersion = new KsqlVersion(serverVersionNumber);
} catch (final IllegalArgumentException exception) {
return "\nWARNING: Could not identify server version.\n"
+ " Non-matching CLI and server versions may lead to unexpected errors.\n\n";
}

if (!serverVersion.isAtLeast(new KsqlVersion("6.0."))) {
throw new KsqlUnsupportedServerException(
serverVersion.type() == VersionType.CONFLUENT_PLATFORM ? "6.0.0" : "0.10.0",
cliVersionNumber,
serverVersionNumber
);
}

if (!cliVersion.same(serverVersion)) {
return "\nWARNING: CLI and server version don't match. This may lead to unexpected errors.\n"
+ " It is recommended to use a CLI that matches the server version.\n\n";
}

return NO_WARNING;
}

@Override
public void close() {
terminal.close();
Expand Down Expand Up @@ -364,7 +409,7 @@ private boolean isVariableSubstitutionEnabled() {
final Object substitutionEnabled
= restClient.getProperty(KsqlConfig.KSQL_VARIABLE_SUBSTITUTION_ENABLE);

if (substitutionEnabled != null && substitutionEnabled instanceof Boolean) {
if (substitutionEnabled instanceof Boolean) {
return (boolean) substitutionEnabled;
}

Expand Down
84 changes: 82 additions & 2 deletions ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -57,6 +58,7 @@
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.KsqlRestClientException;
import io.confluent.ksql.rest.client.KsqlUnsupportedServerException;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.CommandStatus;
Expand All @@ -73,6 +75,7 @@
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.test.util.KsqlIdentifierTestUtil;
import io.confluent.ksql.util.AppInfo;
import io.confluent.ksql.util.ItemDataProvider;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
Expand Down Expand Up @@ -906,6 +909,79 @@ public void shouldPrintErrorOnUnsupportedAPI() throws Exception {
containsString("Minimum supported client version: 1.0"));
}

@Test
public void shouldFailOnUnsupportedCpServerVersion() throws Exception {
givenRunInteractivelyWillExit();

final KsqlRestClient mockRestClient = givenMockRestClient("5.5.0-0");

assertThrows(
KsqlUnsupportedServerException.class,
() -> new Cli(1L, 1L, mockRestClient, console)
.runInteractively()
);
}

@Test
public void shouldFailOnUnsupportedStandaloneServerVersion() throws Exception {
givenRunInteractivelyWillExit();

final KsqlRestClient mockRestClient = givenMockRestClient("0.9.0-0");

assertThrows(
KsqlUnsupportedServerException.class,
() -> new Cli(1L, 1L, mockRestClient, console)
.runInteractively()
);
}

@Test
public void shouldPrintWarningOnDifferentCpServerVersion() throws Exception {
givenRunInteractivelyWillExit();

final KsqlRestClient mockRestClient = givenMockRestClient("6.0.0-0");

new Cli(1L, 1L, mockRestClient, console)
.runInteractively();

assertThat(
terminal.getOutputString(),
containsString("WARNING: CLI and server version don't match."
+ " This may lead to unexpected errors.")
);
}

@Test
public void shouldPrintWarningOnDifferentStandaloneServerVersion() throws Exception {
givenRunInteractivelyWillExit();

final KsqlRestClient mockRestClient = givenMockRestClient("0.10.0-0");

new Cli(1L, 1L, mockRestClient, console)
.runInteractively();

assertThat(
terminal.getOutputString(),
containsString("WARNING: CLI and server version don't match."
+ " This may lead to unexpected errors.")
);
}

@Test
public void shouldPrintWarningOnUnknownServerVersion() throws Exception {
givenRunInteractivelyWillExit();

final KsqlRestClient mockRestClient = givenMockRestClient("bad-version");

new Cli(1L, 1L, mockRestClient, console)
.runInteractively();

assertThat(
terminal.getOutputString(),
containsString("WARNING: Could not identify server version.")
);
}

@Test
public void shouldListFunctions() {
assertRunListCommand("functions", hasRows(
Expand Down Expand Up @@ -1269,11 +1345,15 @@ private void givenRunInteractivelyWillExit() {
}

private KsqlRestClient givenMockRestClient() throws Exception {
return givenMockRestClient("100.0.0-0");
}

private KsqlRestClient givenMockRestClient(final String serverVersion) throws Exception {
final KsqlRestClient mockRestClient = mock(KsqlRestClient.class);

when(mockRestClient.getServerInfo()).thenReturn(RestResponse.successful(
OK.code(),
new ServerInfo("1.x", "testClusterId", "testServiceId", "status")
new ServerInfo(serverVersion, "testClusterId", "testServiceId", "status")
));

when(mockRestClient.getServerAddress()).thenReturn(new URI("http://someserver:8008"));
Expand Down Expand Up @@ -1323,7 +1403,7 @@ private void assertLastCommandSequenceNumber(
verify(mockRestClient).makeKsqlRequest(statementText, seqNum);
}

@SuppressWarnings({"unchecked", "rawtypes"})
@SuppressWarnings("unchecked")
private static Matcher<String>[] prependWithKey(final String key, final List<?> values) {

final Matcher<String>[] allMatchers = new Matcher[values.size() + 1];
Expand Down
Loading

0 comments on commit a0745b9

Please sign in to comment.