Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Oracle specify schema(s) to discover. #5779

Merged
merged 5 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@
- sourceDefinitionId: b39a7370-74c3-45a6-ac3a-380d48520a83
name: Oracle DB
dockerRepository: airbyte/source-oracle
dockerImageTag: 0.3.2
dockerImageTag: 0.3.3
documentationUrl: https://docs.airbyte.io/integrations/sources/oracle
- sourceDefinitionId: c8630570-086d-4a40-99ae-ea5b18673071
name: Zendesk Talk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,13 @@ private static Map<String, List<String>> aggregatePrimateKeys(List<SimpleImmutab

private String getCatalog(SqlDatabase database) {
return (database.getSourceConfig().has("database") ? database.getSourceConfig().get("database").asText() : null);

}

@Override
public List<TableInfo<CommonField<JDBCType>>> discoverInternal(final JdbcDatabase database)
throws Exception {
protected List<TableInfo<CommonField<JDBCType>>> discoverInternal(JdbcDatabase database, String schema) throws Exception {
final Set<String> internalSchemas = new HashSet<>(getExcludedInternalNameSpaces());
return database.bufferedResultSetQuery(
conn -> conn.getMetaData().getColumns(getCatalog(database), null, null, null),
conn -> conn.getMetaData().getColumns(getCatalog(database), schema, null, null),
resultSet -> Jsons.jsonNode(ImmutableMap.<String, Object>builder()
// we always want a namespace, if we cannot get a schema, use db name.
.put(INTERNAL_SCHEMA_NAME,
Expand Down Expand Up @@ -165,6 +163,12 @@ public List<TableInfo<CommonField<JDBCType>>> discoverInternal(final JdbcDatabas
.collect(Collectors.toList());
}

@Override
public List<TableInfo<CommonField<JDBCType>>> discoverInternal(final JdbcDatabase database)
throws Exception {
return discoverInternal(database, null);
}

@Override
protected JsonSchemaPrimitive getType(JDBCType columnType) {
return SourceJdbcUtils.getType(columnType);
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-oracle/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.2
LABEL io.airbyte.version=0.3.3
LABEL io.airbyte.name=airbyte/source-oracle
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,17 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.OracleJdbcStreamingQueryConfiguration;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.protocol.models.CommonField;
import java.sql.JDBCType;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,6 +48,8 @@ public class OracleSource extends AbstractJdbcSource implements Source {

static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver";

private List<String> schemas;

public OracleSource() {
super(DRIVER_CLASS, new OracleJdbcStreamingQueryConfiguration());
}
Expand All @@ -58,9 +67,33 @@ public JsonNode toDatabaseConfig(JsonNode config) {
configBuilder.put("password", config.get("password").asText());
}

// Use the upper-cased username by default.
schemas = List.of(config.get("username").asText().toUpperCase(Locale.ROOT));
if (config.has("schemas") && config.get("schemas").isArray()) {
schemas = new ArrayList<>();
for (final JsonNode schema : config.get("schemas")) {
schemas.add(schema.asText());
}
}

return Jsons.jsonNode(configBuilder.build());
}

@Override
public List<TableInfo<CommonField<JDBCType>>> discoverInternal(JdbcDatabase database) throws Exception {
List<TableInfo<CommonField<JDBCType>>> internals = new ArrayList<>();
for (String schema : schemas) {
LOGGER.debug("Discovering schema: {}", schema);
internals.addAll(super.discoverInternal(database, schema));
}

for (TableInfo<CommonField<JDBCType>> info : internals) {
LOGGER.debug("Found table: {}", info.getName());
}

return internals;
}

@Override
public Set<String> getExcludedInternalNameSpaces() {
// need to add SYSTEM too but for that need create another user when creating the container.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@
"description": "Password associated with the username.",
"type": "string",
"airbyte_secret": true
},
"schemas": {
"title": "Schemas",
"description": "List of schemas to sync from. Defaults to user. Case sensitive.",
"type": "array",
"items": {
"type": "string"
},
"minItems": 1,
"uniqueItems": true
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ protected void setupEnvironment(TestDestinationEnv environment) throws Exception
.put("sid", container.getSid())
.put("username", container.getUsername())
.put("password", container.getPassword())
.put("schemas", List.of("JDBC_SPACE"))
.build());

JdbcDatabase database = Databases.createJdbcDatabase(config.get("username").asText(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.time.Instant;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -62,6 +63,7 @@ protected Database setupDatabase() throws Exception {
.put("sid", container.getSid())
.put("username", container.getUsername())
.put("password", container.getPassword())
.put("schemas", List.of("TEST"))
.build());

Database database = Databases.createOracleDatabase(config.get("username").asText(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -88,6 +89,7 @@ public void setup() throws Exception {
.put("sid", ORACLE_DB.getSid())
.put("username", ORACLE_DB.getUsername())
.put("password", ORACLE_DB.getPassword())
.put("schemas", List.of(SCHEMA_NAME, SCHEMA_NAME2))
.build());

// Because Oracle doesn't let me create database easily I need to clean up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ class OracleSourceTest {

private static OracleContainer ORACLE_DB;

private static OracleContainer container;
private static JsonNode config;

@BeforeAll
Expand All @@ -87,6 +86,7 @@ void setup() throws Exception {
.put("sid", ORACLE_DB.getSid())
.put("username", ORACLE_DB.getUsername())
.put("password", ORACLE_DB.getPassword())
.put("schemas", List.of("TEST"))
.build());

JdbcDatabase database = Databases.createJdbcDatabase(config.get("username").asText(),
Expand All @@ -107,16 +107,6 @@ void setup() throws Exception {
database.close();
}

private JdbcDatabase getDatabaseFromConfig(JsonNode config) {
return Databases.createJdbcDatabase(config.get("username").asText(),
config.get("password").asText(),
String.format("jdbc:oracle:thin:@//%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("sid").asText()),
"oracle.jdbc.driver.OracleDriver");
}

private JsonNode getConfig(OracleContainer oracleDb) {
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", oracleDb.getHost())
Expand All @@ -142,7 +132,7 @@ private static void setEmittedAtToNull(Iterable<AirbyteMessage> messages) {

@Test
void testReadSuccess() throws Exception {
final Set<AirbyteMessage> actualMessages = MoreIterators.toSet(new OracleSource().read(getConfig(ORACLE_DB), CONFIGURED_CATALOG, null));
final Set<AirbyteMessage> actualMessages = MoreIterators.toSet(new OracleSource().read(config, CONFIGURED_CATALOG, null));
setEmittedAtToNull(actualMessages);

assertEquals(ASCII_MESSAGES, actualMessages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,17 @@ public abstract class AbstractRelationalDbSource<DataType, Database extends SqlD
protected abstract List<TableInfo<CommonField<DataType>>> discoverInternal(final Database database)
throws Exception;

/**
* Discovers all available tables within a schema in the source database.
*
* @param database - source database
* @param schema - source schema
* @return list of source tables
* @throws Exception - access to the database might lead to exceptions.
*/
protected abstract List<TableInfo<CommonField<DataType>>> discoverInternal(final Database database, String schema)
throws Exception;

/**
* Discover Primary keys for each table and @return a map of namespace.table name to their
* associated list of primary key fields.
Expand Down
7 changes: 6 additions & 1 deletion docs/integrations/sources/oracle.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,14 @@ GRANT SELECT ON "<schema_b>"."<table_2>" TO airbyte;

Your database user should now be ready for use with Airbyte.

#### 3. Include the schemas Airbyte should look at when configuring the Airbyte Oracle Source.

Case sensitive. Defaults to the upper-cased user if empty. If the user does not have access to the configured schemas, no tables will be discovered.


## Changelog

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.3.2 | 2021-08-13 | [4699](https://github.com/airbytehq/airbyte/pull/4699) | Added json config validator |
| 0.3.3 | 2021-09-01 | [5779](https://github.com/airbytehq/airbyte/pull/5779) | Ability to only discover certain schemas. |
| 0.3.2 | 2021-08-13 | [4699](https://github.com/airbytehq/airbyte/pull/4699) | Added json config validator. |