From bbba17505c1e2c322a65f7d5b06015a3f92a4024 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Tue, 31 Aug 2021 20:55:02 +0800 Subject: [PATCH 1/5] Checkpoint: Initial refactor of connector code to support looking at specific schemas and not the entire database. --- .../source/jdbc/AbstractJdbcSource.java | 12 ++++++++---- .../integrations/source/oracle/OracleSource.java | 15 +++++++++++++++ .../source-oracle/src/main/resources/spec.json | 10 ++++++++++ .../relationaldb/AbstractRelationalDbSource.java | 11 +++++++++++ 4 files changed, 44 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 5d785d5d03e7..60d899b87c58 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -117,15 +117,13 @@ private static Map> aggregatePrimateKeys(List>> discoverInternal(final JdbcDatabase database) - throws Exception { + protected List>> discoverInternal(JdbcDatabase database, String schema) throws Exception { final Set internalSchemas = new HashSet<>(getExcludedInternalNameSpaces()); return database.bufferedResultSetQuery( - conn -> conn.getMetaData().getColumns(getCatalog(database), null, null, null), + conn -> conn.getMetaData().getColumns(getCatalog(database), schema, null, null), resultSet -> Jsons.jsonNode(ImmutableMap.builder() // we always want a namespace, if we cannot get a schema, use db name. 
.put(INTERNAL_SCHEMA_NAME, @@ -165,6 +163,12 @@ public List>> discoverInternal(final JdbcDatabas .collect(Collectors.toList()); } + @Override + public List>> discoverInternal(final JdbcDatabase database) + throws Exception { + return discoverInternal(database, null); + } + @Override protected JsonSchemaPrimitive getType(JDBCType columnType) { return SourceJdbcUtils.getType(columnType); diff --git a/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java b/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java index 1c84b3f866d1..38c9df240d4a 100644 --- a/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java +++ b/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java @@ -27,10 +27,15 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; +import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.OracleJdbcStreamingQueryConfiguration; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; +import io.airbyte.integrations.source.relationaldb.TableInfo; +import io.airbyte.protocol.models.CommonField; +import java.sql.JDBCType; +import java.util.List; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +66,16 @@ public JsonNode toDatabaseConfig(JsonNode config) { return Jsons.jsonNode(configBuilder.build()); } + @Override + public List>> discoverInternal(JdbcDatabase database) throws Exception { + // if schemas is empty + // use the user's name + // check uniqueness + + // otherwise iterate through all the schemas and return them. 
+ return super.discoverInternal(database); + } + @Override public Set getExcludedInternalNameSpaces() { // need to add SYSTEM too but for that need create another user when creating the container. diff --git a/airbyte-integrations/connectors/source-oracle/src/main/resources/spec.json b/airbyte-integrations/connectors/source-oracle/src/main/resources/spec.json index df65d3c02106..edaa52a3a50e 100644 --- a/airbyte-integrations/connectors/source-oracle/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-oracle/src/main/resources/spec.json @@ -35,6 +35,16 @@ "description": "Password associated with the username.", "type": "string", "airbyte_secret": true + }, + "schemas": { + "title": "Schemas", + "description": "List of schemas to sync from. Defaults to user.", + "type": "array", + "items": { + "type": "string" + }, + "minItems": 1, + "uniqueItems": true } } } diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java index 7eff8c76a99c..32503c6f0fd4 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java @@ -131,6 +131,17 @@ public abstract class AbstractRelationalDbSource>> discoverInternal(final Database database) throws Exception; + /** + * Discovers all available tables within a schema in the source database. + * + * @param database - source database + * @param schema - source schema + * @return list of source tables + * @throws Exception - access to the database might lead to exceptions. 
+ */ + protected abstract List>> discoverInternal(final Database database, String schema) + throws Exception; + /** * Discover Primary keys for each table and @return a map of namespace.table name to their * associated list of primary key fields. From c0e71bce1dc61cceb61bfd56f2e17526cbb6257a Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Wed, 1 Sep 2021 13:49:55 +0800 Subject: [PATCH 2/5] Add logic to allow user to add a list of schemas. --- .../source/oracle/OracleSource.java | 23 +++++++++++++++++-- .../src/main/resources/spec.json | 2 +- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java b/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java index 38c9df240d4a..2099c27c5829 100644 --- a/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java +++ b/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java @@ -35,7 +35,9 @@ import io.airbyte.integrations.source.relationaldb.TableInfo; import io.airbyte.protocol.models.CommonField; import java.sql.JDBCType; +import java.util.ArrayList; import java.util.List; +import java.util.Locale; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +48,8 @@ public class OracleSource extends AbstractJdbcSource implements Source { static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver"; + private List schemas; + public OracleSource() { super(DRIVER_CLASS, new OracleJdbcStreamingQueryConfiguration()); } @@ -63,6 +67,13 @@ public JsonNode toDatabaseConfig(JsonNode config) { configBuilder.put("password", config.get("password").asText()); } + schemas = List.of(config.get("username").asText().toUpperCase(Locale.ROOT)); + if (config.has("schemas") && config.get("schemas").isArray()) { + 
schemas = new ArrayList<>(); + for (final JsonNode schema : config.get("schemas")) { + schemas.add(schema.asText()); + } + } return Jsons.jsonNode(configBuilder.build()); } @@ -71,9 +82,17 @@ public List>> discoverInternal(JdbcDatabase data // if schemas is empty // use the user's name // check uniqueness + List>> internals = new ArrayList<>(); + for (String schema : schemas) { + LOGGER.debug("Discovering schema: {}", schema); + internals.addAll(super.discoverInternal(database, schema)); + } + + for (TableInfo> info : internals) { + LOGGER.debug("Found table: {}", info.getName()); + } - // otherwise iterate through all the schemas and return them. - return super.discoverInternal(database); + return internals; } @Override diff --git a/airbyte-integrations/connectors/source-oracle/src/main/resources/spec.json b/airbyte-integrations/connectors/source-oracle/src/main/resources/spec.json index edaa52a3a50e..3b5a02ef9604 100644 --- a/airbyte-integrations/connectors/source-oracle/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-oracle/src/main/resources/spec.json @@ -38,7 +38,7 @@ }, "schemas": { "title": "Schemas", - "description": "List of schemas to sync from. Defaults to user.", + "description": "List of schemas to sync from. Defaults to user. Case sensitive.", "type": "array", "items": { "type": "string" From bd740f71c9f8a58532b593afd1730f5868a5213c Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Wed, 1 Sep 2021 14:02:59 +0800 Subject: [PATCH 3/5] Update tests. 
--- .../integrations/source/oracle/OracleSourceAcceptanceTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceAcceptanceTest.java index 5d92027dcb68..789be751dd00 100644 --- a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceAcceptanceTest.java @@ -64,6 +64,7 @@ protected void setupEnvironment(TestDestinationEnv environment) throws Exception .put("sid", container.getSid()) .put("username", container.getUsername()) .put("password", container.getPassword()) + .put("schemas", List.of("JDBC_SPACE")) .build()); JdbcDatabase database = Databases.createJdbcDatabase(config.get("username").asText(), From f7b606d708652e03164e86f69cf6bd0bf425e16b Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Wed, 1 Sep 2021 14:13:11 +0800 Subject: [PATCH 4/5] Update version. Update oracle documentation. 
--- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-integrations/connectors/source-oracle/Dockerfile | 2 +- docs/integrations/sources/oracle.md | 7 ++++++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 27ca96c5c09d..c351268d0412 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -289,7 +289,7 @@ - sourceDefinitionId: b39a7370-74c3-45a6-ac3a-380d48520a83 name: Oracle DB dockerRepository: airbyte/source-oracle - dockerImageTag: 0.3.2 + dockerImageTag: 0.3.3 documentationUrl: https://docs.airbyte.io/integrations/sources/oracle - sourceDefinitionId: c8630570-086d-4a40-99ae-ea5b18673071 name: Zendesk Talk diff --git a/airbyte-integrations/connectors/source-oracle/Dockerfile b/airbyte-integrations/connectors/source-oracle/Dockerfile index 7afe0ac499ef..f21d756510ff 100644 --- a/airbyte-integrations/connectors/source-oracle/Dockerfile +++ b/airbyte-integrations/connectors/source-oracle/Dockerfile @@ -9,5 +9,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.2 +LABEL io.airbyte.version=0.3.3 LABEL io.airbyte.name=airbyte/source-oracle diff --git a/docs/integrations/sources/oracle.md b/docs/integrations/sources/oracle.md index d76d5abd9373..e368630dff2e 100644 --- a/docs/integrations/sources/oracle.md +++ b/docs/integrations/sources/oracle.md @@ -96,9 +96,14 @@ GRANT SELECT ON ""."" TO airbyte; Your database user should now be ready for use with Airbyte. +#### 3. Include the schemas Airbyte should look at when configuring the Airbyte Oracle Source. + +Case sensitive. Defaults to the upper-cased user if empty. 
If the user does not have access to the configured schemas, no tables will be discovered. + ## Changelog | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | -| 0.3.2 | 2021-08-13 | [4699](https://github.com/airbytehq/airbyte/pull/4699) | Added json config validator | \ No newline at end of file +| 0.3.3 | 2021-09-01 | [5779](https://github.com/airbytehq/airbyte/pull/5779) | Ability to only discover certain schemas. | +| 0.3.2 | 2021-08-13 | [4699](https://github.com/airbytehq/airbyte/pull/4699) | Added json config validator. | From 5c8aac2f88596c90c23588e35d4f6d8cf31dc197 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Wed, 1 Sep 2021 16:45:18 +0800 Subject: [PATCH 5/5] Update all tests. --- .../integrations/source/oracle/OracleSource.java | 5 ++--- .../source/oracle/OracleSourceDatatypeTest.java | 2 ++ .../oracle/OracleJdbcSourceAcceptanceTest.java | 2 ++ .../source/oracle/OracleSourceTest.java | 14 ++------------ 4 files changed, 8 insertions(+), 15 deletions(-) diff --git a/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java b/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java index 2099c27c5829..b145f81d7884 100644 --- a/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java +++ b/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java @@ -67,6 +67,7 @@ public JsonNode toDatabaseConfig(JsonNode config) { configBuilder.put("password", config.get("password").asText()); } + // Use the upper-cased username by default. 
schemas = List.of(config.get("username").asText().toUpperCase(Locale.ROOT)); if (config.has("schemas") && config.get("schemas").isArray()) { schemas = new ArrayList<>(); @@ -74,14 +75,12 @@ public JsonNode toDatabaseConfig(JsonNode config) { schemas.add(schema.asText()); } } + return Jsons.jsonNode(configBuilder.build()); } @Override public List>> discoverInternal(JdbcDatabase database) throws Exception { - // if schemas is empty - // use the user's name - // check uniqueness List>> internals = new ArrayList<>(); for (String schema : schemas) { LOGGER.debug("Discovering schema: {}", schema); diff --git a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceDatatypeTest.java b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceDatatypeTest.java index 2c366ee3a3ba..666aef872c9f 100644 --- a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceDatatypeTest.java @@ -39,6 +39,7 @@ import java.time.Instant; import java.util.Calendar; import java.util.Date; +import java.util.List; import java.util.TimeZone; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +63,7 @@ protected Database setupDatabase() throws Exception { .put("sid", container.getSid()) .put("username", container.getUsername()) .put("password", container.getPassword()) + .put("schemas", List.of("TEST")) .build()); Database database = Databases.createOracleDatabase(config.get("username").asText(), diff --git a/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java 
b/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java index c340d13103ed..7bd223e53e8a 100644 --- a/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java @@ -37,6 +37,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.List; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -88,6 +89,7 @@ public void setup() throws Exception { .put("sid", ORACLE_DB.getSid()) .put("username", ORACLE_DB.getUsername()) .put("password", ORACLE_DB.getPassword()) + .put("schemas", List.of(SCHEMA_NAME, SCHEMA_NAME2)) .build()); // Because Oracle doesn't let me create database easily I need to clean up diff --git a/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleSourceTest.java b/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleSourceTest.java index 4ef8b98940dc..cba09e4a98ce 100644 --- a/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleSourceTest.java +++ b/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleSourceTest.java @@ -70,7 +70,6 @@ class OracleSourceTest { private static OracleContainer ORACLE_DB; - private static OracleContainer container; private static JsonNode config; @BeforeAll @@ -87,6 +86,7 @@ void setup() throws Exception { .put("sid", ORACLE_DB.getSid()) .put("username", ORACLE_DB.getUsername()) .put("password", ORACLE_DB.getPassword()) + .put("schemas", List.of("TEST")) .build()); JdbcDatabase database = 
Databases.createJdbcDatabase(config.get("username").asText(), @@ -107,16 +107,6 @@ void setup() throws Exception { database.close(); } - private JdbcDatabase getDatabaseFromConfig(JsonNode config) { - return Databases.createJdbcDatabase(config.get("username").asText(), - config.get("password").asText(), - String.format("jdbc:oracle:thin:@//%s:%s/%s", - config.get("host").asText(), - config.get("port").asText(), - config.get("sid").asText()), - "oracle.jdbc.driver.OracleDriver"); - } - private JsonNode getConfig(OracleContainer oracleDb) { return Jsons.jsonNode(ImmutableMap.builder() .put("host", oracleDb.getHost()) @@ -142,7 +132,7 @@ private static void setEmittedAtToNull(Iterable messages) { @Test void testReadSuccess() throws Exception { - final Set actualMessages = MoreIterators.toSet(new OracleSource().read(getConfig(ORACLE_DB), CONFIGURED_CATALOG, null)); + final Set actualMessages = MoreIterators.toSet(new OracleSource().read(config, CONFIGURED_CATALOG, null)); setEmittedAtToNull(actualMessages); assertEquals(ASCII_MESSAGES, actualMessages);