From 942ef2ec0c4f86a7b0a40520f6c4914b864c02ae Mon Sep 17 00:00:00 2001 From: George Claireaux Date: Wed, 25 Aug 2021 12:02:24 +0100 Subject: [PATCH 1/6] skeleton databricks destination --- .../9efb1f7c-c3ef-414b-8776-fa12c5df70d9.json | 7 + .../destination-databricks/.dockerignore | 3 + .../destination-databricks/Dockerfile | 11 ++ .../destination-databricks/README.md | 68 +++++++ .../destination-databricks/build.gradle | 21 +++ .../databricks/DatabricksDestination.java | 117 ++++++++++++ .../databricks/DatabricksNameTransformer.java | 86 +++++++++ .../databricks/DatabricksSqlOperations.java | 178 ++++++++++++++++++ .../databricks/DynamicClassLoader.java | 76 ++++++++ .../src/main/resources/spec.json | 31 +++ .../DatabricksDestinationAcceptanceTest.java | 82 ++++++++ docs/integrations/destinations/databricks.md | 52 +++++ 12 files changed, 732 insertions(+) create mode 100644 airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/9efb1f7c-c3ef-414b-8776-fa12c5df70d9.json create mode 100644 airbyte-integrations/connectors/destination-databricks/.dockerignore create mode 100644 airbyte-integrations/connectors/destination-databricks/Dockerfile create mode 100644 airbyte-integrations/connectors/destination-databricks/README.md create mode 100644 airbyte-integrations/connectors/destination-databricks/build.gradle create mode 100644 airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java create mode 100644 airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksNameTransformer.java create mode 100644 airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java create mode 100644 airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DynamicClassLoader.java create mode 100644 airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json create mode 100644 airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java create mode 100644 docs/integrations/destinations/databricks.md diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/9efb1f7c-c3ef-414b-8776-fa12c5df70d9.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/9efb1f7c-c3ef-414b-8776-fa12c5df70d9.json new file mode 100644 index 0000000000000..0754399bd19b9 --- /dev/null +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/9efb1f7c-c3ef-414b-8776-fa12c5df70d9.json @@ -0,0 +1,7 @@ +{ + "destinationDefinitionId": "9efb1f7c-c3ef-414b-8776-fa12c5df70d9", + "name": "Databricks", + "dockerRepository": "airbyte/destination-databricks", + "dockerImageTag": "0.1.0", + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/databricks" +} diff --git a/airbyte-integrations/connectors/destination-databricks/.dockerignore b/airbyte-integrations/connectors/destination-databricks/.dockerignore new file mode 100644 index 0000000000000..65c7d0ad3e73c --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/.dockerignore @@ -0,0 +1,3 @@ +* +!Dockerfile +!build diff --git a/airbyte-integrations/connectors/destination-databricks/Dockerfile 
b/airbyte-integrations/connectors/destination-databricks/Dockerfile new file mode 100644 index 0000000000000..4c3d7fc644ca0 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/Dockerfile @@ -0,0 +1,11 @@ +FROM airbyte/integration-base-java:dev + +WORKDIR /airbyte +ENV APPLICATION destination-databricks + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/destination-databricks diff --git a/airbyte-integrations/connectors/destination-databricks/README.md b/airbyte-integrations/connectors/destination-databricks/README.md new file mode 100644 index 0000000000000..70c1750713579 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/README.md @@ -0,0 +1,68 @@ +# Destination Databricks + +This is the repository for the Databricks destination connector in Java. +For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/destinations/databricks). + +## Local development + +#### Building via Gradle +From the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:destination-databricks:build +``` + +#### Create credentials +**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`. +Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information. + +**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials. + +### Locally running the connector docker image + +#### Build +Build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:destination-databricks:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/destination-databricks:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-databricks:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-databricks:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-databricks:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` + +## Testing +We use `JUnit` for Java tests. + +### Unit and Integration Tests +Place unit tests under `src/test/io/airbyte/integrations/destinations/databricks`. + +#### Acceptance Tests +Airbyte has a standard test suite that all destination connectors must pass. Implement the `TODO`s in +`src/test-integration/java/io/airbyte/integrations/destinations/databricksDestinationAcceptanceTest.java`. + +### Using gradle to run tests +All commands should be run from airbyte project root. 
+To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:destination-databricks:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:destination-databricks:integrationTest +``` + +## Dependency Management + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/destination-databricks/build.gradle b/airbyte-integrations/connectors/destination-databricks/build.gradle new file mode 100644 index 0000000000000..ce605fc2bd71a --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/build.gradle @@ -0,0 +1,21 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.destination.databricks.DatabricksDestination' +} + +dependencies { + implementation project(':airbyte-db') + implementation project(':airbyte-config:models') + implementation project(':airbyte-protocol:models') + implementation project(':airbyte-integrations:bases:base-java') + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + implementation project(':airbyte-integrations:connectors:destination-jdbc') + + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') + integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-databricks') +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java new file mode 100644 index 0000000000000..7c7d8c65876cd --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java @@ -0,0 +1,117 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.databricks; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; +import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DatabricksDestination extends AbstractJdbcDestination implements Destination { + + private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksDestination.class); + + public static final String DRIVER_CLASS = "com.simba.spark.jdbc.Driver"; + + // TODO: this isn't working yet! + public static void getDriver() throws MalformedURLException, ClassNotFoundException { + File driverJar = new File("/Users/phlair/Downloads/SparkDriver/SparkJDBC42.jar"); + URL jarUrl = new URL("jar", "", "file:" + driverJar.getAbsolutePath() + "!/"); + URLClassLoader myLoader = new URLClassLoader(new URL[] { jarUrl } ); + myLoader.loadClass(DRIVER_CLASS); + } + + @Override + public AirbyteConnectionStatus check(JsonNode config) { + + try (final JdbcDatabase database = getDatabase(config)) { + DatabricksSqlOperations databricksSqlOperations = (DatabricksSqlOperations) getSqlOperations(); + + String outputSchema = getNamingResolver().getIdentifier(config.get("database").asText()); + attemptSQLCreateAndDropTableOperations(outputSchema, database, getNamingResolver(), databricksSqlOperations); + + databricksSqlOperations.verifyLocalFileEnabled(database); + + // TODO: enforce databricks runtime version instead of this mySql code +// VersionCompatibility compatibility = dbSqlOperations.isCompatibleVersion(database); +// if (!compatibility.isCompatible()) { +// throw new RuntimeException(String +// .format("Your MySQL version %s is not compatible with Airbyte", +// compatibility.getVersion())); +// } + + return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); + } catch (Exception e) { + LOGGER.error("Exception while checking connection: ", e); + return new AirbyteConnectionStatus() + .withStatus(Status.FAILED) + .withMessage("Could not connect with provided configuration. 
\n" + e.getMessage()); + } + } + + public DatabricksDestination() { + super(DRIVER_CLASS, new DatabricksNameTransformer(), new DatabricksSqlOperations()); + } + + @Override + public JsonNode toJdbcConfig(JsonNode databricksConfig) { + return getJdbcConfig(databricksConfig); + } + + public static JsonNode getJdbcConfig(JsonNode databricksConfig) { + final String schema = Optional.ofNullable(databricksConfig.get("schema")).map(JsonNode::asText).orElse("default"); + + return Jsons.jsonNode(ImmutableMap.builder() + .put("username", "dummy") + .put("password", "dummy") +// .put("jdbc_url", String.format("jdbc:TODO://%s:%s/%s", +// databricksConfig.get("host").asText(), +// databricksConfig.get("port").asText(), +// databricksConfig.get("database").asText())) +// .put("schema", schema) + .put("jdbc_url", databricksConfig.get("jdbcUrl").asText()) + .build()); + } + + public static void main(String[] args) throws Exception { + LOGGER.info("starting destination: {}", DatabricksDestination.class); + getDriver(); + new IntegrationRunner(new DatabricksDestination()).run(args); + LOGGER.info("completed destination: {}", DatabricksDestination.class); + } + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksNameTransformer.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksNameTransformer.java new file mode 100644 index 0000000000000..141b2b4e29c9b --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksNameTransformer.java @@ -0,0 +1,86 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.databricks; + +import io.airbyte.integrations.destination.ExtendedNameTransformer; + +/** + * TODO: Replace below MySQL docstring with Databricks equiv. + * + * Note that MySQL documentation discusses about identifiers case sensitivity using the + * lower_case_table_names system variable. As one of their recommendation is: "It is best to adopt a + * consistent convention, such as always creating and referring to databases and tables using + * lowercase names. This convention is recommended for maximum portability and ease of use. 
+ * + * Source: https://dev.mysql.com/doc/refman/8.0/en/identifier-case-sensitivity.html" + * + * As a result, we are here forcing all identifier (table, schema and columns) names to lowercase. + */ +public class DatabricksNameTransformer extends ExtendedNameTransformer { + + // These constants must match those in destination_name_transformer.py + public static final int MAX_MYSQL_NAME_LENGTH = 64; + // DBT appends a suffix to table names + public static final int TRUNCATE_DBT_RESERVED_SIZE = 12; + // 4 charachters for 1 underscore and 3 suffix (e.g. _ab1) + // 4 charachters for 1 underscore and 3 schema hash + public static final int TRUNCATE_RESERVED_SIZE = 8; + public static final int TRUNCATION_MAX_NAME_LENGTH = MAX_MYSQL_NAME_LENGTH - TRUNCATE_DBT_RESERVED_SIZE - TRUNCATE_RESERVED_SIZE; + + @Override + public String getIdentifier(String name) { + String identifier = applyDefaultCase(super.getIdentifier(name)); + return truncateName(identifier, TRUNCATION_MAX_NAME_LENGTH); + } + + @Override + public String getTmpTableName(String streamName) { + String tmpTableName = applyDefaultCase(super.getTmpTableName(streamName)); + return truncateName(tmpTableName, TRUNCATION_MAX_NAME_LENGTH); + } + + @Override + public String getRawTableName(String streamName) { + String rawTableName = applyDefaultCase(super.getRawTableName(streamName)); + return truncateName(rawTableName, TRUNCATION_MAX_NAME_LENGTH); + } + + static String truncateName(String name, int maxLength) { + if (name.length() <= maxLength) { + return name; + } + + int allowedLength = maxLength - 2; + String prefix = name.substring(0, allowedLength / 2); + String suffix = name.substring(name.length() - allowedLength / 2); + return prefix + "__" + suffix; + } + + @Override + protected String applyDefaultCase(String input) { + return input.toLowerCase(); + } + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java new file mode 100644 index 0000000000000..8da5c572e84f6 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java @@ -0,0 +1,178 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.databricks; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.StandardNameTransformer; +import io.airbyte.integrations.destination.jdbc.DefaultSqlOperations; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.stream.Collectors; + +public class DatabricksSqlOperations extends DefaultSqlOperations { + + private boolean isLocalFileEnabled = false; + + @Override + public void executeTransaction(JdbcDatabase database, List queries) throws Exception { + database.executeWithinTransaction(queries); + } + + @Override + public void insertRecords(JdbcDatabase database, + List records, + String schemaName, + String tmpTableName) + throws SQLException { + if (records.isEmpty()) { + return; + } + + verifyLocalFileEnabled(database); + try { + File tmpFile = Files.createTempFile(tmpTableName + "-", ".tmp").toFile(); + + loadDataIntoTable(database, records, schemaName, tmpTableName, tmpFile); + + Files.delete(tmpFile.toPath()); + } catch (IOException e) { + throw new SQLException(e); + } + } + + private void loadDataIntoTable(JdbcDatabase database, + List records, + String schemaName, + String tmpTableName, + File tmpFile) + throws SQLException { + database.execute(connection -> { + try { + writeBatchToFile(tmpFile, records); + + String absoluteFile = "'" + tmpFile.getAbsolutePath() + "'"; + + String query = String.format( + "LOAD DATA LOCAL INFILE %s INTO TABLE %s.%s FIELDS TERMINATED BY ',' ENCLOSED BY '\"' ESCAPED BY '\\\"' LINES TERMINATED BY '\\r\\n'", + absoluteFile, schemaName, tmpTableName); + + try (Statement stmt = connection.createStatement()) { + stmt.execute(query); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Override + protected JsonNode formatData(JsonNode data) { + return StandardNameTransformer.formatJsonPath(data); + } + + void verifyLocalFileEnabled(JdbcDatabase database) throws SQLException { + boolean localFileEnabled = isLocalFileEnabled || checkIfLocalFileIsEnabled(database); + if (!localFileEnabled) { + tryEnableLocalFile(database); + } + isLocalFileEnabled = true; + } + + private void tryEnableLocalFile(JdbcDatabase database) throws SQLException { + database.execute(connection -> { + try (Statement statement = connection.createStatement()) { + statement.execute("set global local_infile=true"); + } catch (Exception e) { + throw new RuntimeException( + "The DB user provided to airbyte was unable to switch on the local_infile attribute on the MySQL server. 
As an admin user, you will need to run \"SET GLOBAL local_infile = true\" before syncing data with Airbyte.", + e); + } + }); + } + + private double getVersion(JdbcDatabase database) throws SQLException { + List value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("select version()"), + resultSet -> resultSet.getString("version()")).collect(Collectors.toList()); + return Double.parseDouble(value.get(0).substring(0, 3)); + } + + VersionCompatibility isCompatibleVersion(JdbcDatabase database) throws SQLException { + double version = getVersion(database); + return new VersionCompatibility(version, version >= 5.7); + } + + @Override + public boolean isSchemaRequired() { + return false; + } + + private boolean checkIfLocalFileIsEnabled(JdbcDatabase database) throws SQLException { + List value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"), + resultSet -> resultSet.getString("Value")).collect(Collectors.toList()); + + return value.get(0).equalsIgnoreCase("on"); + } + + @Override + public String createTableQuery(JdbcDatabase database, String schemaName, String tableName) { + // MySQL requires byte information with VARCHAR. Since we are using uuid as value for the column, + // 256 is enough + return String.format( + "CREATE TABLE IF NOT EXISTS %s.%s ( \n" + + "%s VARCHAR(256) PRIMARY KEY,\n" + + "%s JSON,\n" + + "%s TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6)\n" + + ");\n", + schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + } + + public static class VersionCompatibility { + + private final double version; + private final boolean isCompatible; + + public VersionCompatibility(double version, boolean isCompatible) { + this.version = version; + this.isCompatible = isCompatible; + } + + public double getVersion() { + return version; + } + + public boolean isCompatible() { + return isCompatible; + } + + } + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DynamicClassLoader.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DynamicClassLoader.java new file mode 100644 index 0000000000000..ca38d5ec1ad68 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DynamicClassLoader.java @@ -0,0 +1,76 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.databricks; + +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.file.Paths; + +public final class DynamicClassLoader extends URLClassLoader { + + static { + registerAsParallelCapable(); + } + + public DynamicClassLoader(String name, ClassLoader parent) { + super(name, new URL[0], parent); + } + + /* + * Required when this classloader is used as the system classloader + */ + public DynamicClassLoader(ClassLoader parent) { + this("classpath", parent); + } + + public DynamicClassLoader() { + this(Thread.currentThread().getContextClassLoader()); + } + + void add(URL url) { + addURL(url); + } + + public static DynamicClassLoader findAncestor(ClassLoader cl) { + do { + + if (cl instanceof DynamicClassLoader) + return (DynamicClassLoader) cl; + + cl = cl.getParent(); + } while (cl != null); + + return null; + } + + /* + * Required for Java Agents when this classloader is used as the system classloader + */ + @SuppressWarnings("unused") + private void appendToClassPathForInstrumentation(String jarfile) throws IOException { + add(Paths.get(jarfile).toRealPath().toUri().toURL()); + } +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json new file mode 100644 index 0000000000000..f5264cb4312c5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json @@ -0,0 +1,31 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/databricks", + "supportsIncremental": false, + "supportsNormalization": false, + "supportsDBT": false, + "supported_destination_sync_modes": ["overwrite"], + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Databricks Destination Spec", + "type": "object", + "required": [ + "jdbcUrl" + ], + "additionalProperties": false, + "properties": { + "jdbcUrl": { + "title": "JDBC URL", + "type": "string", + "description": "", + "examples": [""], + "airbyte_secret": true + }, + "database": { + "title": "Database", + "type": "string", + "description": "", + "examples": [""] + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java new file mode 100644 index 0000000000000..84aec1d1c0a5b --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java @@ -0,0 +1,82 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the 
Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.databricks; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import java.io.IOException; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DatabricksDestinationAcceptanceTest extends DestinationAcceptanceTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksDestinationAcceptanceTest.class); + + private JsonNode configJson; + + @Override + protected String getImageName() { + return "airbyte/destination-databricks:dev"; + } + + @Override + protected JsonNode getConfig() { + // TODO: Generate the configuration JSON file to be used for running the destination during the test + // configJson can either be static and read from secrets/config.json directly + // or created in the setup method + return configJson; + } + + @Override + protected JsonNode getFailCheckConfig() { + // TODO return an invalid config which, when used to run the connector's check connection operation, + // should result in a failed connection check + return null; + } + + @Override + protected List retrieveRecords(TestDestinationEnv testEnv, + String streamName, + String namespace, + JsonNode streamSchema) + throws IOException { + // TODO Implement this method to retrieve records which written to the destination by the connector. + // Records returned from this method will be compared against records provided to the connector + // to verify they were written correctly + return null; + } + + @Override + protected void setup(TestDestinationEnv testEnv) { + // TODO Implement this method to run any setup actions needed before every test case + } + + @Override + protected void tearDown(TestDestinationEnv testEnv) { + // TODO Implement this method to run any cleanup actions needed after every test case + } + +} diff --git a/docs/integrations/destinations/databricks.md b/docs/integrations/destinations/databricks.md new file mode 100644 index 0000000000000..d2570854d2884 --- /dev/null +++ b/docs/integrations/destinations/databricks.md @@ -0,0 +1,52 @@ +# Databricks + +TODO: update this doc + +## Sync overview + +### Output schema + +Is the output schema fixed (e.g: for an API like Stripe)? If so, point to the connector's schema (e.g: link to Stripe’s documentation) or describe the schema here directly (e.g: include a diagram or paragraphs describing the schema). + +Describe how the connector's schema is mapped to Airbyte concepts. An example description might be: "MagicDB tables become Airbyte Streams and MagicDB columns become Airbyte Fields. In addition, an extracted\_at column is appended to each row being read." 
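
The schema this skeleton actually provisions is the generic Airbyte raw table defined by `DatabricksSqlOperations.createTableQuery` earlier in this patch (`_airbyte_ab_id`, `_airbyte_data`, `_airbyte_emitted_at`). A minimal sketch of printing that DDL for a hypothetical stream — the schema and table names below are placeholders for illustration and are not part of the patch:

```
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.databricks.DatabricksSqlOperations;

public class RawTableDdlSketch {

  public static void main(String[] args) {
    DatabricksSqlOperations sqlOperations = new DatabricksSqlOperations();
    // createTableQuery only formats a SQL string in this skeleton, so no live
    // connection is needed; "default" and "_airbyte_raw_users" are placeholder names.
    String ddl = sqlOperations.createTableQuery((JdbcDatabase) null, "default", "_airbyte_raw_users");
    System.out.println(ddl);
  }

}
```
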
+ +### Data type mapping + +This section should contain a table mapping each of the connector's data types to Airbyte types. At the moment, Airbyte uses the same types used by [JSONSchema](https://json-schema.org/understanding-json-schema/reference/index.html). `string`, `date-time`, `object`, `array`, `boolean`, `integer`, and `number` are the most commonly used data types. + +| Integration Type | Airbyte Type | Notes | +| :--- | :--- | :--- | + + +### Features + +This section should contain a table with the following format: + +| Feature | Supported?(Yes/No) | Notes | +| :--- | :--- | :--- | +| Full Refresh Sync | | | +| Incremental Sync | | | +| Replicate Incremental Deletes | | | +| For databases, WAL/Logical replication | | | +| SSL connection | | | +| SSH Tunnel Support | | | +| (Any other source-specific features) | | | + +### Performance considerations + +Could this connector hurt the user's database/API/etc... or put too much strain on it in certain circumstances? For example, if there are a lot of tables or rows in a table? What is the breaking point (e.g: 100mm> records)? What can the user do to prevent this? (e.g: use a read-only replica, or schedule frequent syncs, etc..) + +## Getting started + +### Requirements + +* What versions of this connector does this implementation support? (e.g: `postgres v3.14 and above`) +* What configurations, if any, are required on the connector? (e.g: `buffer_size > 1024`) +* Network accessibility requirements +* Credentials/authentication requirements? (e.g: A DB user with read permissions on certain tables) + +### Setup guide + +For each of the above high-level requirements as appropriate, add or point to a follow-along guide. See existing source or destination guides for an example. + +For each major cloud provider we support, also add a follow-along guide for setting up Airbyte to connect to that destination. See the Postgres destination guide for an example of what this should look like. 
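
Before the follow-up patches below rework this skeleton, the quickest way to exercise it is the `check` operation. Below is a minimal sketch of invoking it directly from Java rather than through `docker run ... check`; the JDBC URL and database values are placeholders, the field names come from `spec.json`, and the Simba Spark JDBC driver is assumed to be on the classpath. Note that `check` still runs the MySQL-specific `local_infile` verification carried over from the MySQL connector, so a real Databricks endpoint may report a failure until that code is replaced:

```
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.integrations.destination.databricks.DatabricksDestination;
import io.airbyte.protocol.models.AirbyteConnectionStatus;

public class CheckConnectionSketch {

  public static void main(String[] args) {
    ObjectNode config = new ObjectMapper().createObjectNode();
    // Field names follow src/main/resources/spec.json; the values below are placeholders.
    config.put("jdbcUrl", "jdbc:spark://<workspace-host>:443/default;transportMode=http;AuthMech=3;UID=token;PWD=<token>");
    config.put("database", "default");

    AirbyteConnectionStatus status = new DatabricksDestination().check(config);
    System.out.println(status.getStatus() + ": " + status.getMessage());
  }

}
```
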
From 4c3e10c0a737866828a4d5f6fdd3e8b751ed9f94 Mon Sep 17 00:00:00 2001 From: George Claireaux Date: Wed, 25 Aug 2021 18:38:43 +0100 Subject: [PATCH 2/6] Delete DynamicClassLoader.java --- .../databricks/DynamicClassLoader.java | 76 ------------------- 1 file changed, 76 deletions(-) delete mode 100644 airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DynamicClassLoader.java diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DynamicClassLoader.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DynamicClassLoader.java deleted file mode 100644 index ca38d5ec1ad68..0000000000000 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DynamicClassLoader.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package io.airbyte.integrations.destination.databricks; - -import java.io.IOException; -import java.net.URL; -import java.net.URLClassLoader; -import java.nio.file.Paths; - -public final class DynamicClassLoader extends URLClassLoader { - - static { - registerAsParallelCapable(); - } - - public DynamicClassLoader(String name, ClassLoader parent) { - super(name, new URL[0], parent); - } - - /* - * Required when this classloader is used as the system classloader - */ - public DynamicClassLoader(ClassLoader parent) { - this("classpath", parent); - } - - public DynamicClassLoader() { - this(Thread.currentThread().getContextClassLoader()); - } - - void add(URL url) { - addURL(url); - } - - public static DynamicClassLoader findAncestor(ClassLoader cl) { - do { - - if (cl instanceof DynamicClassLoader) - return (DynamicClassLoader) cl; - - cl = cl.getParent(); - } while (cl != null); - - return null; - } - - /* - * Required for Java Agents when this classloader is used as the system classloader - */ - @SuppressWarnings("unused") - private void appendToClassPathForInstrumentation(String jarfile) throws IOException { - add(Paths.get(jarfile).toRealPath().toUri().toURL()); - } -} From af94c07a22ae742f3a5d67e2f1e11a475f2fa227 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Mon, 30 Aug 2021 20:25:55 -0700 Subject: [PATCH 3/6] Update dependency --- .../connectors/destination-databricks/build.gradle | 2 +- .../databricks/DatabricksSqlOperations.java | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/destination-databricks/build.gradle b/airbyte-integrations/connectors/destination-databricks/build.gradle index ce605fc2bd71a..db6fd81afc2e9 100644 --- a/airbyte-integrations/connectors/destination-databricks/build.gradle +++ b/airbyte-integrations/connectors/destination-databricks/build.gradle @@ -9,7 +9,7 @@ application { } dependencies { - implementation project(':airbyte-db') + implementation project(':airbyte-db:lib') implementation project(':airbyte-config:models') implementation project(':airbyte-protocol:models') implementation project(':airbyte-integrations:bases:base-java') diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java index 8da5c572e84f6..31ba481c8f670 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java @@ -28,7 +28,7 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.StandardNameTransformer; -import io.airbyte.integrations.destination.jdbc.DefaultSqlOperations; +import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations; import io.airbyte.protocol.models.AirbyteRecordMessage; import java.io.File; import java.io.IOException; @@ -38,7 +38,7 @@ import java.util.List; import java.util.stream.Collectors; -public class DatabricksSqlOperations extends DefaultSqlOperations { +public class DatabricksSqlOperations extends JdbcSqlOperations { private boolean isLocalFileEnabled = false; @@ -48,10 +48,10 @@ public void 
executeTransaction(JdbcDatabase database, List queries) thro } @Override - public void insertRecords(JdbcDatabase database, - List records, - String schemaName, - String tmpTableName) + public void insertRecordsInternal(JdbcDatabase database, + List records, + String schemaName, + String tmpTableName) throws SQLException { if (records.isEmpty()) { return; From d7db844e93a945585f771099e7f0d89b4da2396a Mon Sep 17 00:00:00 2001 From: LiRen Tu Date: Tue, 31 Aug 2021 05:39:39 -0700 Subject: [PATCH 4/6] Implement databricks destination as a stream copier (#5748) --- .../destination-databricks/build.gradle | 8 ++ .../databricks/DatabricksDestination.java | 113 ++++++----------- .../databricks/DatabricksStreamCopier.java | 114 ++++++++++++++++++ .../DatabricksStreamCopierFactory.java | 43 +++++++ .../jdbc/copy/CopyConsumerFactory.java | 9 +- .../destination/jdbc/copy/StreamCopier.java | 3 +- .../jdbc/copy/StreamCopierFactory.java | 12 +- .../jdbc/copy/gcs/GcsStreamCopier.java | 9 +- .../jdbc/copy/gcs/GcsStreamCopierFactory.java | 20 +-- .../jdbc/copy/s3/S3StreamCopier.java | 11 +- .../jdbc/copy/s3/S3StreamCopierFactory.java | 22 ++-- 11 files changed, 247 insertions(+), 117 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java create mode 100644 airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java diff --git a/airbyte-integrations/connectors/destination-databricks/build.gradle b/airbyte-integrations/connectors/destination-databricks/build.gradle index db6fd81afc2e9..f260d1903cc87 100644 --- a/airbyte-integrations/connectors/destination-databricks/build.gradle +++ b/airbyte-integrations/connectors/destination-databricks/build.gradle @@ -15,6 +15,14 @@ dependencies { implementation project(':airbyte-integrations:bases:base-java') implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) implementation project(':airbyte-integrations:connectors:destination-jdbc') + implementation project(':airbyte-integrations:connectors:destination-s3') + + // parquet + implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.3.0' + implementation group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.0' + implementation group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '3.3.0' + implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0' + implementation group: 'tech.allegro.schema.json2avro', name: 'converter', version: '0.2.10' integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-databricks') diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java index 7c7d8c65876cd..576627a3dc625 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java @@ -25,93 +25,60 @@ package 
io.airbyte.integrations.destination.databricks; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableMap; -import io.airbyte.commons.json.Jsons; +import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcDatabase; -import io.airbyte.integrations.base.Destination; -import io.airbyte.integrations.base.IntegrationRunner; -import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination; -import io.airbyte.protocol.models.AirbyteConnectionStatus; -import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; -import java.io.File; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.Optional; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.destination.jdbc.SqlOperations; +import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory; +import io.airbyte.integrations.destination.jdbc.copy.CopyDestination; +import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config; +import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.function.Consumer; -public class DatabricksDestination extends AbstractJdbcDestination implements Destination { +public class DatabricksDestination extends CopyDestination { - private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksDestination.class); - - public static final String DRIVER_CLASS = "com.simba.spark.jdbc.Driver"; - - // TODO: this isn't working yet! - public static void getDriver() throws MalformedURLException, ClassNotFoundException { - File driverJar = new File("/Users/phlair/Downloads/SparkDriver/SparkJDBC42.jar"); - URL jarUrl = new URL("jar", "", "file:" + driverJar.getAbsolutePath() + "!/"); - URLClassLoader myLoader = new URLClassLoader(new URL[] { jarUrl } ); - myLoader.loadClass(DRIVER_CLASS); - } + private static final String DRIVER_CLASS = "com.simba.spark.jdbc.Driver"; @Override - public AirbyteConnectionStatus check(JsonNode config) { - - try (final JdbcDatabase database = getDatabase(config)) { - DatabricksSqlOperations databricksSqlOperations = (DatabricksSqlOperations) getSqlOperations(); - - String outputSchema = getNamingResolver().getIdentifier(config.get("database").asText()); - attemptSQLCreateAndDropTableOperations(outputSchema, database, getNamingResolver(), databricksSqlOperations); - - databricksSqlOperations.verifyLocalFileEnabled(database); - - // TODO: enforce databricks runtime version instead of this mySql code -// VersionCompatibility compatibility = dbSqlOperations.isCompatibleVersion(database); -// if (!compatibility.isCompatible()) { -// throw new RuntimeException(String -// .format("Your MySQL version %s is not compatible with Airbyte", -// compatibility.getVersion())); -// } - - return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); - } catch (Exception e) { - LOGGER.error("Exception while checking connection: ", e); - return new AirbyteConnectionStatus() - .withStatus(Status.FAILED) - .withMessage("Could not connect with provided configuration. 
\n" + e.getMessage()); - } + public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer outputRecordCollector) { + return CopyConsumerFactory.create( + outputRecordCollector, + getDatabase(config), + getSqlOperations(), + getNameTransformer(), + S3Config.getS3Config(config), + catalog, + new DatabricksStreamCopierFactory(), + config.get("schema").asText() + ); } - public DatabricksDestination() { - super(DRIVER_CLASS, new DatabricksNameTransformer(), new DatabricksSqlOperations()); + @Override + public void checkPersistence(JsonNode config) { + S3StreamCopier.attemptS3WriteAndDelete(S3Config.getS3Config(config)); } @Override - public JsonNode toJdbcConfig(JsonNode databricksConfig) { - return getJdbcConfig(databricksConfig); + public ExtendedNameTransformer getNameTransformer() { + return new DatabricksNameTransformer(); } - public static JsonNode getJdbcConfig(JsonNode databricksConfig) { - final String schema = Optional.ofNullable(databricksConfig.get("schema")).map(JsonNode::asText).orElse("default"); - - return Jsons.jsonNode(ImmutableMap.builder() - .put("username", "dummy") - .put("password", "dummy") -// .put("jdbc_url", String.format("jdbc:TODO://%s:%s/%s", -// databricksConfig.get("host").asText(), -// databricksConfig.get("port").asText(), -// databricksConfig.get("database").asText())) -// .put("schema", schema) - .put("jdbc_url", databricksConfig.get("jdbcUrl").asText()) - .build()); + @Override + public JdbcDatabase getDatabase(JsonNode databricksConfig) { + return Databases.createJdbcDatabase( + databricksConfig.get("username").asText(), + databricksConfig.has("password") ? databricksConfig.get("password").asText() : null, + databricksConfig.get("jdbc_url").asText(), + DRIVER_CLASS + ); } - public static void main(String[] args) throws Exception { - LOGGER.info("starting destination: {}", DatabricksDestination.class); - getDriver(); - new IntegrationRunner(new DatabricksDestination()).run(args); - LOGGER.info("completed destination: {}", DatabricksDestination.class); + @Override + public SqlOperations getSqlOperations() { + return new DatabricksSqlOperations(); } } diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java new file mode 100644 index 0000000000000..0c706d0686593 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java @@ -0,0 +1,114 @@ +package io.airbyte.integrations.destination.databricks; + +import com.amazonaws.services.s3.AmazonS3; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.destination.jdbc.SqlOperations; +import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; +import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config; +import io.airbyte.integrations.destination.s3.S3DestinationConfig; +import io.airbyte.integrations.destination.s3.parquet.S3ParquetFormatConfig; +import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter; +import io.airbyte.integrations.destination.s3.writer.S3WriterFactory; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import 
io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import java.sql.Timestamp; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This implementation is similar to {@link io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier}. + * The difference is that this implementation creates Parquet staging files, instead of CSV ones. + */ +public class DatabricksStreamCopier implements StreamCopier { + + private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksStreamCopier.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private final AmazonS3 s3Client; + private final S3Config s3Config; + private final String tmpTableName; + private final AirbyteStream stream; + private final JdbcDatabase db; + private final ExtendedNameTransformer nameTransformer; + private final SqlOperations sqlOperations; + private final S3ParquetWriter parquetWriter; + + public DatabricksStreamCopier(String stagingFolder, + String schema, + ConfiguredAirbyteStream configuredStream, + AmazonS3 s3Client, + JdbcDatabase db, + S3Config s3Config, + ExtendedNameTransformer nameTransformer, + SqlOperations sqlOperations, + S3WriterFactory writerFactory, + Timestamp uploadTime) throws Exception { + this.stream = configuredStream.getStream(); + this.db = db; + this.nameTransformer = nameTransformer; + this.sqlOperations = sqlOperations; + this.tmpTableName = nameTransformer.getTmpTableName(stream.getName()); + this.s3Client = s3Client; + this.s3Config = s3Config; + this.parquetWriter = (S3ParquetWriter) writerFactory + .create(getS3DestinationConfig(s3Config, stagingFolder), s3Client, configuredStream, uploadTime); + } + + @Override + public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception { + parquetWriter.write(id, recordMessage); + } + + @Override + public void closeStagingUploader(boolean hasFailed) throws Exception { + parquetWriter.close(hasFailed); + } + + @Override + public void createTemporaryTable() throws Exception { + + } + + @Override + public void copyStagingFileToTemporaryTable() throws Exception { + + } + + @Override + public void createDestinationSchema() throws Exception { + + } + + @Override + public String createDestinationTable() throws Exception { + return null; + } + + @Override + public String generateMergeStatement(String destTableName) throws Exception { + return null; + } + + @Override + public void removeFileAndDropTmpTable() throws Exception { + + } + + private S3DestinationConfig getS3DestinationConfig(S3Config s3Config, String stagingFolder) { + return new S3DestinationConfig( + s3Config.getEndpoint(), + s3Config.getBucketName(), + stagingFolder, + s3Config.getRegion(), + s3Config.getAccessKeyId(), + s3Config.getSecretAccessKey(), + // use default parquet format config + new S3ParquetFormatConfig(MAPPER.createObjectNode()) + ); + } + +} diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java new file mode 100644 index 0000000000000..3f73f5f0e7efe --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java @@ -0,0 +1,43 @@ +package 
io.airbyte.integrations.destination.databricks; + +import com.amazonaws.services.s3.AmazonS3; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.destination.jdbc.SqlOperations; +import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; +import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory; +import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config; +import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier; +import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter; +import io.airbyte.integrations.destination.s3.writer.ProductionWriterFactory; +import io.airbyte.integrations.destination.s3.writer.S3WriterFactory; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import java.sql.Timestamp; + +public class DatabricksStreamCopierFactory implements StreamCopierFactory { + + @Override + public StreamCopier create(String configuredSchema, + S3Config s3Config, + String stagingFolder, + ConfiguredAirbyteStream configuredStream, + ExtendedNameTransformer nameTransformer, + JdbcDatabase db, + SqlOperations sqlOperations) { + try { + AirbyteStream stream = configuredStream.getStream(); + String schema = StreamCopierFactory.getSchema(stream, configuredSchema, nameTransformer); + AmazonS3 s3Client = S3StreamCopier.getAmazonS3(s3Config); + S3WriterFactory writerFactory = new ProductionWriterFactory(); + Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis()); + + return new DatabricksStreamCopier( + stagingFolder, schema, configuredStream, s3Client, db, s3Config, nameTransformer, sqlOperations, writerFactory, uploadTimestamp); + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + +} diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java index 9dcb075575bb7..a11d341b9144d 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java @@ -24,7 +24,6 @@ package io.airbyte.integrations.destination.jdbc.copy; -import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; @@ -37,8 +36,6 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import java.sql.Timestamp; -import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -94,8 +91,7 @@ private static Map createWrite for (var configuredStream : catalog.getStreams()) { var stream = configuredStream.getStream(); var pair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream); - var syncMode = configuredStream.getDestinationSyncMode(); - var copier = streamCopierFactory.create(defaultSchema, config, stagingFolder, syncMode, stream, namingResolver, database, sqlOperations); + var copier = streamCopierFactory.create(defaultSchema, config, stagingFolder, configuredStream, 
namingResolver, database, sqlOperations); pairToCopier.put(pair, copier); } @@ -116,8 +112,7 @@ private static RecordWriter recordWriterFunction(Map { @@ -35,10 +36,17 @@ public interface StreamCopierFactory { StreamCopier create(String configuredSchema, T config, String stagingFolder, - DestinationSyncMode syncMode, - AirbyteStream stream, + ConfiguredAirbyteStream configuredStream, ExtendedNameTransformer nameTransformer, JdbcDatabase db, SqlOperations sqlOperations); + static String getSchema(AirbyteStream stream, String configuredSchema, ExtendedNameTransformer nameTransformer) { + if (stream.getNamespace() != null) { + return nameTransformer.convertStreamName(stream.getNamespace()); + } else { + return nameTransformer.convertStreamName(configuredSchema); + } + } + } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java index c74ee13e83890..69fe6dddff781 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java @@ -30,10 +30,12 @@ import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; +import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; +import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.DestinationSyncMode; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -44,6 +46,7 @@ import java.nio.charset.StandardCharsets; import java.sql.SQLException; import java.sql.Timestamp; +import java.time.Instant; import java.util.UUID; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; @@ -103,8 +106,10 @@ public GcsStreamCopier(String stagingFolder, } @Override - public void write(UUID id, String jsonDataString, Timestamp emittedAt) throws Exception { - csvPrinter.printRecord(id, jsonDataString, emittedAt); + public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception { + csvPrinter.printRecord(id, + Jsons.serialize(recordMessage.getData()), + Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt()))); } @Override diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java index 594eaf85285f1..eb2d44af8235e 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java @@ -28,12 +28,12 @@ import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; import io.airbyte.db.jdbc.JdbcDatabase; -import 
io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory; import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.DestinationSyncMode; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -47,14 +47,14 @@ public abstract class GcsStreamCopierFactory implements StreamCopierFactory { @@ -43,17 +43,17 @@ public abstract class S3StreamCopierFactory implements StreamCopierFactory Date: Thu, 2 Sep 2021 22:22:30 +0100 Subject: [PATCH 5/6] Update spec and configs (#5792) * changes toward creating database/tables through jdbc * Delete DatabricksSqlOperations.java * revert sqlops * minor changes --- .../databricks/DatabricksDestination.java | 15 +++- .../databricks/DatabricksStreamCopier.java | 52 +++++++++--- .../DatabricksStreamCopierFactory.java | 4 +- .../src/main/resources/spec.json | 83 +++++++++++++++++-- .../s3/parquet/S3ParquetWriter.java | 2 + 5 files changed, 133 insertions(+), 23 deletions(-) diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java index 576627a3dc625..f90fc23532717 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java @@ -28,6 +28,7 @@ import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory; @@ -42,6 +43,10 @@ public class DatabricksDestination extends CopyDestination { private static final String DRIVER_CLASS = "com.simba.spark.jdbc.Driver"; + public static void main(String[] args) throws Exception { + new IntegrationRunner(new DatabricksDestination()).run(args); + } + @Override public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer outputRecordCollector) { return CopyConsumerFactory.create( @@ -52,7 +57,7 @@ public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCata S3Config.getS3Config(config), catalog, new DatabricksStreamCopierFactory(), - config.get("schema").asText() + config.get("schema").asText().equals("") ? "default" : config.get("schema").asText() ); } @@ -69,9 +74,11 @@ public ExtendedNameTransformer getNameTransformer() { @Override public JdbcDatabase getDatabase(JsonNode databricksConfig) { return Databases.createJdbcDatabase( - databricksConfig.get("username").asText(), - databricksConfig.has("password") ? 
databricksConfig.get("password").asText() : null, - databricksConfig.get("jdbc_url").asText(), + "token", + databricksConfig.get("pat").asText(), + String.format("jdbc:spark://%s:443/default;transportMode=http;ssl=1;httpPath=%s", + databricksConfig.get("serverHostname").asText(), + databricksConfig.get("httpPath").asText()), DRIVER_CLASS ); } diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java index 0c706d0686593..3ae154c4bc1d8 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java @@ -14,6 +14,7 @@ import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.DestinationSyncMode; import java.sql.Timestamp; import java.util.UUID; import org.slf4j.Logger; @@ -31,15 +32,20 @@ public class DatabricksStreamCopier implements StreamCopier { private final AmazonS3 s3Client; private final S3Config s3Config; private final String tmpTableName; + private final DestinationSyncMode syncMode; private final AirbyteStream stream; private final JdbcDatabase db; + private final String database; + private final String streamName; private final ExtendedNameTransformer nameTransformer; - private final SqlOperations sqlOperations; + private final DatabricksSqlOperations sqlOperations; private final S3ParquetWriter parquetWriter; public DatabricksStreamCopier(String stagingFolder, + DestinationSyncMode syncMode, String schema, ConfiguredAirbyteStream configuredStream, + String streamName, AmazonS3 s3Client, JdbcDatabase db, S3Config s3Config, @@ -48,14 +54,18 @@ public DatabricksStreamCopier(String stagingFolder, S3WriterFactory writerFactory, Timestamp uploadTime) throws Exception { this.stream = configuredStream.getStream(); + this.syncMode = syncMode; this.db = db; + this.database = schema; + this.streamName = streamName; this.nameTransformer = nameTransformer; - this.sqlOperations = sqlOperations; - this.tmpTableName = nameTransformer.getTmpTableName(stream.getName()); + this.sqlOperations = (DatabricksSqlOperations) sqlOperations; + this.tmpTableName = nameTransformer.getTmpTableName(streamName); this.s3Client = s3Client; this.s3Config = s3Config; this.parquetWriter = (S3ParquetWriter) writerFactory .create(getS3DestinationConfig(s3Config, stagingFolder), s3Client, configuredStream, uploadTime); + LOGGER.info(parquetWriter.parquetSchema.toString()); } @Override @@ -69,28 +79,46 @@ public void closeStagingUploader(boolean hasFailed) throws Exception { } @Override - public void createTemporaryTable() throws Exception { - + public void createDestinationSchema() throws Exception { + LOGGER.info("Creating database in destination if it doesn't exist: {}", database); + sqlOperations.createSchemaIfNotExists(db, database); } @Override - public void copyStagingFileToTemporaryTable() throws Exception { - + public void createTemporaryTable() throws Exception { + LOGGER.info("Preparing tmp table in destination for stream: {}, database: {}, tmp table name: {}.", streamName, database, 
tmpTableName); + LOGGER.info(parquetWriter.parquetSchema.toString()); + sqlOperations.createTableIfNotExists(db, database, tmpTableName); } @Override - public void createDestinationSchema() throws Exception { - + public void copyStagingFileToTemporaryTable() throws Exception { + LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, database: {}.", tmpTableName, streamName, database); + // TODO: load data sql operation + LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName); } + @Override public String createDestinationTable() throws Exception { - return null; + var destTableName = nameTransformer.getRawTableName(streamName); + LOGGER.info("Preparing table {} in destination.", destTableName); + sqlOperations.createTableIfNotExists(db, database, destTableName); + LOGGER.info("Table {} in destination prepared.", destTableName); + + return destTableName; } @Override - public String generateMergeStatement(String destTableName) throws Exception { - return null; + public String generateMergeStatement(String destTableName) { + LOGGER.info("Preparing to merge tmp table {} to dest table: {}, database: {}, in destination.", tmpTableName, destTableName, database); + var queries = new StringBuilder(); + if (syncMode.equals(DestinationSyncMode.OVERWRITE)) { + queries.append(sqlOperations.truncateTableQuery(db, database, destTableName)); + LOGGER.info("Destination OVERWRITE mode detected. Dest table: {}, database: {}, truncated.", destTableName, database); + } + queries.append(sqlOperations.copyTableQuery(db, database, tmpTableName, destTableName)); + return queries.toString(); } @Override diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java index 3f73f5f0e7efe..096e134bd2233 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java @@ -13,6 +13,7 @@ import io.airbyte.integrations.destination.s3.writer.S3WriterFactory; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.DestinationSyncMode; import java.sql.Timestamp; public class DatabricksStreamCopierFactory implements StreamCopierFactory { @@ -27,13 +28,14 @@ public StreamCopier create(String configuredSchema, SqlOperations sqlOperations) { try { AirbyteStream stream = configuredStream.getStream(); + DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode(); String schema = StreamCopierFactory.getSchema(stream, configuredSchema, nameTransformer); AmazonS3 s3Client = S3StreamCopier.getAmazonS3(s3Config); S3WriterFactory writerFactory = new ProductionWriterFactory(); Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis()); return new DatabricksStreamCopier( - stagingFolder, schema, configuredStream, s3Client, db, s3Config, nameTransformer, sqlOperations, writerFactory, uploadTimestamp); + stagingFolder, syncMode, schema, configuredStream, stream.getName(), s3Client, db, s3Config, nameTransformer, sqlOperations, writerFactory, uploadTimestamp); } catch (Exception e) { throw new RuntimeException(e); }
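A note on the `// TODO: load data sql operation` left in `copyStagingFileToTemporaryTable` above: one plausible shape for that step, assuming the staged Parquet files land under the S3 staging folder and that Databricks' `COPY INTO` is used to load them into the tmp table, is sketched below. This is an illustrative sketch only, not the connector's final implementation; `getStagingS3Path()` is a hypothetical helper, and S3 credential handling on the Databricks side is omitted.

```
  @Override
  public void copyStagingFileToTemporaryTable() throws Exception {
    LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, database: {}.", tmpTableName, streamName, database);
    // Sketch: load the staged Parquet files into the tmp table via Databricks SQL.
    // getStagingS3Path() is hypothetical and would resolve to s3://<bucket>/<staging-folder>/.
    final String copyQuery = String.format("COPY INTO %s.%s FROM '%s' FILEFORMAT = PARQUET",
        database, tmpTableName, getStagingS3Path());
    db.execute(copyQuery);
    LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
  }
```

Note that `COPY INTO` targets Delta tables, so the table created by `createTableIfNotExists` would need to be a Delta table for a sketch like this to work.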
diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json index f5264cb4312c5..f9a10ca18b85c 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json @@ -9,22 +9,93 @@ "title": "Databricks Destination Spec", "type": "object", "required": [ - "jdbcUrl" + "serverHostname", + "httpPath", + "pat" ], "additionalProperties": false, "properties": { - "jdbcUrl": { - "title": "JDBC URL", + "serverHostname": { + "title": "Server Hostname", + "type": "string", + "description": "", + "examples": [""] + }, + "httpPath": { + "title": "HTTP Path", + "type": "string", + "description": "", + "examples": [""] + }, + "pat": { + "title": "Personal Access Token", "type": "string", "description": "", "examples": [""], "airbyte_secret": true }, - "database": { + "schema": { "title": "Database", "type": "string", - "description": "", - "examples": [""] + "description": "" + }, + "s3_bucket_name": { + "title": "S3 Bucket Name", + "type": "string", + "description": "The name of the S3 bucket to use for intermediate staging of the data.", + "examples": ["airbyte.staging"] + }, + "s3_bucket_region": { + "title": "S3 Bucket Region", + "type": "string", + "default": "", + "description": "The region of the S3 staging bucket to use if utilising a copy strategy.", + "enum": [ + "", + "us-east-1", + "us-east-2", + "us-west-1", + "us-west-2", + "af-south-1", + "ap-east-1", + "ap-south-1", + "ap-northeast-1", + "ap-northeast-2", + "ap-northeast-3", + "ap-southeast-1", + "ap-southeast-2", + "ca-central-1", + "cn-north-1", + "cn-northwest-1", + "eu-central-1", + "eu-north-1", + "eu-south-1", + "eu-west-1", + "eu-west-2", + "eu-west-3", + "sa-east-1", + "me-south-1" + ] + }, + "access_key_id": { + "type": "string", + "description": "The Access Key Id granting access to the above S3 staging bucket. Airbyte requires Read and Write permissions to the given bucket.", + "title": "S3 Key Id", + "airbyte_secret": true + }, + "secret_access_key": { + "type": "string", + "description": "The corresponding secret to the above access key id.", + "title": "S3 Access Key", + "airbyte_secret": true + }, + "part_size": { + "type": "integer", + "minimum": 10, + "maximum": 100, + "examples": [10], + "description": "Optional. Increase this if syncing tables larger than 100GB. Files are streamed to S3 in parts. This determines the size of each part, in MBs. As S3 has a limit of 10,000 parts per file, part size affects the maximum table size. This is 10MB by default, resulting in a default limit of 100GB tables. Note, a larger part size will result in larger memory requirements. A rule of thumb is to multiply the part size by 10 to get the memory requirement.
Modify this with care.", + "title": "Stream Part Size" } } } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java index 806852411c920..d64b2362c2fb7 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java @@ -56,6 +56,7 @@ public class S3ParquetWriter extends BaseS3Writer implements S3Writer { private final ParquetWriter parquetWriter; private final AvroRecordFactory avroRecordFactory; + public final Schema parquetSchema; public S3ParquetWriter(S3DestinationConfig config, AmazonS3 s3Client, @@ -88,6 +89,7 @@ public S3ParquetWriter(S3DestinationConfig config, .withDictionaryEncoding(formatConfig.isDictionaryEncoding()) .build(); this.avroRecordFactory = new AvroRecordFactory(schema, nameUpdater); + this.parquetSchema = schema; } public static Configuration getHadoopConfig(S3DestinationConfig config) { From e6aa16a6b8bc149f30e7022b8b7da2615f70ca5a Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 2 Sep 2021 15:27:05 -0700 Subject: [PATCH 6/6] Remove connector definition json --- .../9efb1f7c-c3ef-414b-8776-fa12c5df70d9.json | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/9efb1f7c-c3ef-414b-8776-fa12c5df70d9.json diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/9efb1f7c-c3ef-414b-8776-fa12c5df70d9.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/9efb1f7c-c3ef-414b-8776-fa12c5df70d9.json deleted file mode 100644 index 0754399bd19b9..0000000000000 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/9efb1f7c-c3ef-414b-8776-fa12c5df70d9.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "destinationDefinitionId": "9efb1f7c-c3ef-414b-8776-fa12c5df70d9", - "name": "Databricks", - "dockerRepository": "airbyte/destination-databricks", - "dockerImageTag": "0.1.0", - "documentationUrl": "https://docs.airbyte.io/integrations/destinations/databricks" -}
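For anyone exercising this connector locally against the updated spec (the `secrets/config.json` mentioned in the connector README), a configuration shaped like the following should satisfy the new required fields; every value here is a placeholder rather than a real hostname, token, or key:

```
{
  "serverHostname": "abc-12345678-90ab.cloud.databricks.com",
  "httpPath": "sql/protocolv1/o/0000000000000000/0000-000000-xxxxxx00",
  "pat": "dapi00000000000000000000000000000000",
  "schema": "default",
  "s3_bucket_name": "airbyte.staging",
  "s3_bucket_region": "us-east-1",
  "access_key_id": "AKIA0000000000000000",
  "secret_access_key": "0000000000000000000000000000000000000000",
  "part_size": 10
}
```

Per the `getConsumer` change above, leaving `schema` empty makes the connector fall back to the `default` database.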