Skip to content

Commit

Permalink
Update connector definitions upon server start (#5723)
Browse files Browse the repository at this point in the history
* Update connector definitions

* Update javadoc

* Add unit tests and fix bugs

* Refactor unit tests

* Fix insertion count

* Format code

* Address review comments
  • Loading branch information
tuliren authored Sep 1, 2021
1 parent 0d7cdcb commit 1f7edaa
Show file tree
Hide file tree
Showing 4 changed files with 402 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,17 @@

import static io.airbyte.db.instance.configs.jooq.Tables.AIRBYTE_CONFIGS;
import static org.jooq.impl.DSL.asterisk;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.select;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.ConfigSchemaMigrationSupport;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.validation.json.JsonValidationException;
Expand All @@ -41,13 +46,17 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.JSONB;
import org.jooq.Record;
import org.jooq.Record1;
import org.jooq.Result;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -62,35 +71,16 @@ public DatabaseConfigPersistence(Database database) {
}

/**
* Populate the {@code airbyte_configs} table with configs from the seed persistence. Only do so if
* the table is empty. Otherwise, we assume that it has been populated.
* Load or update the configs from the seed.
*/
public DatabaseConfigPersistence loadData(ConfigPersistence seedConfigPersistence) throws IOException {
database.transaction(ctx -> {
boolean isInitialized = ctx.fetchExists(select().from(AIRBYTE_CONFIGS).where());
if (isInitialized) {
LOGGER.info("Config database is not empty; skipping config seeding and copying");
return null;
}

LOGGER.info("Loading data to config database...");
Map<String, Stream<JsonNode>> seedConfigs;
try {
seedConfigs = seedConfigPersistence.dumpConfigs();
} catch (IOException e) {
throw new SQLException(e);
updateConfigsFromSeed(ctx, seedConfigPersistence);
} else {
copyConfigsFromSeed(ctx, seedConfigPersistence);
}
OffsetDateTime timestamp = OffsetDateTime.now();

int insertionCount = seedConfigs.entrySet().stream().map(entry -> {
String configType = entry.getKey();
return entry.getValue().map(configJson -> {
String idFieldName = ConfigSchemaMigrationSupport.CONFIG_SCHEMA_ID_FIELD_NAMES.get(configType);
return insertConfigRecord(ctx, timestamp, configType, configJson, idFieldName);
}).reduce(0, Integer::sum);
}).reduce(0, Integer::sum);

LOGGER.info("Config database data loading completed with {} records", insertionCount);
return null;
});
return this;
Expand Down Expand Up @@ -208,10 +198,24 @@ public <T> void replaceAllConfigs(Map<AirbyteConfig, Stream<T>> configs, boolean
LOGGER.info("Config database is reset with {} records", insertionCount);
}

@Override
public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
  LOGGER.info("Exporting all configs...");

  // Fetch every row, grouped by config type, with a stable (type, id) ordering inside each group.
  Map<String, Result<Record>> rowsByType = database.query(ctx -> ctx
      .select(asterisk())
      .from(AIRBYTE_CONFIGS)
      .orderBy(AIRBYTE_CONFIGS.CONFIG_TYPE, AIRBYTE_CONFIGS.CONFIG_ID)
      .fetchGroups(AIRBYTE_CONFIGS.CONFIG_TYPE));

  // Deserialize lazily: each group's blobs are only parsed when the caller consumes the stream.
  return rowsByType.entrySet().stream().collect(Collectors.toMap(
      Entry::getKey,
      group -> group.getValue().stream()
          .map(row -> Jsons.deserialize(row.get(AIRBYTE_CONFIGS.CONFIG_BLOB).data()))));
}

/**
* @return the number of inserted records for convenience, which is always 1.
*/
private int insertConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String configType, JsonNode configJson, String idFieldName) {
@VisibleForTesting
int insertConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String configType, JsonNode configJson, String idFieldName) {
String configId = idFieldName == null
? UUID.randomUUID().toString()
: configJson.get(idFieldName).asText();
Expand All @@ -227,17 +231,175 @@ private int insertConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String
return 1;
}

@Override
public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
LOGGER.info("Exporting all configs...");
/**
* @return the number of updated records.
*/
@VisibleForTesting
int updateConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String configType, JsonNode configJson, String configId) {
LOGGER.info("Updating {} record {}", configType, configId);

Map<String, Result<Record>> results = database.query(ctx -> ctx.select(asterisk())
return ctx.update(AIRBYTE_CONFIGS)
.set(AIRBYTE_CONFIGS.CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(configJson)))
.set(AIRBYTE_CONFIGS.UPDATED_AT, timestamp)
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(configType), AIRBYTE_CONFIGS.CONFIG_ID.eq(configId))
.execute();
}

/**
 * Bulk-load every config from the seed persistence into the config database. Intended for a
 * freshly created (empty) database.
 */
@VisibleForTesting
void copyConfigsFromSeed(DSLContext ctx, ConfigPersistence seedConfigPersistence) throws SQLException {
  LOGGER.info("Loading data to config database...");

  Map<String, Stream<JsonNode>> seedConfigs;
  try {
    seedConfigs = seedConfigPersistence.dumpConfigs();
  } catch (IOException e) {
    // Rethrow as SQLException so the enclosing jOOQ transaction treats this as a failure.
    throw new SQLException(e);
  }

  OffsetDateTime timestamp = OffsetDateTime.now();
  int insertionCount = 0;
  for (Entry<String, Stream<JsonNode>> seedEntry : seedConfigs.entrySet()) {
    String configType = seedEntry.getKey();
    // The id field name tells insertConfigRecord where to find each config's primary key.
    String idFieldName = ConfigSchemaMigrationSupport.CONFIG_SCHEMA_ID_FIELD_NAMES.get(configType);
    insertionCount += seedEntry.getValue()
        .mapToInt(configJson -> insertConfigRecord(ctx, timestamp, configType, configJson, idFieldName))
        .sum();
  }

  LOGGER.info("Config database data loading completed with {} records", insertionCount);
}

/**
 * Immutable value holder pairing a connector definition's id with its docker image tag, used
 * when deciding whether a stored definition needs to be updated from the seed.
 */
private static class ConnectorInfo {

  // Config id of the stored source/destination definition.
  private final String connectorDefinitionId;
  // Docker image tag currently stored for this connector.
  private final String dockerImageTag;

  private ConnectorInfo(String connectorDefinitionId, String dockerImageTag) {
    this.connectorDefinitionId = connectorDefinitionId;
    this.dockerImageTag = dockerImageTag;
  }

}

/**
 * Immutable tally of how many connector definitions were inserted vs updated in one pass of
 * {@code updateConnectorDefinitions}.
 */
private static class ConnectorCounter {

  // Number of definitions newly inserted.
  private final int newCount;
  // Number of existing definitions whose stored blob was overwritten.
  private final int updateCount;

  private ConnectorCounter(int newCount, int updateCount) {
    this.newCount = newCount;
    this.updateCount = updateCount;
  }

}

/**
 * Sync the connector definitions in the config database with the seed: insert definitions that
 * do not exist yet, and refresh unused ones whose docker image tag differs from the seed's.
 */
@VisibleForTesting
void updateConfigsFromSeed(DSLContext ctx, ConfigPersistence seedConfigPersistence) throws SQLException {
  LOGGER.info("Config database has been initialized; updating connector definitions from the seed if necessary...");

  try {
    Set<String> connectorRepositoriesInUse = getConnectorRepositoriesInUse(ctx);
    Map<String, ConnectorInfo> connectorRepositoryToInfoMap = getConnectorRepositoryToInfoMap(ctx);
    OffsetDateTime timestamp = OffsetDateTime.now();

    // Reconcile source definitions against the seed.
    List<StandardSourceDefinition> latestSources = seedConfigPersistence.listConfigs(
        ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class);
    ConnectorCounter sourceCounter = updateConnectorDefinitions(ctx, timestamp, ConfigSchema.STANDARD_SOURCE_DEFINITION,
        latestSources, connectorRepositoriesInUse, connectorRepositoryToInfoMap);

    // Reconcile destination definitions against the seed.
    List<StandardDestinationDefinition> latestDestinations = seedConfigPersistence.listConfigs(
        ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class);
    ConnectorCounter destinationCounter = updateConnectorDefinitions(ctx, timestamp, ConfigSchema.STANDARD_DESTINATION_DEFINITION,
        latestDestinations, connectorRepositoriesInUse, connectorRepositoryToInfoMap);

    int newConnectorCount = sourceCounter.newCount + destinationCounter.newCount;
    int updatedConnectorCount = sourceCounter.updateCount + destinationCounter.updateCount;
    LOGGER.info("Connector definitions have been updated ({} new connectors, and {} updates)", newConnectorCount, updatedConnectorCount);
  } catch (IOException | JsonValidationException e) {
    // Surface config-layer failures as SQLException so the surrounding transaction rolls back.
    throw new SQLException(e);
  }
}

/**
 * Insert or update the stored definitions of one connector type from the seed's latest list.
 *
 * @param connectorRepositoriesInUse when a connector is used in any standard sync, its definition
 *        will not be updated. This is necessary because the new connector version may not be
 *        backward compatible.
 */
private <T> ConnectorCounter updateConnectorDefinitions(DSLContext ctx,
                                                        OffsetDateTime timestamp,
                                                        AirbyteConfig configType,
                                                        List<T> latestDefinitions,
                                                        Set<String> connectorRepositoriesInUse,
                                                        Map<String, ConnectorInfo> connectorRepositoryToIdVersionMap) {
  int newCount = 0;
  int updatedCount = 0;
  for (T latestDefinition : latestDefinitions) {
    JsonNode configJson = Jsons.jsonNode(latestDefinition);
    String repository = configJson.get("dockerRepository").asText();

    // Never touch a connector that a standard sync depends on; the newer version may break it.
    if (connectorRepositoriesInUse.contains(repository)) {
      continue;
    }

    ConnectorInfo existingConnector = connectorRepositoryToIdVersionMap.get(repository);
    if (existingConnector == null) {
      // Brand-new connector: insert it using the id field declared by its config schema.
      newCount += insertConfigRecord(ctx, timestamp, configType.name(), configJson, configType.getIdFieldName());
    } else if (!configJson.get("dockerImageTag").asText().equals(existingConnector.dockerImageTag)) {
      // Known connector on a different version: overwrite the stored definition in place.
      updatedCount += updateConfigRecord(ctx, timestamp, configType.name(), configJson, existingConnector.connectorDefinitionId);
    }
  }
  return new ConnectorCounter(newCount, updatedCount);
}

/**
* @return A map about current connectors (both source and destination). It maps from connector
* repository to its definition id and docker image tag. We identify a connector by its
* repository name instead of definition id because connectors can be added manually by
* users, and are not always the same as those in the seed.
*/
private Map<String, ConnectorInfo> getConnectorRepositoryToInfoMap(DSLContext ctx) {
Field<String> repoField = field("config_blob ->> 'dockerRepository'", SQLDataType.VARCHAR).as("repository");
Field<String> versionField = field("config_blob ->> 'dockerImageTag'", SQLDataType.VARCHAR).as("version");
return ctx.select(AIRBYTE_CONFIGS.CONFIG_ID, repoField, versionField)
.from(AIRBYTE_CONFIGS)
.orderBy(AIRBYTE_CONFIGS.CONFIG_TYPE, AIRBYTE_CONFIGS.CONFIG_ID)
.fetchGroups(AIRBYTE_CONFIGS.CONFIG_TYPE));
return results.entrySet().stream().collect(Collectors.toMap(
Entry::getKey,
e -> e.getValue().stream().map(r -> Jsons.deserialize(r.get(AIRBYTE_CONFIGS.CONFIG_BLOB).data()))));
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.in(ConfigSchema.STANDARD_SOURCE_DEFINITION.name(), ConfigSchema.STANDARD_DESTINATION_DEFINITION.name()))
.fetch().stream()
.collect(Collectors.toMap(
row -> row.getValue(repoField),
row -> new ConnectorInfo(row.getValue(AIRBYTE_CONFIGS.CONFIG_ID), row.getValue(versionField))));
}

/**
 * @return A set of connectors (both source and destination) that are already used in standard
 *         syncs. We identify connectors by its repository name instead of definition id because
 *         connectors can be added manually by users, and their config ids are not always the same
 *         as those in the seed.
 */
private Set<String> getConnectorRepositoriesInUse(DSLContext ctx) {
  Field<String> sourceIdField = field("config_blob ->> 'sourceId'", SQLDataType.VARCHAR).as("sourceId");
  Field<String> destinationIdField = field("config_blob ->> 'destinationId'", SQLDataType.VARCHAR).as("destinationId");

  // Pass 1: gather the definition ids referenced by any standard sync.
  Set<String> definitionIdsInUse = ctx
      .select(sourceIdField, destinationIdField)
      .from(AIRBYTE_CONFIGS)
      .where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(ConfigSchema.STANDARD_SYNC.name()))
      .fetch().stream()
      .flatMap(syncRow -> Stream.of(syncRow.getValue(sourceIdField), syncRow.getValue(destinationIdField)))
      .collect(Collectors.toSet());

  // Pass 2: resolve those ids to the docker repository names of their definitions.
  Field<String> repoField = field("config_blob ->> 'dockerRepository'", SQLDataType.VARCHAR).as("repository");
  return ctx.select(repoField)
      .from(AIRBYTE_CONFIGS)
      .where(AIRBYTE_CONFIGS.CONFIG_ID.in(definitionIdsInUse))
      .fetch().stream()
      .map(Record1::value1)
      .collect(Collectors.toSet());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,49 @@

package io.airbyte.config.persistence;

import static org.jooq.impl.DSL.asterisk;
import static org.jooq.impl.DSL.count;
import static org.jooq.impl.DSL.table;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.db.Database;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jooq.Record1;
import org.jooq.Result;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.PostgreSQLContainer;

/**
* This class provides downstream tests with constants and helpers.
*/
public abstract class BaseTest {
public abstract class BaseDatabaseConfigPersistenceTest {

protected static PostgreSQLContainer<?> container;
protected static Database database;
protected static DatabaseConfigPersistence configPersistence;

@BeforeAll
public static void dbSetup() {
  // Start one disposable Postgres container shared by every test in the class; downstream
  // tests create their own Database/persistence objects against it.
  container = new PostgreSQLContainer<>("postgres:13-alpine")
      .withDatabaseName("airbyte")
      .withUsername("docker")
      .withPassword("docker");
  container.start();
}

@AfterAll
public static void dbDown() {
  // Tear down the shared container once all tests in the class have run.
  container.close();
}

protected static final StandardSourceDefinition SOURCE_GITHUB;
protected static final StandardSourceDefinition SOURCE_POSTGRES;
Expand Down Expand Up @@ -86,4 +113,21 @@ protected void assertSameConfigDump(Map<String, Stream<JsonNode>> expected, Map<
assertEquals(getMapWithSet(expected), getMapWithSet(actual));
}

/**
 * Assert that the airbyte_configs table holds exactly {@code expectedCount} rows.
 */
protected void assertRecordCount(int expectedCount) throws Exception {
  // COUNT(*) over the whole table comes back as a single-row, single-column result.
  Result<Record1<Integer>> countResult = database.query(ctx -> ctx
      .select(count(asterisk()))
      .from(table("airbyte_configs"))
      .fetch());
  int actualCount = countResult.get(0).value1();
  assertEquals(expectedCount, actualCount);
}

/**
 * Assert that the persistence returns exactly the given source definition for its id.
 */
protected void assertHasSource(StandardSourceDefinition source) throws Exception {
  String sourceDefinitionId = source.getSourceDefinitionId().toString();
  StandardSourceDefinition persisted = configPersistence.getConfig(
      ConfigSchema.STANDARD_SOURCE_DEFINITION, sourceDefinitionId, StandardSourceDefinition.class);
  assertEquals(source, persisted);
}

/**
 * Assert that the persistence returns exactly the given destination definition for its id.
 */
protected void assertHasDestination(StandardDestinationDefinition destination) throws Exception {
  String destinationDefinitionId = destination.getDestinationDefinitionId().toString();
  StandardDestinationDefinition persisted = configPersistence.getConfig(
      ConfigSchema.STANDARD_DESTINATION_DEFINITION, destinationDefinitionId, StandardDestinationDefinition.class);
  assertEquals(destination, persisted);
}

}
Loading

0 comments on commit 1f7edaa

Please sign in to comment.