Skip to content

Commit

Permalink
fix: if not exists return type (#8322)
Browse files Browse the repository at this point in the history
* fix: if not exists return type
  • Loading branch information
wcarlson5 authored Nov 5, 2021
1 parent bc3ea64 commit 9da204c
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ static void handleExecuteStatementResponse(
final JsonObject ksqlEntity,
final CompletableFuture<ExecuteStatementResult> cf
) {
if (isIfNotExistsWarning(ksqlEntity)) {
cf.complete(new ExecuteStatementResultImpl(Optional.empty()));
return;
}
if (!isCommandStatusEntity(ksqlEntity)) {
handleUnexpectedEntity(ksqlEntity, cf);
return;
Expand Down Expand Up @@ -68,6 +72,13 @@ private static boolean isCommandStatusEntity(final JsonObject ksqlEntity) {
&& ksqlEntity.getJsonObject("commandStatus") != null;
}

private static boolean isIfNotExistsWarning(final JsonObject ksqlEntity) {
return ksqlEntity.getString("statementText") != null
&& ksqlEntity.getString("statementText").contains("IF NOT EXISTS")
&& ksqlEntity.getString("@type") != null
&& ksqlEntity.getString("@type").equals("warning_entity");
}

// CHECKSTYLE_RULES.OFF: CyclomaticComplexity
private static void handleUnexpectedEntity(
final JsonObject ksqlEntity,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.confluent.ksql.api.client.exception.KsqlException;
import io.confluent.ksql.api.client.impl.ConnectorTypeImpl;
import io.confluent.ksql.api.client.impl.ExecuteStatementResultImpl;
import io.confluent.ksql.api.client.impl.StreamedQueryResultImpl;
import io.confluent.ksql.api.client.util.ClientTestUtil;
import io.confluent.ksql.api.client.util.ClientTestUtil.TestSubscriber;
Expand Down Expand Up @@ -84,6 +85,7 @@
import io.confluent.ksql.rest.entity.StreamsList;
import io.confluent.ksql.rest.entity.TablesList;
import io.confluent.ksql.rest.entity.TypeList;
import io.confluent.ksql.rest.entity.WarningEntity;
import io.confluent.ksql.schema.ksql.types.SqlBaseType;
import io.confluent.ksql.util.AppInfo;
import io.confluent.ksql.util.KsqlConstants.KsqlQueryStatus;
Expand Down Expand Up @@ -789,6 +791,24 @@ public void shouldHandleErrorResponseFromExecuteStatement() {
assertThat(e.getCause().getMessage(), containsString("something bad"));
}

@Test
public void shouldHandleIfNotExistsWarningResponseFromExecuteStatement() throws Exception {
// Given
KsqlEntity ksqlEntity = new WarningEntity(
"CREATE STREAM IF NOT EXISTS ",
"Cannot add stream `HIGH_VALUE_STOCK_TRADES`: A stream with the same name already exists."
);
testEndpoints.setKsqlEndpointResponse(Collections.singletonList(ksqlEntity));


final ExecuteStatementResult result = javaClient.executeStatement("CSAS;").get();


// Then
assertThat(result.queryId(),
equalTo(Optional.empty()));
}

@Test
public void shouldExecuteSingleStatementWithMultipleSemicolons() throws Exception {
// Given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.confluent.ksql.api.client.ConnectorDescription;
import io.confluent.ksql.api.client.ConnectorInfo;
import io.confluent.ksql.api.client.ConnectorType;
import io.confluent.ksql.api.client.ExecuteStatementResult;
import io.confluent.ksql.api.client.KsqlArray;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.QueryInfo;
Expand Down Expand Up @@ -317,6 +318,18 @@ public void shouldStreamMultiplePushQueries() throws Exception {
}
}

@Test
public void shouldOnlyWarnForDuplicateIfNotExists() throws Exception {
// When
ExecuteStatementResult result;
client.executeStatement("CREATE STREAM FOO (id INT KEY, bar VARCHAR) WITH(value_format='json', kafka_topic='foo', partitions=6);").get();
client.executeStatement("CREATE STREAM IF NOT EXISTS BAR AS SELECT * FROM FOO EMIT CHANGES;").get();
result = client.executeStatement("CREATE STREAM IF NOT EXISTS BAR AS SELECT * FROM FOO EMIT CHANGES;").get();

//Then
assertThat(result.queryId(), is(Optional.empty()));
}

@Test
public void shouldStreamPushQueryAsync() throws Exception {
// When
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.entity.HeartbeatMessage;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlMediaType;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.LagReportingMessage;
Expand All @@ -55,7 +56,7 @@ public class TestEndpoints implements Endpoints {

private Supplier<RowGenerator> rowGeneratorFactory;
private TestInsertsSubscriber insertsSubscriber;
private List<KsqlEntity> ksqlEndpointResponse;
private KsqlEntityList ksqlEndpointResponse;
private String lastSql;
private JsonObject lastProperties;
private JsonObject lastSessionVariables;
Expand Down Expand Up @@ -241,7 +242,7 @@ public synchronized TestInsertsSubscriber getInsertsSubscriber() {
}

public synchronized void setKsqlEndpointResponse(final List<KsqlEntity> entities) {
this.ksqlEndpointResponse = ImmutableList.copyOf(entities);
this.ksqlEndpointResponse = new KsqlEntityList(ImmutableList.copyOf(entities));
}

public synchronized String getLastSql() {
Expand Down

0 comments on commit 9da204c

Please sign in to comment.