Skip to content

Commit

Permalink
🐞 Fix db config persistence initialization (#6220)
Browse files Browse the repository at this point in the history
Resolves #5954.
  • Loading branch information
tuliren authored Sep 18, 2021
1 parent 51fadf2 commit 111d90f
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 53 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.ConfigSchemaMigrationSupport;
import io.airbyte.config.Configs;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.db.Database;
Expand Down Expand Up @@ -73,13 +74,48 @@ public DatabaseConfigPersistence(Database database) {
this.database = new ExceptionWrappingDatabase(database);
}

/**
 * Bring the config persistence into a usable state. All work happens inside one database
 * transaction, and exactly one of the following paths is taken:
 * <ul>
 * <li><b>Already initialized</b> (the config table has records): only update the connector
 * definitions from the YAML seed.</li>
 * <li><b>Migration</b> (the config table is empty, but configs exist in the local config
 * directory, i.e. an old version relied on file system config persistence): copy the existing
 * configs from the local files, then update the connector definitions from the YAML seed.</li>
 * <li><b>New deployment</b> (the config table is empty and there is no local config directory,
 * because newer Airbyte versions no longer create it): copy the YAML seed into the database.</li>
 * </ul>
 *
 * @param serverConfigs provides the config root that is checked for existing file-based configs
 * @param yamlSeedPersistence the YAML seed to copy from or to update connector definitions from
 * @throws IOException as declared by the underlying database transaction
 */
public void initialize(Configs serverConfigs, ConfigPersistence yamlSeedPersistence) throws IOException {
  database.transaction(ctx -> {
    if (ctx.fetchExists(AIRBYTE_CONFIGS)) {
      LOGGER.info("Config persistence has been initialized; load YAML seed to update connector definitions");
      updateConfigsFromSeed(ctx, yamlSeedPersistence);
    } else if (FileSystemConfigPersistence.hasExistingConfigs(serverConfigs.getConfigRoot())) {
      LOGGER.info("Config persistence needs initialization; load seed from existing local config directory, and update from YAML seed");
      // Order matters: first import what the user already had, then bring definitions up to date.
      ConfigPersistence localFileConfigs = new FileSystemConfigPersistence(serverConfigs.getConfigRoot());
      copyConfigsFromSeed(ctx, localFileConfigs);
      updateConfigsFromSeed(ctx, yamlSeedPersistence);
    } else {
      LOGGER.info("Config persistence needs initialization; there is no local config directory; load YAML seed");
      copyConfigsFromSeed(ctx, yamlSeedPersistence);
    }
    // The transaction callback has to return a value; nothing meaningful is produced here.
    return null;
  });
}

/**
* Load or update the configs from the seed.
*/
@Override
public void loadData(ConfigPersistence seedConfigPersistence) throws IOException {
database.transaction(ctx -> {
boolean isInitialized = ctx.fetchExists(select().from(AIRBYTE_CONFIGS).where());
boolean isInitialized = ctx.fetchExists(AIRBYTE_CONFIGS);
if (isInitialized) {
updateConfigsFromSeed(ctx, seedConfigPersistence);
} else {
Expand Down Expand Up @@ -239,7 +275,7 @@ int updateConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String configTy

@VisibleForTesting
void copyConfigsFromSeed(DSLContext ctx, ConfigPersistence seedConfigPersistence) throws SQLException {
LOGGER.info("Loading data to config database...");
LOGGER.info("Loading seed data to config database...");

Map<String, Stream<JsonNode>> seedConfigs;
try {
Expand Down Expand Up @@ -293,7 +329,7 @@ private ConnectorCounter(int newCount, int updateCount) {

@VisibleForTesting
void updateConfigsFromSeed(DSLContext ctx, ConfigPersistence seedConfigPersistence) throws SQLException {
LOGGER.info("Config database has been initialized; updating connector definitions from the seed if necessary...");
LOGGER.info("Updating connector definitions from the seed if necessary...");

try {
Set<String> connectorRepositoriesInUse = getConnectorRepositoriesInUse(ctx);
Expand Down Expand Up @@ -356,7 +392,7 @@ private <T> ConnectorCounter updateConnectorDefinitions(DSLContext ctx,

ConnectorInfo connectorInfo = connectorRepositoryToIdVersionMap.get(repository);
String latestImageTag = configJson.get("dockerImageTag").asText();
if (!latestImageTag.equals(connectorInfo.dockerImageTag)) {
if (hasNewVersion(connectorInfo.dockerImageTag, latestImageTag)) {
LOGGER.info("Connector {} needs update: {} vs {}", repository, connectorInfo.dockerImageTag, latestImageTag);
updatedCount += updateConfigRecord(ctx, timestamp, configType.name(), configJson, connectorInfo.connectorDefinitionId);
} else {
Expand All @@ -366,6 +402,15 @@ private <T> ConnectorCounter updateConnectorDefinitions(DSLContext ctx,
return new ConnectorCounter(newCount, updatedCount);
}

/**
 * Decide whether a connector definition should be replaced by the one from the seed.
 *
 * @param currentVersion the docker image tag currently stored for the connector
 * @param latestVersion the docker image tag found in the seed
 * @return {@code true} only when {@code latestVersion} compares strictly greater than
 *         {@code currentVersion}; {@code false} when it does not, or when the comparison fails
 *         (for example because one of the tags cannot be parsed as a version)
 */
static boolean hasNewVersion(String currentVersion, String latestVersion) {
  try {
    return new AirbyteVersion(latestVersion).patchVersionCompareTo(new AirbyteVersion(currentVersion)) > 0;
  } catch (Exception e) {
    // Best effort: a tag that cannot be compared must not abort the whole seed update, so the
    // connector is simply left untouched. The exception is passed as the trailing argument so
    // that the cause and stack trace end up in the log instead of being silently dropped.
    LOGGER.error("Failed to check version: {} vs {}", currentVersion, latestVersion, e);
    return false;
  }
}

/**
* @return A map about current connectors (both source and destination). It maps from connector
* repository to its definition id and docker image tag. We identify a connector by its
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.config.persistence;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.Configs;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.jooq.DSLContext;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;

/**
 * Unit test for the {@link DatabaseConfigPersistence#initialize} method. It covers the three
 * startup scenarios: a new deployment, a migration from file-based configs, and an update of an
 * already initialized database.
 */
public class DatabaseConfigPersistenceInitializeTest extends BaseDatabaseConfigPersistenceTest {

  // mock YAML seed connector definitions
  private static final List<StandardSourceDefinition> SEED_SOURCES = List.of(SOURCE_GITHUB);
  private static final List<StandardDestinationDefinition> SEED_DESTINATIONS = List.of(DESTINATION_SNOWFLAKE);

  private static final ConfigPersistence SEED_PERSISTENCE = mock(ConfigPersistence.class);
  // Re-created before every test so that each test starts with an empty local config root.
  private static Path ROOT_PATH;

  private final Configs configs = mock(Configs.class);

  /**
   * Create the config database and a spied persistence on top of it (so that calls to the
   * copy/update helpers can be verified), and stub the mocked YAML seed.
   */
  @BeforeAll
  public static void setup() throws Exception {
    database = new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize();
    configPersistence = spy(new DatabaseConfigPersistence(database));

    // Streams are single-use. Build a fresh map of streams on every invocation instead of
    // handing out the same, possibly already consumed, stream instances to each caller.
    when(SEED_PERSISTENCE.dumpConfigs()).thenAnswer(invocation -> Map.of(
        ConfigSchema.STANDARD_SOURCE_DEFINITION.name(), SEED_SOURCES.stream().map(Jsons::jsonNode),
        ConfigSchema.STANDARD_DESTINATION_DEFINITION.name(), SEED_DESTINATIONS.stream().map(Jsons::jsonNode)));
    when(SEED_PERSISTENCE.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class))
        .thenReturn(SEED_SOURCES);
    when(SEED_PERSISTENCE.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class))
        .thenReturn(SEED_DESTINATIONS);
  }

  @AfterAll
  public static void tearDown() throws Exception {
    database.close();
  }

  /**
   * Give every test a fresh config root, an empty config table, and a spy without recorded
   * interactions.
   */
  @BeforeEach
  public void resetPersistence() throws Exception {
    ROOT_PATH = Files.createTempDirectory(
        Files.createDirectories(Path.of("/tmp/airbyte_tests")),
        DatabaseConfigPersistenceInitializeTest.class.getSimpleName() + UUID.randomUUID());

    reset(configs);
    when(configs.getConfigRoot()).thenReturn(ROOT_PATH);

    database.query(ctx -> ctx.truncateTable("airbyte_configs").execute());

    reset(configPersistence);
  }

  @Test
  @DisplayName("When database is not initialized, and there is no local config dir, copy from seed")
  public void testNewDeployment() throws Exception {
    configPersistence.initialize(configs, SEED_PERSISTENCE);

    // Only the two seed definitions are expected.
    assertRecordCount(2);
    assertHasSource(SOURCE_GITHUB);
    assertHasDestination(DESTINATION_SNOWFLAKE);

    verify(configPersistence, times(1)).copyConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class));
    verify(configPersistence, never()).updateConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class));
  }

  @Test
  @DisplayName("When database is not initialized, and there is local config dir, copy from local and update from seed")
  public void testMigrationDeployment() throws Exception {
    prepareLocalFilePersistence();

    configPersistence.initialize(configs, SEED_PERSISTENCE);

    // Two records come from the local files (GitHub, S3); Snowflake is added from the seed.
    assertRecordCount(3);
    assertHasSource(SOURCE_GITHUB);
    assertHasDestination(DESTINATION_S3);
    assertHasDestination(DESTINATION_SNOWFLAKE);

    // The local configs must be copied before the seed update runs.
    InOrder callOrder = inOrder(configPersistence);
    callOrder.verify(configPersistence, times(1)).copyConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class));
    callOrder.verify(configPersistence, times(1)).updateConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class));
  }

  @Test
  @DisplayName("When database has been initialized, ignore local config dir, and update from seed")
  public void testUpdateDeployment() throws Exception {
    prepareLocalFilePersistence();
    // A single record is enough to make the database count as initialized.
    writeSource(configPersistence, SOURCE_GITHUB);

    configPersistence.initialize(configs, SEED_PERSISTENCE);

    // The S3 destination from the local files must not show up; Snowflake comes from the seed.
    assertRecordCount(2);
    assertHasSource(SOURCE_GITHUB);
    assertHasDestination(DESTINATION_SNOWFLAKE);

    verify(configPersistence, never()).copyConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class));
    verify(configPersistence, times(1)).updateConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class));
  }

  /**
   * Create a local config directory under the test's config root and write one source (GitHub)
   * and one destination (S3) into it through a file system persistence.
   */
  private void prepareLocalFilePersistence() throws Exception {
    Files.createDirectories(ROOT_PATH.resolve(FileSystemConfigPersistence.CONFIG_DIR));
    ConfigPersistence filePersistence = new FileSystemConfigPersistence(ROOT_PATH);
    writeSource(filePersistence, SOURCE_GITHUB);
    writeDestination(filePersistence, DESTINATION_S3);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
package io.airbyte.config.persistence;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.spy;

import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -46,8 +48,8 @@
import org.junit.jupiter.api.Test;

/**
* The {@link DatabaseConfigPersistence#loadData} method is tested in
* {@link DatabaseConfigPersistenceLoadDataTest}.
* See {@link DatabaseConfigPersistenceLoadDataTest} and
* {@link DatabaseConfigPersistenceInitializeTest} for testing of those methods.
*/
public class DatabaseConfigPersistenceTest extends BaseDatabaseConfigPersistenceTest {

Expand Down Expand Up @@ -218,4 +220,10 @@ public void testUpdateConfigRecord() throws Exception {
assertHasSource(SOURCE_GITHUB);
}

/**
 * {@link DatabaseConfigPersistence#hasNewVersion} must report an update only for a strictly newer
 * seed version, and must fall back to {@code false} whenever a version cannot be compared.
 */
@Test
public void testHasNewVersion() {
  // A newer seed version triggers an update.
  assertTrue(DatabaseConfigPersistence.hasNewVersion("0.1.99", "0.2.0"));

  // Same or older seed version: no update, so a connector is never downgraded.
  // NOTE(review): relies on the version comparison returning 0 for equal versions and a
  // negative value for an older one - confirm against AirbyteVersion.
  assertFalse(DatabaseConfigPersistence.hasNewVersion("0.2.0", "0.2.0"));
  assertFalse(DatabaseConfigPersistence.hasNewVersion("0.2.0", "0.1.99"));

  // An unparsable version on either side is treated as "no new version".
  assertFalse(DatabaseConfigPersistence.hasNewVersion("invalid_version", "0.2.0"));
  assertFalse(DatabaseConfigPersistence.hasNewVersion("0.2.0", "invalid_version"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.ConfigSeedProvider;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.YamlSeedConfigPersistence;
import io.airbyte.db.Database;
Expand Down Expand Up @@ -182,7 +181,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory) throws Ex
configs.getConfigDatabaseUrl())
.getAndInitialize();
final DatabaseConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase);
configPersistence.loadData(ConfigSeedProvider.get(configs));
configPersistence.initialize(configs, YamlSeedConfigPersistence.get());
final ConfigRepository configRepository = new ConfigRepository(configPersistence.withValidation());

LOGGER.info("Creating Scheduler persistence...");
Expand Down

0 comments on commit 111d90f

Please sign in to comment.