Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres config 3605 #3862

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions airbyte-config/db-persistence/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
dependencies {
implementation 'commons-io:commons-io:2.7'

implementation project(':airbyte-db')
implementation project(':airbyte-commons')
implementation project(':airbyte-config:models')
implementation project(':airbyte-config:persistence')
implementation project(":airbyte-json-validation")

testImplementation "org.testcontainers:postgresql:1.15.1"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.config.dbPersistence;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class DatabaseConfigPersistence implements ConfigPersistence {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this abstraction saving us all that much? i'm not super convinced it's getting us much over just having PostgresConfigPersistence. I guess we are mainly avoiding duplicating the get and list queries. it's probably little enough code that i would be just okay with duplicating.

it's just a guess on my part though, so please feel free to stick with this if you think it's worth it, but i wanted to share my perception.


private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseConfigPersistence.class);

protected final JdbcDatabase database;
private final JsonSchemaValidator jsonSchemaValidator;

protected DatabaseConfigPersistence(JdbcDatabase db, JsonSchemaValidator validator) {
database = db;
jsonSchemaValidator = validator;
}

// Create a table named CONFIG with three columns: CONFIG_ID (PK UUID/string), CONFIG_TYPE (string),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it is idiomatic in PG to use lower cased (snake cased) table names and column names.

// CONFIG_DATA (JSON/string)
public abstract void Setup() throws SQLException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

=> setup

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when will this be called?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's to be called by whatever code initializes the storage system for the first time, and also by the tests of course. That part isn't hooked up to any existing code yet other than the test.


@Override
public <T> T getConfig(ConfigSchema configType, UUID configId, Class<T> clazz)
throws ConfigNotFoundException, JsonValidationException, IOException {
try {
Optional<String> data = database.querySingle("SELECT CONFIG_DATA FROM CONFIG WHERE CONFIG_TYPE = ? AND CONFIG_ID = ?",
r -> r.getString("CONFIG_DATA"), configType.toString(), configId);
if (!data.isPresent()) {
throw new ConfigNotFoundException(configType, configId);
}

final T config = Jsons.deserialize(data.get(), clazz);
validateJson(config, configType);
return config;
} catch (SQLException e) {
throw new IOException(String.format("Failed to get config type %s item %s. Reason: %s", configType, configId, e.getMessage()), e);
}
}

@Override
public <T> List<T> listConfigs(ConfigSchema configType, Class<T> clazz) throws JsonValidationException, IOException {
try {
List<T> results = database.query(c -> {
var stmt = c.prepareStatement("SELECT CONFIG_DATA FROM CONFIG WHERE CONFIG_TYPE = ?");
stmt.setString(1, configType.toString());
return stmt;
}, r -> r.getString("CONFIG_DATA"))
.map(s -> (T) Jsons.deserialize(s, clazz))
.collect(Collectors.toList());
return results;
} catch (SQLException e) {
throw new IOException(String.format("Failed to get config type %s listing. Reason: %s", configType, e.getMessage()), e);
}
}

@Override
public <T> void writeConfig(ConfigSchema configType, UUID configId, T config) throws JsonValidationException, IOException {
// validate config with schema
validateJson(Jsons.jsonNode(config), configType);
final String data = Jsons.serialize(config);
try {
database.execute(c -> writeConfigQuery(c, configType, configId, data).execute());
} catch (SQLException e) {
throw new IOException(String.format("Failed to write config type %s item %s. Reason: %s", configType, configId, e.getMessage()), e);
}
}

// Made abstract because what we want for this is an upsert operation, which different databases
// handle with different syntax
// Overrides need to return a prepared statement with all 3 data elements added
protected abstract PreparedStatement writeConfigQuery(Connection conn, ConfigSchema configType, UUID configId, String data) throws SQLException;

private <T> void validateJson(T config, ConfigSchema configType) throws JsonValidationException {
JsonNode schema = JsonSchemaValidator.getSchema(configType.getFile());
jsonSchemaValidator.ensure(schema, Jsons.jsonNode(config));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.config.dbPersistence;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.ConfigSchema;
import io.airbyte.db.Databases;
import io.airbyte.validation.json.JsonSchemaValidator;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.UUID;

public class PostgresConfigPersistence extends DatabaseConfigPersistence {

public PostgresConfigPersistence(String username, String password, String connectionString) {
this(username, password, connectionString, new JsonSchemaValidator());
}

public PostgresConfigPersistence(JsonNode config) {
this(config, new JsonSchemaValidator());
}

public PostgresConfigPersistence(JsonNode config, JsonSchemaValidator validator) {
this(config.get("username").asText(), config.get("password").asText(), Databases.getPostgresJdbcUrl(config), validator);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i appreciate the attempt to factor out this configuration component here, but I think this probably a place where we want to keep the code paths separate.

here's how i'm thinking about it... the config (a json blob) that we use in the postgres source, postgres destination, and the database that is used in airbyte-core are all different. at least in the case of source and destination, their configuration objects are declared separately.

this is to bringing to light one thing, which is putting these naked JsonNode in interfaces is kinda dangerous... they're almost no better than just doing type Object. this is a bit more tolerable in the connector world because (theoretically) the objects are validated (although as we discussed in your last PR, the tests don't check for it explicitly).

In general where possible we should reduce the use of just JsonNode, and I think in this case the schema of the object is knowable enough that we can do that.

}

public PostgresConfigPersistence(String username, String password, String connectionString, JsonSchemaValidator validator) {
super(Databases.createJdbcDatabase(username, password, connectionString, Databases.POSTGRES_DRIVER), validator);
}

@Override
// Create a table named CONFIG with three columns: CONFIG_ID (PK UUID/string), CONFIG_TYPE (string),
// CONFIG_DATA (JSON/string)
public void Setup() throws SQLException {
database.execute("CREATE TABLE CONFIG (CONFIG_ID UUID PRIMARY KEY, CONFIG_TYPE VARCHAR(32) NOT NULL, CONFIG_DATA JSONB NOT NULL)");
}

@Override
protected PreparedStatement writeConfigQuery(Connection conn, ConfigSchema configType, UUID configId, String data) throws SQLException {
var result = conn.prepareStatement(
"INSERT INTO CONFIG (CONFIG_ID, CONFIG_TYPE, CONFIG_DATA) VALUES (?, ?, CAST(? as jsonb)) ON CONFLICT (CONFIG_ID) DO UPDATE SET CONFIG_DATA = EXCLUDED.CONFIG_DATA");
result.setObject(1, configId);
result.setString(2, configType.toString());
result.setString(3, data);
return result;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.config.dbPersistence;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;

import com.google.common.collect.Sets;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;

public class PostgresConfigPersistenceTest {

public static final UUID UUID_1 = new UUID(0, 1);
public static final StandardSourceDefinition SOURCE_1 = new StandardSourceDefinition();

static {
SOURCE_1.withSourceDefinitionId(UUID_1)
.withName("apache storm");
}

public static final UUID UUID_2 = new UUID(0, 2);
public static final StandardSourceDefinition SOURCE_2 = new StandardSourceDefinition();

static {
SOURCE_2.withSourceDefinitionId(UUID_2)
.withName("apache storm");
}

private JsonSchemaValidator schemaValidator;

private PostgresConfigPersistence configPersistence;

private static final String IMAGE_NAME = "postgres:13-alpine";
private PostgreSQLContainer<?> db;

@BeforeEach
void setUp() throws SQLException {
schemaValidator = mock(JsonSchemaValidator.class);
db = new PostgreSQLContainer<>(IMAGE_NAME);
db.start();
configPersistence = new PostgresConfigPersistence(db.getUsername(), db.getPassword(), db.getJdbcUrl(), schemaValidator);
configPersistence.Setup();
}

@AfterEach
void tearDown() {
db.stop();
db.close();
}

@Test
void testReadWriteConfig() throws IOException, JsonValidationException, ConfigNotFoundException {
configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1, SOURCE_1);

assertEquals(
SOURCE_1,
configPersistence.getConfig(
ConfigSchema.STANDARD_SOURCE_DEFINITION,
UUID_1,
StandardSourceDefinition.class));
}

@Test
void testListConfigs() throws JsonValidationException, IOException {
configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1, SOURCE_1);
configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_2, SOURCE_2);

assertEquals(
Sets.newHashSet(SOURCE_1, SOURCE_2),
Sets.newHashSet(configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)));
}

@Test
void writeConfigWithJsonSchemaRef() throws JsonValidationException, IOException, ConfigNotFoundException {
final StandardSync standardSync = new StandardSync()
.withName("sync")
.withPrefix("sync")
.withConnectionId(UUID_1)
.withSourceId(UUID.randomUUID())
.withDestinationId(UUID.randomUUID())
.withOperationIds(List.of(UUID.randomUUID()));

configPersistence.writeConfig(ConfigSchema.STANDARD_SYNC, UUID_1, standardSync);

assertEquals(
standardSync,
configPersistence.getConfig(ConfigSchema.STANDARD_SYNC, UUID_1, StandardSync.class));
}

@Test
void writeConfigInvalidConfig() throws JsonValidationException {
StandardSourceDefinition standardSourceDefinition = SOURCE_1.withName(null);

doThrow(new JsonValidationException("error")).when(schemaValidator).ensure(any(), any());

assertThrows(JsonValidationException.class, () -> configPersistence.writeConfig(
ConfigSchema.STANDARD_SOURCE_DEFINITION,
UUID_1,
standardSourceDefinition));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,17 @@
package io.airbyte.config.persistence;

import io.airbyte.config.ConfigSchema;
import java.util.UUID;

public class ConfigNotFoundException extends Exception {

private ConfigSchema type;
private final String configId;

public ConfigNotFoundException(ConfigSchema type, String configId) {
super(String.format("config type: %s id: %s", type, configId));
public ConfigNotFoundException(ConfigSchema type, UUID configId) {
super(String.format("config type: %s id: %s", type, configId.toString()));
this.type = type;
this.configId = configId;
this.configId = configId.toString();
}

public ConfigSchema getType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.List;
import java.util.UUID;

public interface ConfigPersistence {

<T> T getConfig(ConfigSchema configType, String configId, Class<T> clazz) throws ConfigNotFoundException, JsonValidationException, IOException;
<T> T getConfig(ConfigSchema configType, UUID configId, Class<T> clazz) throws ConfigNotFoundException, JsonValidationException, IOException;

<T> List<T> listConfigs(ConfigSchema configType, Class<T> clazz) throws JsonValidationException, IOException;

<T> void writeConfig(ConfigSchema configType, String configId, T config) throws JsonValidationException, IOException;
<T> void writeConfig(ConfigSchema configType, UUID configId, T config) throws JsonValidationException, IOException;

}
Loading