Skip to content

Commit

Permalink
🎉 Oracle specify schema(s) to discover. (#5779)
Browse files Browse the repository at this point in the history
Closes #4944 .
  • Loading branch information
davinchia authored Sep 2, 2021
1 parent 7216de8 commit 65efaa5
Show file tree
Hide file tree
Showing 11 changed files with 77 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@
- sourceDefinitionId: b39a7370-74c3-45a6-ac3a-380d48520a83
name: Oracle DB
dockerRepository: airbyte/source-oracle
dockerImageTag: 0.3.2
dockerImageTag: 0.3.3
documentationUrl: https://docs.airbyte.io/integrations/sources/oracle
- sourceDefinitionId: c8630570-086d-4a40-99ae-ea5b18673071
name: Zendesk Talk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,13 @@ private static Map<String, List<String>> aggregatePrimateKeys(List<SimpleImmutab

private String getCatalog(SqlDatabase database) {
return (database.getSourceConfig().has("database") ? database.getSourceConfig().get("database").asText() : null);

}

@Override
public List<TableInfo<CommonField<JDBCType>>> discoverInternal(final JdbcDatabase database)
throws Exception {
protected List<TableInfo<CommonField<JDBCType>>> discoverInternal(JdbcDatabase database, String schema) throws Exception {
final Set<String> internalSchemas = new HashSet<>(getExcludedInternalNameSpaces());
return database.bufferedResultSetQuery(
conn -> conn.getMetaData().getColumns(getCatalog(database), null, null, null),
conn -> conn.getMetaData().getColumns(getCatalog(database), schema, null, null),
resultSet -> Jsons.jsonNode(ImmutableMap.<String, Object>builder()
// we always want a namespace, if we cannot get a schema, use db name.
.put(INTERNAL_SCHEMA_NAME,
Expand Down Expand Up @@ -165,6 +163,12 @@ public List<TableInfo<CommonField<JDBCType>>> discoverInternal(final JdbcDatabas
.collect(Collectors.toList());
}

@Override
public List<TableInfo<CommonField<JDBCType>>> discoverInternal(final JdbcDatabase database)
throws Exception {
return discoverInternal(database, null);
}

@Override
protected JsonSchemaPrimitive getType(JDBCType columnType) {
return SourceJdbcUtils.getType(columnType);
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-oracle/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.2
LABEL io.airbyte.version=0.3.3
LABEL io.airbyte.name=airbyte/source-oracle
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,17 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.OracleJdbcStreamingQueryConfiguration;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.protocol.models.CommonField;
import java.sql.JDBCType;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,6 +48,8 @@ public class OracleSource extends AbstractJdbcSource implements Source {

static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver";

private List<String> schemas;

public OracleSource() {
super(DRIVER_CLASS, new OracleJdbcStreamingQueryConfiguration());
}
Expand All @@ -58,9 +67,33 @@ public JsonNode toDatabaseConfig(JsonNode config) {
configBuilder.put("password", config.get("password").asText());
}

// Use the upper-cased username by default.
schemas = List.of(config.get("username").asText().toUpperCase(Locale.ROOT));
if (config.has("schemas") && config.get("schemas").isArray()) {
schemas = new ArrayList<>();
for (final JsonNode schema : config.get("schemas")) {
schemas.add(schema.asText());
}
}

return Jsons.jsonNode(configBuilder.build());
}

@Override
public List<TableInfo<CommonField<JDBCType>>> discoverInternal(JdbcDatabase database) throws Exception {
List<TableInfo<CommonField<JDBCType>>> internals = new ArrayList<>();
for (String schema : schemas) {
LOGGER.debug("Discovering schema: {}", schema);
internals.addAll(super.discoverInternal(database, schema));
}

for (TableInfo<CommonField<JDBCType>> info : internals) {
LOGGER.debug("Found table: {}", info.getName());
}

return internals;
}

@Override
public Set<String> getExcludedInternalNameSpaces() {
// need to add SYSTEM too but for that need create another user when creating the container.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@
"description": "Password associated with the username.",
"type": "string",
"airbyte_secret": true
},
"schemas": {
"title": "Schemas",
"description": "List of schemas to sync from. Defaults to user. Case sensitive.",
"type": "array",
"items": {
"type": "string"
},
"minItems": 1,
"uniqueItems": true
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ protected void setupEnvironment(TestDestinationEnv environment) throws Exception
.put("sid", container.getSid())
.put("username", container.getUsername())
.put("password", container.getPassword())
.put("schemas", List.of("JDBC_SPACE"))
.build());

JdbcDatabase database = Databases.createJdbcDatabase(config.get("username").asText(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.time.Instant;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -62,6 +63,7 @@ protected Database setupDatabase() throws Exception {
.put("sid", container.getSid())
.put("username", container.getUsername())
.put("password", container.getPassword())
.put("schemas", List.of("TEST"))
.build());

Database database = Databases.createOracleDatabase(config.get("username").asText(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -88,6 +89,7 @@ public void setup() throws Exception {
.put("sid", ORACLE_DB.getSid())
.put("username", ORACLE_DB.getUsername())
.put("password", ORACLE_DB.getPassword())
.put("schemas", List.of(SCHEMA_NAME, SCHEMA_NAME2))
.build());

// Because Oracle doesn't let me create database easily I need to clean up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ class OracleSourceTest {

private static OracleContainer ORACLE_DB;

private static OracleContainer container;
private static JsonNode config;

@BeforeAll
Expand All @@ -87,6 +86,7 @@ void setup() throws Exception {
.put("sid", ORACLE_DB.getSid())
.put("username", ORACLE_DB.getUsername())
.put("password", ORACLE_DB.getPassword())
.put("schemas", List.of("TEST"))
.build());

JdbcDatabase database = Databases.createJdbcDatabase(config.get("username").asText(),
Expand All @@ -107,16 +107,6 @@ void setup() throws Exception {
database.close();
}

private JdbcDatabase getDatabaseFromConfig(JsonNode config) {
return Databases.createJdbcDatabase(config.get("username").asText(),
config.get("password").asText(),
String.format("jdbc:oracle:thin:@//%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("sid").asText()),
"oracle.jdbc.driver.OracleDriver");
}

private JsonNode getConfig(OracleContainer oracleDb) {
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", oracleDb.getHost())
Expand All @@ -142,7 +132,7 @@ private static void setEmittedAtToNull(Iterable<AirbyteMessage> messages) {

@Test
void testReadSuccess() throws Exception {
final Set<AirbyteMessage> actualMessages = MoreIterators.toSet(new OracleSource().read(getConfig(ORACLE_DB), CONFIGURED_CATALOG, null));
final Set<AirbyteMessage> actualMessages = MoreIterators.toSet(new OracleSource().read(config, CONFIGURED_CATALOG, null));
setEmittedAtToNull(actualMessages);

assertEquals(ASCII_MESSAGES, actualMessages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,17 @@ public abstract class AbstractRelationalDbSource<DataType, Database extends SqlD
protected abstract List<TableInfo<CommonField<DataType>>> discoverInternal(final Database database)
throws Exception;

/**
* Discovers all available tables within a schema in the source database.
*
* @param database - source database
* @param schema - source schema
* @return list of source tables
* @throws Exception - access to the database might lead to exceptions.
*/
protected abstract List<TableInfo<CommonField<DataType>>> discoverInternal(final Database database, String schema)
throws Exception;

/**
* Discover Primary keys for each table and @return a map of namespace.table name to their
* associated list of primary key fields.
Expand Down
7 changes: 6 additions & 1 deletion docs/integrations/sources/oracle.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,14 @@ GRANT SELECT ON "<schema_b>"."<table_2>" TO airbyte;

Your database user should now be ready for use with Airbyte.

#### 3. Include the schemas Airbyte should look at when configuring the Airbyte Oracle Source.

Case sensitive. Defaults to the upper-cased user if empty. If the user does not have access to the configured schemas, no tables will be discovered.


## Changelog

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.3.2 | 2021-08-13 | [4699](https://github.com/airbytehq/airbyte/pull/4699) | Added json config validator |
| 0.3.3 | 2021-09-01 | [5779](https://github.com/airbytehq/airbyte/pull/5779) | Ability to only discover certain schemas. |
| 0.3.2 | 2021-08-13 | [4699](https://github.com/airbytehq/airbyte/pull/4699) | Added json config validator. |

0 comments on commit 65efaa5

Please sign in to comment.