diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigSeedProvider.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigSeedProvider.java deleted file mode 100644 index 8dfd747b252cb..0000000000000 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigSeedProvider.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.config.persistence; - -import io.airbyte.config.Configs; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ConfigSeedProvider { - - private static final Logger LOGGER = LoggerFactory.getLogger(ConfigSeedProvider.class); - - public static ConfigPersistence get(Configs configs) { - if (FileSystemConfigPersistence.hasExistingConfigs(configs.getConfigRoot())) { - LOGGER.info("There is existing local config directory; seed from the config volume"); - return new FileSystemConfigPersistence(configs.getConfigRoot()); - } else { - LOGGER.info("There is no existing local config directory; seed from YAML files"); - return YamlSeedConfigPersistence.get(); - } - } - -} diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java index 137f3d3d02332..1d8de1f0e9f8e 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java @@ -36,6 +36,7 @@ import io.airbyte.config.AirbyteConfig; import io.airbyte.config.ConfigSchema; import io.airbyte.config.ConfigSchemaMigrationSupport; +import io.airbyte.config.Configs; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.db.Database; @@ -73,13 +74,48 @@ public DatabaseConfigPersistence(Database database) { this.database = new ExceptionWrappingDatabase(database); } + /** + * Initialize the config persistence. + *
  • If the database has been initialized, update connector definition from YAML seed.
  • + *
  • Otherwise, there are two possibilities.
  • + *
  • The first case is a new deployment, which means there is no local config directory (because + * we no longer create it in newer Airbyte versions). We can initialize the database by copying the + * YAML seed.
  • + *
  • The second case is a migration from an old version that relies on file system config + * persistence. We need to copy the existing configs from local files, and then update connector + * definitions from YAML seed.
  • + */ + public void initialize(Configs serverConfigs, ConfigPersistence yamlSeedPersistence) throws IOException { + database.transaction(ctx -> { + boolean isInitialized = ctx.fetchExists(AIRBYTE_CONFIGS); + if (isInitialized) { + LOGGER.info("Config persistence has been initialized; load YAML seed to update connector definitions"); + updateConfigsFromSeed(ctx, yamlSeedPersistence); + return null; + } + + boolean hasExistingFileConfigs = FileSystemConfigPersistence.hasExistingConfigs(serverConfigs.getConfigRoot()); + if (hasExistingFileConfigs) { + LOGGER.info("Config persistence needs initialization; load seed from existing local config directory, and update from YAML seed"); + ConfigPersistence fileSystemPersistence = new FileSystemConfigPersistence(serverConfigs.getConfigRoot()); + copyConfigsFromSeed(ctx, fileSystemPersistence); + updateConfigsFromSeed(ctx, yamlSeedPersistence); + return null; + } + + LOGGER.info("Config persistence needs initialization; there is no local config directory; load YAML seed"); + copyConfigsFromSeed(ctx, yamlSeedPersistence); + return null; + }); + } + /** * Load or update the configs from the seed. */ @Override public void loadData(ConfigPersistence seedConfigPersistence) throws IOException { database.transaction(ctx -> { - boolean isInitialized = ctx.fetchExists(select().from(AIRBYTE_CONFIGS).where()); + boolean isInitialized = ctx.fetchExists(AIRBYTE_CONFIGS); if (isInitialized) { updateConfigsFromSeed(ctx, seedConfigPersistence); } else { @@ -239,7 +275,7 @@ int updateConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String configTy @VisibleForTesting void copyConfigsFromSeed(DSLContext ctx, ConfigPersistence seedConfigPersistence) throws SQLException { - LOGGER.info("Loading data to config database..."); + LOGGER.info("Loading seed data to config database..."); Map> seedConfigs; try { @@ -293,7 +329,7 @@ private ConnectorCounter(int newCount, int updateCount) { @VisibleForTesting void updateConfigsFromSeed(DSLContext ctx, ConfigPersistence seedConfigPersistence) throws SQLException { - LOGGER.info("Config database has been initialized; updating connector definitions from the seed if necessary..."); + LOGGER.info("Updating connector definitions from the seed if necessary..."); try { Set connectorRepositoriesInUse = getConnectorRepositoriesInUse(ctx); @@ -356,7 +392,7 @@ private ConnectorCounter updateConnectorDefinitions(DSLContext ctx, ConnectorInfo connectorInfo = connectorRepositoryToIdVersionMap.get(repository); String latestImageTag = configJson.get("dockerImageTag").asText(); - if (!latestImageTag.equals(connectorInfo.dockerImageTag)) { + if (hasNewVersion(connectorInfo.dockerImageTag, latestImageTag)) { LOGGER.info("Connector {} needs update: {} vs {}", repository, connectorInfo.dockerImageTag, latestImageTag); updatedCount += updateConfigRecord(ctx, timestamp, configType.name(), configJson, connectorInfo.connectorDefinitionId); } else { @@ -366,6 +402,15 @@ private ConnectorCounter updateConnectorDefinitions(DSLContext ctx, return new ConnectorCounter(newCount, updatedCount); } + static boolean hasNewVersion(String currentVersion, String latestVersion) { + try { + return new AirbyteVersion(latestVersion).patchVersionCompareTo(new AirbyteVersion(currentVersion)) > 0; + } catch (Exception e) { + LOGGER.error("Failed to check version: {} vs {}", currentVersion, latestVersion); + return false; + } + } + /** * @return A map about current connectors (both source and destination). It maps from connector * repository to its definition id and docker image tag. We identify a connector by its diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceInitializeTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceInitializeTest.java new file mode 100644 index 0000000000000..c95ffeed72ef6 --- /dev/null +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceInitializeTest.java @@ -0,0 +1,156 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.config.persistence; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ConfigSchema; +import io.airbyte.config.Configs; +import io.airbyte.config.StandardDestinationDefinition; +import io.airbyte.config.StandardSourceDefinition; +import io.airbyte.db.instance.configs.ConfigsDatabaseInstance; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.jooq.DSLContext; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.mockito.InOrder; + +/** + * Unit test for the {@link DatabaseConfigPersistence#initialize} method. + */ +public class DatabaseConfigPersistenceInitializeTest extends BaseDatabaseConfigPersistenceTest { + + // mock YAML seed connector definitions + private static final List SEED_SOURCES = List.of(SOURCE_GITHUB); + private static final List SEED_DESTINATIONS = List.of(DESTINATION_SNOWFLAKE); + + private static final ConfigPersistence SEED_PERSISTENCE = mock(ConfigPersistence.class); + private static Path ROOT_PATH; + + private final Configs configs = mock(Configs.class); + + @BeforeAll + public static void setup() throws Exception { + database = new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); + configPersistence = spy(new DatabaseConfigPersistence(database)); + + when(SEED_PERSISTENCE.dumpConfigs()).thenReturn(Map.of( + ConfigSchema.STANDARD_SOURCE_DEFINITION.name(), SEED_SOURCES.stream().map(Jsons::jsonNode), + ConfigSchema.STANDARD_DESTINATION_DEFINITION.name(), SEED_DESTINATIONS.stream().map(Jsons::jsonNode))); + when(SEED_PERSISTENCE.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)) + .thenReturn(SEED_SOURCES); + when(SEED_PERSISTENCE.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class)) + .thenReturn(SEED_DESTINATIONS); + } + + @AfterAll + public static void tearDown() throws Exception { + database.close(); + } + + @BeforeEach + public void resetPersistence() throws Exception { + ROOT_PATH = Files.createTempDirectory( + Files.createDirectories(Path.of("/tmp/airbyte_tests")), + DatabaseConfigPersistenceInitializeTest.class.getSimpleName() + UUID.randomUUID()); + + reset(configs); + when(configs.getConfigRoot()).thenReturn(ROOT_PATH); + + database.query(ctx -> ctx.truncateTable("airbyte_configs").execute()); + + reset(configPersistence); + } + + @Test + @DisplayName("When database is not initialized, and there is no local config dir, copy from seed") + public void testNewDeployment() throws Exception { + configPersistence.initialize(configs, SEED_PERSISTENCE); + + assertRecordCount(2); + assertHasSource(SOURCE_GITHUB); + assertHasDestination(DESTINATION_SNOWFLAKE); + + verify(configPersistence, times(1)).copyConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class)); + verify(configPersistence, never()).updateConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class)); + } + + @Test + @DisplayName("When database is not initialized, and there is local config dir, copy from local and update from seed") + public void testMigrationDeployment() throws Exception { + prepareLocalFilePersistence(); + + configPersistence.initialize(configs, SEED_PERSISTENCE); + + assertRecordCount(3); + assertHasSource(SOURCE_GITHUB); + assertHasDestination(DESTINATION_S3); + assertHasDestination(DESTINATION_SNOWFLAKE); + + InOrder callOrder = inOrder(configPersistence); + callOrder.verify(configPersistence, times(1)).copyConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class)); + callOrder.verify(configPersistence, times(1)).updateConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class)); + } + + @Test + @DisplayName("When database has been initialized, ignore local config dir, and update from seed") + public void testUpdateDeployment() throws Exception { + prepareLocalFilePersistence(); + writeSource(configPersistence, SOURCE_GITHUB); + + configPersistence.initialize(configs, SEED_PERSISTENCE); + + assertRecordCount(2); + assertHasSource(SOURCE_GITHUB); + assertHasDestination(DESTINATION_SNOWFLAKE); + + verify(configPersistence, never()).copyConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class)); + verify(configPersistence, times(1)).updateConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class)); + } + + private void prepareLocalFilePersistence() throws Exception { + Files.createDirectories(ROOT_PATH.resolve(FileSystemConfigPersistence.CONFIG_DIR)); + ConfigPersistence filePersistence = new FileSystemConfigPersistence(ROOT_PATH); + writeSource(filePersistence, SOURCE_GITHUB); + writeDestination(filePersistence, DESTINATION_S3); + } + +} diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java index f32a9c3156f1c..d1f7fa3481138 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java @@ -25,7 +25,9 @@ package io.airbyte.config.persistence; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.spy; import com.fasterxml.jackson.databind.JsonNode; @@ -46,8 +48,8 @@ import org.junit.jupiter.api.Test; /** - * The {@link DatabaseConfigPersistence#loadData} method is tested in - * {@link DatabaseConfigPersistenceLoadDataTest}. + * See {@link DatabaseConfigPersistenceLoadDataTest} and + * {@link DatabaseConfigPersistenceInitializeTest} for testing of those methods. */ public class DatabaseConfigPersistenceTest extends BaseDatabaseConfigPersistenceTest { @@ -218,4 +220,10 @@ public void testUpdateConfigRecord() throws Exception { assertHasSource(SOURCE_GITHUB); } + @Test + public void testHasNewVersion() { + assertTrue(DatabaseConfigPersistence.hasNewVersion("0.1.99", "0.2.0")); + assertFalse(DatabaseConfigPersistence.hasNewVersion("invalid_version", "0.2.0")); + } + } diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 997e872103653..3f1f42543359f 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -34,7 +34,6 @@ import io.airbyte.config.StandardWorkspace; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.persistence.ConfigRepository; -import io.airbyte.config.persistence.ConfigSeedProvider; import io.airbyte.config.persistence.DatabaseConfigPersistence; import io.airbyte.config.persistence.YamlSeedConfigPersistence; import io.airbyte.db.Database; @@ -182,7 +181,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory) throws Ex configs.getConfigDatabaseUrl()) .getAndInitialize(); final DatabaseConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase); - configPersistence.loadData(ConfigSeedProvider.get(configs)); + configPersistence.initialize(configs, YamlSeedConfigPersistence.get()); final ConfigRepository configRepository = new ConfigRepository(configPersistence.withValidation()); LOGGER.info("Creating Scheduler persistence...");