Skip to content

Commit

Permalink
feat(migrations): enable insert values and create connector for migra…
Browse files Browse the repository at this point in the history
…tions tool (#7161)

* feat(migrations): enable insert values statements for migrations tool

* add create connect

* address review comments: use expression to determine value type

* checkstyle

* add comment support

* checkstyle

* address review comments

* add connect executable

* checkstyle

* remove connectors from integration test

* fix unit test

* spotbugs

* findbugs
  • Loading branch information
Zara Lim authored Mar 10, 2021
1 parent f383800 commit 2c614cd
Show file tree
Hide file tree
Showing 6 changed files with 706 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,19 @@
import com.github.rvesse.airline.annotations.restrictions.RequireOnlyOne;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.FieldInfo;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.tools.migrations.MigrationConfig;
import io.confluent.ksql.tools.migrations.MigrationException;
import io.confluent.ksql.tools.migrations.util.CommandParser;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlCommand;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlConnectorStatement;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlInsertValues;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlStatement;
import io.confluent.ksql.tools.migrations.util.MetadataUtil;
import io.confluent.ksql.tools.migrations.util.MetadataUtil.MigrationState;
import io.confluent.ksql.tools.migrations.util.MigrationFile;
Expand All @@ -34,9 +45,10 @@
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.RetryUtil;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
Expand Down Expand Up @@ -116,6 +128,7 @@ protected int command() {
return command(
config,
MigrationsUtil::getKsqlClient,
MigrationsUtil::createRestClient,
getMigrationsDirFromConfigFile(configFile),
Clock.systemDefaultZone()
);
Expand All @@ -126,6 +139,7 @@ protected int command() {
int command(
final MigrationConfig config,
final Function<MigrationConfig, Client> clientSupplier,
final Function<MigrationConfig, KsqlRestClient> restClientSupplier,
final String migrationsDir,
final Clock clock
) {
Expand All @@ -140,8 +154,10 @@ int command(
}

final Client ksqlClient;
final KsqlRestClient restClient;
try {
ksqlClient = clientSupplier.apply(config);
restClient = restClientSupplier.apply(config);
} catch (MigrationException e) {
LOGGER.error(e.getMessage());
return 1;
Expand All @@ -160,12 +176,13 @@ int command(
boolean success;
try {
success = validateCurrentState(config, ksqlClient, migrationsDir)
&& apply(config, ksqlClient, migrationsDir, clock);
&& apply(config, ksqlClient, restClient, migrationsDir, clock);
} catch (MigrationException e) {
LOGGER.error(e.getMessage());
success = false;
} finally {
ksqlClient.close();
restClient.close();
}

return success ? 0 : 1;
Expand All @@ -174,6 +191,7 @@ int command(
private boolean apply(
final MigrationConfig config,
final Client ksqlClient,
final KsqlRestClient restClient,
final String migrationsDir,
final Clock clock
) {
Expand All @@ -198,7 +216,7 @@ private boolean apply(
}

for (MigrationFile migration : migrations) {
if (!applyMigration(config, ksqlClient, migration, clock, previous)) {
if (!applyMigration(config, ksqlClient, restClient, migration, clock, previous)) {
return false;
}
previous = Integer.toString(migration.getVersion());
Expand Down Expand Up @@ -246,6 +264,7 @@ private List<MigrationFile> loadMigrationsToApply(
private boolean applyMigration(
final MigrationConfig config,
final Client ksqlClient,
final KsqlRestClient restClient,
final MigrationFile migration,
final Clock clock,
final String previous
Expand Down Expand Up @@ -274,28 +293,99 @@ private boolean applyMigration(
return false;
}

final List<String> commands = Arrays.stream(migrationFileContent.split(";"))
.map(String::trim)
.filter(s -> !s.isEmpty())
.map(s -> s + ";")
.collect(Collectors.toList());
for (final String command : commands) {
try {
executeCommands(migrationFileContent, ksqlClient, restClient, config,
executionStart, migration, clock, previous);
} catch (MigrationException e) {
LOGGER.error(e.getMessage());
return false;
}

if (!updateState(config, ksqlClient, MigrationState.MIGRATED,
executionStart, migration, clock, previous, Optional.empty())) {
return false;
}
LOGGER.info("Successfully migrated");
return true;
}

private void executeCommands(
final String migrationFileContent,
final Client ksqlClient,
final KsqlRestClient restClient,
final MigrationConfig config,
final String executionStart,
final MigrationFile migration,
final Clock clock,
final String previous
) {
final List<SqlCommand> commands = CommandParser.parse(migrationFileContent);
for (final SqlCommand command : commands) {
try {
ksqlClient.executeStatement(command).get();
} catch (InterruptedException | ExecutionException e) {
executeCommand(command, ksqlClient, restClient);
} catch (InterruptedException | ExecutionException | MigrationException e) {
final String errorMsg = String.format(
"Failed to execute sql: %s. Error: %s", command, e.getMessage());
LOGGER.error(errorMsg);
"Failed to execute sql: %s. Error: %s", command.getCommand(), e.getMessage());
updateState(config, ksqlClient, MigrationState.ERROR,
executionStart, migration, clock, previous, Optional.of(errorMsg));
return false;
throw new MigrationException(errorMsg);
}
}
}

updateState(config, ksqlClient, MigrationState.MIGRATED,
executionStart, migration, clock, previous, Optional.empty());
LOGGER.info("Successfully migrated");
return true;
private void executeCommand(
final SqlCommand command,
final Client ksqlClient,
final KsqlRestClient restClient
) throws ExecutionException, InterruptedException {
if (command instanceof SqlStatement) {
ksqlClient.executeStatement(command.getCommand()).get();
} else if (command instanceof SqlInsertValues) {
final List<FieldInfo> fields =
ksqlClient.describeSource(((SqlInsertValues) command).getSourceName()).get().fields();
ksqlClient.insertInto(
((SqlInsertValues) command).getSourceName(),
getRow(
fields,
((SqlInsertValues) command).getColumns(),
((SqlInsertValues) command).getValues())).get();
} else if (command instanceof SqlConnectorStatement) {
final RestResponse<KsqlEntityList> respose = restClient.makeKsqlRequest(command.getCommand());
if (!respose.isSuccessful()) {
throw new MigrationException(respose.getErrorMessage().getMessage());
}
}
}

private KsqlObject getRow(
final List<FieldInfo> sourceFields,
final List<String> insertColumns,
final List<Expression> insertValues
) {
final Map<String, Object> row = new HashMap<>();
if (insertColumns.size() > 0) {
verifyColumnValuesMatch(insertColumns, insertValues);
for (int i = 0 ; i < insertColumns.size(); i++) {
row.put(insertColumns.get(i), CommandParser.toFieldType(insertValues.get(i)));
}
} else {
final List<String> columnNames = sourceFields.stream()
.map(FieldInfo::name).collect(Collectors.toList());
verifyColumnValuesMatch(columnNames, insertValues);
for (int i = 0 ; i < sourceFields.size(); i++) {
row.put(sourceFields.get(i).name(), CommandParser.toFieldType(insertValues.get(i)));
}
}

return new KsqlObject(row);
}

private void verifyColumnValuesMatch(final List<String> columns, final List<Expression> values) {
if (columns.size() != values.size()) {
throw new MigrationException(String.format("Invalid `INSERT VALUES` statement. Number of "
+ "columns and values must match. Got: Columns: %d. Values: %d.",
columns.size(), values.size()));
}
}

private boolean verifyMigrated(
Expand Down
Loading

0 comments on commit 2c614cd

Please sign in to comment.