diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java index 2cb38e37c11e..81e6ea805b0c 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java @@ -16,11 +16,12 @@ package io.confluent.ksql.tools.migrations.commands; import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getAllMigrations; +import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationForVersion; import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDirFromConfigFile; import com.github.rvesse.airline.annotations.Command; import com.github.rvesse.airline.annotations.Option; -import com.github.rvesse.airline.annotations.restrictions.MutuallyExclusiveWith; +import com.github.rvesse.airline.annotations.restrictions.RequireOnlyOne; import com.google.common.annotations.VisibleForTesting; import io.confluent.ksql.api.client.Client; import io.confluent.ksql.tools.migrations.Migration; @@ -34,7 +35,9 @@ import io.confluent.ksql.util.RetryUtil; import java.time.Clock; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; @@ -56,7 +59,7 @@ public class ApplyMigrationCommand extends BaseCommand { name = {"-a", "--all"}, description = "run all available migrations" ) - @MutuallyExclusiveWith(tag = "target") + @RequireOnlyOne(tag = "target") private boolean all; @Option( @@ -64,16 +67,25 @@ public class ApplyMigrationCommand extends BaseCommand { name = {"-n", "--next"}, description = "migrate the next available version" ) - @MutuallyExclusiveWith(tag = "target") + @RequireOnlyOne(tag = "target") private boolean next; @Option( - title = "version", + title = "untilVersion", name = {"-u", "--until"}, arity = 1, description = "migrate until the specified version" ) - @MutuallyExclusiveWith(tag = "target") + @RequireOnlyOne(tag = "target") + private int untilVersion; + + @Option( + title = "untilVersion", + name = {"-v", "--version"}, + arity = 1, + description = "apply the migration with the specified version" + ) + @RequireOnlyOne(tag = "target") private int version; @Override @@ -105,8 +117,12 @@ int command( final String migrationsDir, final Clock clock ) { + if (untilVersion < 0) { + LOGGER.error("'until' migration version must be positive. Got: {}", untilVersion); + return 1; + } if (version < 0) { - LOGGER.error("Optional migration version must be positive. Got: {}", version); + LOGGER.error("migration version to apply must be positive. Got: {}", version); return 1; } @@ -125,7 +141,7 @@ int command( boolean success; try { - success = ValidateMigrationsCommand.validate(config, migrationsDir, ksqlClient) + success = validateCurrentState(config, ksqlClient, migrationsDir) && apply(config, ksqlClient, migrationsDir, clock); } catch (MigrationException e) { LOGGER.error(e.getMessage()); @@ -151,15 +167,7 @@ private boolean apply( LOGGER.info("Loading migration files"); final List migrations; try { - migrations = getAllMigrations(migrationsDir).stream() - .filter(migration -> { - if (version > 0) { - return migration.getVersion() <= version && migration.getVersion() >= minimumVersion; - } else { - return migration.getVersion() >= minimumVersion; - } - }) - .collect(Collectors.toList()); + migrations = loadMigrationsToApply(migrationsDir, minimumVersion); } catch (MigrationException e) { LOGGER.error(e.getMessage()); return false; @@ -168,7 +176,7 @@ private boolean apply( if (migrations.size() == 0) { LOGGER.info("No eligible migrations found."); } else { - LOGGER.info(migrations.size() + " migration files loaded."); + LOGGER.info(migrations.size() + " migration file(s) loaded."); } for (Migration migration : migrations) { @@ -177,9 +185,46 @@ private boolean apply( } previous = Integer.toString(migration.getVersion()); } + return true; } + private List loadMigrationsToApply( + final String migrationsDir, + final int minimumVersion + ) { + if (version > 0) { + final Optional migration = + getMigrationForVersion(String.valueOf(version), migrationsDir); + if (!migration.isPresent()) { + throw new MigrationException("No migration file with version " + version + " exists."); + } + return Collections.singletonList(migration.get()); + } + + final List migrations = getAllMigrations(migrationsDir).stream() + .filter(migration -> { + if (migration.getVersion() < minimumVersion) { + return false; + } + if (untilVersion > 0) { + return migration.getVersion() <= untilVersion; + } else { + return true; + } + }) + .collect(Collectors.toList()); + + if (next) { + if (migrations.size() == 0) { + throw new MigrationException("No eligible migrations found."); + } + return Collections.singletonList(migrations.get(0)); + } + + return migrations; + } + private boolean applyMigration( final MigrationConfig config, final Client ksqlClient, @@ -187,12 +232,13 @@ private boolean applyMigration( final Clock clock, final String previous ) { - LOGGER.info("Applying " + migration.getName() + " version " + migration.getVersion()); + LOGGER.info("Applying migration version {}: {}", migration.getVersion(), migration.getName()); final String migrationFileContent = MigrationsDirectoryUtil.getFileContentsForName(migration.getFilepath()); - LOGGER.info(migrationFileContent); + LOGGER.info("Migration file contents:\n{}", migrationFileContent); if (dryRun) { + LOGGER.info("Dry run complete. No migrations were actually applied."); return true; } @@ -307,4 +353,13 @@ private boolean updateState( protected Logger getLogger() { return LOGGER; } + + private static boolean validateCurrentState( + final MigrationConfig config, + final Client ksqlClient, + final String migrationsDir + ) { + LOGGER.info("Validating current migration state before applying new migrations"); + return ValidateMigrationsCommand.validate(config, migrationsDir, ksqlClient); + } } diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/CreateMigrationCommand.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/CreateMigrationCommand.java index d39f1cbfea8c..ba1bd76e2187 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/CreateMigrationCommand.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/CreateMigrationCommand.java @@ -16,8 +16,8 @@ package io.confluent.ksql.tools.migrations.commands; import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getAllVersions; -import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getFilePathForVersion; import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getFilePrefixForVersion; +import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationForVersion; import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDirFromConfigFile; import com.github.rvesse.airline.annotations.Arguments; @@ -26,6 +26,7 @@ import com.github.rvesse.airline.annotations.help.Examples; import com.github.rvesse.airline.annotations.restrictions.Required; import com.google.common.annotations.VisibleForTesting; +import io.confluent.ksql.tools.migrations.Migration; import io.confluent.ksql.tools.migrations.MigrationException; import java.io.File; import java.io.IOException; @@ -103,17 +104,17 @@ private boolean validateVersionDoesNotAlreadyExist(final String migrationsDir) { return true; } - final Optional existingFile; + final Optional existingMigration; try { - existingFile = getFilePathForVersion(String.valueOf(version), migrationsDir); + existingMigration = getMigrationForVersion(String.valueOf(version), migrationsDir); } catch (MigrationException e) { LOGGER.error(e.getMessage()); return false; } - if (existingFile.isPresent()) { + if (existingMigration.isPresent()) { LOGGER.error("Found existing migrations file for version {}: {}", - version, existingFile.get()); + version, existingMigration.get().getFilepath()); return false; } diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommand.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommand.java index 39d39727b673..fda0bcc27b9a 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommand.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommand.java @@ -19,7 +19,7 @@ import static io.confluent.ksql.tools.migrations.util.MetadataUtil.getLatestMigratedVersion; import static io.confluent.ksql.tools.migrations.util.MetadataUtil.validateVersionIsMigrated; import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.computeHashForFile; -import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getFilePathForVersion; +import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationForVersion; import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDirFromConfigFile; import com.github.rvesse.airline.annotations.Command; @@ -134,7 +134,7 @@ static boolean validate( final String filename; try { - filename = getFilePathForVersion(version, migrationsDir).get(); + filename = getMigrationForVersion(version, migrationsDir).get().getFilepath(); } catch (MigrationException | NoSuchElementException e) { LOGGER.error("No migrations file found for version with status {}. Version: {}", MigrationState.MIGRATED, version); diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsDirectoryUtil.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsDirectoryUtil.java index ce31cced46d1..399750c22735 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsDirectoryUtil.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsDirectoryUtil.java @@ -25,7 +25,6 @@ import java.nio.file.Paths; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -61,7 +60,7 @@ public static String getFilePrefixForVersion(final String version) { return "V" + StringUtils.leftPad(version, 6, "0"); } - public static Optional getFilePathForVersion( + public static Optional getMigrationForVersion( final String version, final String migrationsDir ) { @@ -77,16 +76,16 @@ public static Optional getFilePathForVersion( throw new MigrationException("Failed to retrieve files from " + migrationsDir); } - final List matches = Arrays.stream(names) + final List matches = Arrays.stream(names) .filter(name -> name.startsWith(prefix)) + .map(name -> getMigrationFromFilename(migrationsDir, name)) + .filter(Optional::isPresent) + .map(Optional::get) .collect(Collectors.toList()); - if (matches.size() == 1) { - return Optional.of(Paths.get(migrationsDir, matches.get(0)).toString()); - } else if (matches.size() == 0) { - return Optional.empty(); - } else { - throw new MigrationException("Found multiple migration files for version " + version); - } + // throw on multiple matches + validateMigrationVersionsUnique(matches); + + return matches.size() > 0 ? Optional.of(matches.get(0)) : Optional.empty(); } public static String getFileContentsForName(final String filename) { @@ -141,30 +140,59 @@ public static List getAllMigrations(final String migrationsDir) { .filter(name -> !new File(name).isDirectory()) .collect(Collectors.toList()); - final List migrations = new ArrayList<>(); - for (final String filename : filenames) { - final Matcher matcher = MIGRATION_FILE_MATCHER.matcher(filename); - if (!matcher.find()) { - LOGGER.warn("Skipping file does not match expected migration file pattern " - + "'V__.sql': {}", filename); - continue; - } + final List migrations = filenames.stream() + .map(name -> getMigrationFromFilename(migrationsDir, name)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); - final int version = Integer.parseInt(matcher.group(1)); - if (version <= 0) { - throw new MigrationException( - "Migration file versions must be positive. Found: " + filename); - } + validateMigrationVersionsUnique(migrations); - final String description = matcher.group(2).replace('_', ' '); + return migrations; + } - migrations.add(new Migration( - version, - description, - migrationsDir + "/" + filename - )); + private static Optional getMigrationFromFilename( + final String migrationsDir, + final String filename + ) { + final Matcher matcher = MIGRATION_FILE_MATCHER.matcher(filename); + if (!matcher.find()) { + LOGGER.warn("Skipping file does not match expected migration file pattern " + + "'V__.sql': {}", filename); + return Optional.empty(); } - return migrations; + final int version = Integer.parseInt(matcher.group(1)); + if (version <= 0) { + throw new MigrationException( + "Migration file versions must be positive. Found: " + filename); + } + + final String description = matcher.group(2).replace('_', ' '); + + return Optional.of(new Migration( + version, + description, + Paths.get(migrationsDir, filename).toString() + )); + } + + private static void validateMigrationVersionsUnique(final List migrations) { + if (migrations.size() == 0) { + return; + } + + Migration previous = migrations.get(0); + for (int i = 1; i < migrations.size(); i++) { + if (migrations.get(i).getVersion() == previous.getVersion()) { + throw new MigrationException(String.format( + "Found multiple migration files with the same version. Version: %d. Files: '%s', '%s'", + migrations.get(i).getVersion(), + migrations.get(i).getFilepath(), + previous.getFilepath() + )); + } + previous = migrations.get(i); + } } } diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java index 665056c77d88..cf524274afb0 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java @@ -114,7 +114,7 @@ public void testApply() throws IOException { () -> makeKsqlQuery("SELECT * FROM migration_schema_versions WHERE VERSION_KEY='CURRENT';").size(), is(1) ); - final int status = MIGRATIONS_CLI.parse("--config-file", configFilePath, "apply").run(); + final int status = MIGRATIONS_CLI.parse("--config-file", configFilePath, "apply", "-a").run(); assertThat(status, is(0)); // verify FOO and BAR were registered diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java index 56e1e25f68a9..05cefa38c688 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java @@ -119,6 +119,8 @@ public void shouldApplyFirstMigration() throws Exception { // Given: command = PARSER.parse("-n"); createMigrationFile(1, NAME, migrationsDir, COMMAND); + // extra migration to ensure only the first is applied + createMigrationFile(3, NAME, migrationsDir, COMMAND); when(versionQueryResult.get()).thenReturn(ImmutableList.of()); // When: @@ -182,6 +184,8 @@ public void shouldApplyUntilVersion() throws Exception { command = PARSER.parse("-u", "2"); createMigrationFile(1, NAME, migrationsDir, COMMAND); createMigrationFile(2, NAME, migrationsDir, COMMAND); + // extra migration to ensure only the first two are applied + createMigrationFile(3, NAME, migrationsDir, COMMAND); when(versionQueryResult.get()).thenReturn(ImmutableList.of()); when(infoQueryResult.get()).thenReturn(ImmutableList.of(createInfoRow(1, NAME, MigrationState.MIGRATED))); @@ -198,6 +202,27 @@ public void shouldApplyUntilVersion() throws Exception { inOrder.verifyNoMoreInteractions(); } + @Test + public void shouldApplySpecificMigration() throws Exception { + // Given: + command = PARSER.parse("-v", "3"); + createMigrationFile(1, NAME, migrationsDir, COMMAND); + createMigrationFile(3, NAME, migrationsDir, COMMAND); + when(versionQueryResult.get()).thenReturn(ImmutableList.of(createVersionRow("1"))); + when(infoQueryResult.get()).thenReturn(ImmutableList.of(createInfoRow(1, NAME, MigrationState.MIGRATED))); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + Instant.ofEpochMilli(1000), ZoneId.systemDefault())); + + // Then: + assertThat(result, is(0)); + final InOrder inOrder = inOrder(ksqlClient); + verifyMigratedVersion(inOrder, 3, "1", MigrationState.MIGRATED); + inOrder.verify(ksqlClient).close(); + inOrder.verifyNoMoreInteractions(); + } + @Test public void shouldNotApplyMigrationIfPreviousNotFinished() throws Exception { // Given: @@ -286,7 +311,6 @@ public void shouldFailIfMetadataNotInitialized() throws Exception { // Given: command = PARSER.parse("-n"); createMigrationFile(1, NAME, migrationsDir, COMMAND); - when(versionQueryResult.get()).thenReturn(ImmutableList.of()); when(sourceDescriptionCf.get()) .thenThrow(new ExecutionException("Source not found", new RuntimeException())); diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommandTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommandTest.java index ce6f9b0ce765..4cde89586474 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommandTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommandTest.java @@ -343,7 +343,7 @@ private void verifyClientCallsForVersions( private String filePathForVersion(final String version) { final String prefix = getFilePrefixForVersion(version); - return Paths.get(migrationsDir, prefix + "_awesome_migration").toString(); + return Paths.get(migrationsDir, prefix + "__awesome_migration.sql").toString(); } private static String fileContentsForVersion(final String version) {