From 273fe86d1ad5b6edfdf0d1dbe8fb761deab4da4c Mon Sep 17 00:00:00 2001 From: Jinni Gu Date: Thu, 2 Sep 2021 15:48:24 -0700 Subject: [PATCH] :tada: New Destination: DynamoDB (#5561) * Added the DynamoDB destination connector. Implemented getConsumer and check methods. Signed-off-by: Jinni Gu * Added auto-generated project files. Signed-off-by: Yiqing Wang * Added config related files and output table helper. Signed-off-by: Yiqing Wang * Added document for DynamoDB destination. Signed-off-by: Jinni Gu * Implemented DynamodbWriter. Added integration tests and unit tests. Signed-off-by: qtz123 * Added DynamoDB in the SUMMARY.md. Signed-off-by: qtz123 * Formatted code using ./gradlew format. Signed-off-by: Jinni Gu * Added changelog to the doc. Signed-off-by: qtz123 * Used PAY_PER_REQUEST instead of provisioned for DynamoDB. Gave the value a name batchSize. Removed unnecessary logs. Signed-off-by: Yiqing Wang Co-authored-by: Yiqing Wang Co-authored-by: qtz123 --- .../8ccd8909-4e99-4141-b48d-4984b70b2d89.json | 7 + .../seed/destination_definitions.yaml | 5 + .../destination-dynamodb/.dockerignore | 3 + .../destination-dynamodb/Dockerfile | 11 ++ .../connectors/destination-dynamodb/README.md | 68 +++++++ .../destination-dynamodb/build.gradle | 21 ++ .../sample_secrets/config.json | 6 + .../destination/dynamodb/DynamodbChecker.java | 100 ++++++++++ .../dynamodb/DynamodbConsumer.java | 133 +++++++++++++ .../dynamodb/DynamodbDestination.java | 69 +++++++ .../dynamodb/DynamodbDestinationConfig.java | 79 ++++++++ .../dynamodb/DynamodbOutputTableHelper.java | 54 +++++ .../destination/dynamodb/DynamodbWriter.java | 185 +++++++++++++++++ .../src/main/resources/spec.json | 82 ++++++++ .../DynamodbDestinationAcceptanceTest.java | 187 ++++++++++++++++++ .../dynamodb/DynamodbDestinationTest.java | 67 +++++++ docs/SUMMARY.md | 1 + docs/integrations/destinations/dynamodb.md | 60 ++++++ 18 files changed, 1138 insertions(+) create mode 100644 
airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8ccd8909-4e99-4141-b48d-4984b70b2d89.json create mode 100644 airbyte-integrations/connectors/destination-dynamodb/.dockerignore create mode 100644 airbyte-integrations/connectors/destination-dynamodb/Dockerfile create mode 100644 airbyte-integrations/connectors/destination-dynamodb/README.md create mode 100644 airbyte-integrations/connectors/destination-dynamodb/build.gradle create mode 100644 airbyte-integrations/connectors/destination-dynamodb/sample_secrets/config.json create mode 100644 airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbChecker.java create mode 100644 airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java create mode 100644 airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestination.java create mode 100644 airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationConfig.java create mode 100644 airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbOutputTableHelper.java create mode 100644 airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java create mode 100644 airbyte-integrations/connectors/destination-dynamodb/src/main/resources/spec.json create mode 100644 airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java create mode 100644 airbyte-integrations/connectors/destination-dynamodb/src/test/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationTest.java create mode 100644 docs/integrations/destinations/dynamodb.md 
diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8ccd8909-4e99-4141-b48d-4984b70b2d89.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8ccd8909-4e99-4141-b48d-4984b70b2d89.json new file mode 100644 index 000000000000..1b2728fb3c10 --- /dev/null +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8ccd8909-4e99-4141-b48d-4984b70b2d89.json @@ -0,0 +1,7 @@ +{ + "destinationDefinitionId": "8ccd8909-4e99-4141-b48d-4984b70b2d89", + "name": "DynamoDB", + "dockerRepository": "airbyte/destination-dynamodb", + "dockerImageTag": "0.1.0", + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/dynamodb" +} diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 24797629fe83..545444eb12cf 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -80,6 +80,11 @@ dockerRepository: airbyte/destination-kafka dockerImageTag: 0.1.1 documentationUrl: https://docs.airbyte.io/integrations/destinations/kafka +- destinationDefinitionId: 8ccd8909-4e99-4141-b48d-4984b70b2d89 + name: DynamoDB + dockerRepository: airbyte/destination-dynamodb + dockerImageTag: 0.1.0 + documentationUrl: https://docs.airbyte.io/integrations/destinations/dynamodb - destinationDefinitionId: 8aaf41d0-f6d2-46de-9e79-c9540f828142 name: Keen dockerRepository: airbyte/destination-keen diff --git a/airbyte-integrations/connectors/destination-dynamodb/.dockerignore b/airbyte-integrations/connectors/destination-dynamodb/.dockerignore new file mode 100644 index 000000000000..65c7d0ad3e73 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/.dockerignore @@ -0,0 +1,3 @@ +* +!Dockerfile +!build diff --git 
a/airbyte-integrations/connectors/destination-dynamodb/Dockerfile b/airbyte-integrations/connectors/destination-dynamodb/Dockerfile new file mode 100644 index 000000000000..319c38ea3133 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/Dockerfile @@ -0,0 +1,11 @@ +FROM airbyte/integration-base-java:dev + +WORKDIR /airbyte +ENV APPLICATION destination-dynamodb + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/destination-dynamodb diff --git a/airbyte-integrations/connectors/destination-dynamodb/README.md b/airbyte-integrations/connectors/destination-dynamodb/README.md new file mode 100644 index 000000000000..3677fb5347ee --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/README.md @@ -0,0 +1,68 @@ +# Destination Dynamodb + +This is the repository for the Dynamodb destination connector in Java. +For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/destinations/dynamodb). + +## Local development + +#### Building via Gradle +From the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:destination-dynamodb:build +``` + +#### Create credentials +**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`. +Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information. + +**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials. 
+ +### Locally running the connector docker image + +#### Build +Build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:destination-dynamodb:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/destination-dynamodb:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-dynamodb:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-dynamodb:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-dynamodb:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` + +## Testing +We use `JUnit` for Java tests. + +### Unit and Integration Tests +Place unit tests under `src/test/java/io/airbyte/integrations/destination/dynamodb`. + +#### Acceptance Tests +Airbyte has a standard test suite that all destination connectors must pass. Implement the `TODO`s in +`src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java`. + +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:destination-dynamodb:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:destination-dynamodb:integrationTest +``` + +## Dependency Management + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. 
Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/destination-dynamodb/build.gradle b/airbyte-integrations/connectors/destination-dynamodb/build.gradle new file mode 100644 index 000000000000..b33317f137c7 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/build.gradle @@ -0,0 +1,21 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.destination.dynamodb.DynamodbDestination' +} + +dependencies { + implementation project(':airbyte-config:models') + implementation project(':airbyte-protocol:models') + implementation project(':airbyte-integrations:bases:base-java') + implementation project(':airbyte-integrations:connectors:destination-jdbc') + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + implementation 'com.amazonaws:aws-java-sdk-dynamodb:1.12.47' + + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') + integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-dynamodb') +} diff --git a/airbyte-integrations/connectors/destination-dynamodb/sample_secrets/config.json b/airbyte-integrations/connectors/destination-dynamodb/sample_secrets/config.json new file mode 100644 index 000000000000..580e6fad531b --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/sample_secrets/config.json @@ -0,0 +1,6 @@ +{ + "dynamodb_table_name": "paste-table-name-here", + "dynamodb_region": "paste-dynamodb-region-here", + "access_key_id": "paste-access-key-id-here", + "secret_access_key": 
"paste-secret-access-key-here" +} diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbChecker.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbChecker.java new file mode 100644 index 000000000000..8fa49a436704 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbChecker.java @@ -0,0 +1,100 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.dynamodb; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; +import com.amazonaws.services.dynamodbv2.document.*; +import com.amazonaws.services.dynamodbv2.model.*; +import io.airbyte.integrations.base.JavaBaseConstants; +import java.util.Arrays; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DynamodbChecker { + + private static final Logger LOGGER = LoggerFactory.getLogger(DynamodbChecker.class); + + public static void attemptDynamodbWriteAndDelete(DynamodbDestinationConfig dynamodbDestinationConfig) throws Exception { + var prefix = dynamodbDestinationConfig.getTableName(); + final String outputTableName = prefix + "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""); + attemptWriteAndDeleteDynamodbItem(dynamodbDestinationConfig, outputTableName); + } + + private static void attemptWriteAndDeleteDynamodbItem(DynamodbDestinationConfig dynamodbDestinationConfig, String outputTableName) + throws Exception { + DynamoDB dynamoDB = new DynamoDB(getAmazonDynamoDB(dynamodbDestinationConfig)); + Table table = dynamoDB.createTable(outputTableName, // create table + Arrays.asList(new KeySchemaElement(JavaBaseConstants.COLUMN_NAME_AB_ID, KeyType.HASH), new KeySchemaElement("sync_time", KeyType.RANGE)), + Arrays.asList(new AttributeDefinition(JavaBaseConstants.COLUMN_NAME_AB_ID, ScalarAttributeType.S), + new AttributeDefinition("sync_time", ScalarAttributeType.N)), + new ProvisionedThroughput(1L, 1L)); + table.waitForActive(); + + try { + PutItemOutcome outcome = table + .putItem( + new Item().withPrimaryKey(JavaBaseConstants.COLUMN_NAME_AB_ID, 
UUID.randomUUID().toString(), "sync_time", System.currentTimeMillis())); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + + table.delete(); // delete table + table.waitForDelete(); + } + + public static AmazonDynamoDB getAmazonDynamoDB(DynamodbDestinationConfig dynamodbDestinationConfig) { + var endpoint = dynamodbDestinationConfig.getEndpoint(); + var region = dynamodbDestinationConfig.getRegion(); + var accessKeyId = dynamodbDestinationConfig.getAccessKeyId(); + var secretAccessKey = dynamodbDestinationConfig.getSecretAccessKey(); + + var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey); + + if (endpoint.isEmpty()) { + return AmazonDynamoDBClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .withRegion(dynamodbDestinationConfig.getRegion()) + .build(); + + } else { + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setSignerOverride("AWSDynamodbSignerType"); + + return AmazonDynamoDBClientBuilder + .standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region)) + .withClientConfiguration(clientConfiguration) + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .build(); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java new file mode 100644 index 000000000000..40954e770cda --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java @@ -0,0 +1,133 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * 
in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.dynamodb; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.protocol.models.*; +import java.util.*; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DynamodbConsumer extends FailureTrackingAirbyteMessageConsumer { + + private static final Logger LOGGER = LoggerFactory.getLogger(DynamodbConsumer.class); + + private final DynamodbDestinationConfig dynamodbDestinationConfig; + private final ConfiguredAirbyteCatalog 
configuredCatalog; + private final Consumer outputRecordCollector; + private final Map streamNameAndNamespaceToWriters; + + private AirbyteMessage lastStateMessage = null; + + public DynamodbConsumer(DynamodbDestinationConfig dynamodbDestinationConfig, + ConfiguredAirbyteCatalog configuredCatalog, + Consumer outputRecordCollector) { + this.dynamodbDestinationConfig = dynamodbDestinationConfig; + this.configuredCatalog = configuredCatalog; + this.outputRecordCollector = outputRecordCollector; + this.streamNameAndNamespaceToWriters = new HashMap<>(configuredCatalog.getStreams().size()); + } + + @Override + protected void startTracked() throws Exception { + + var endpoint = dynamodbDestinationConfig.getEndpoint(); + AWSCredentials awsCreds = new BasicAWSCredentials(dynamodbDestinationConfig.getAccessKeyId(), dynamodbDestinationConfig.getSecretAccessKey()); + AmazonDynamoDB amazonDynamodb = null; + + if (endpoint.isEmpty()) { + amazonDynamodb = AmazonDynamoDBClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .withRegion(dynamodbDestinationConfig.getRegion()) + .build(); + } else { + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setSignerOverride("AWSDynamodbSignerType"); + + amazonDynamodb = AmazonDynamoDBClientBuilder + .standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, dynamodbDestinationConfig.getRegion())) + .withClientConfiguration(clientConfiguration) + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .build(); + } + + var uploadTimestamp = System.currentTimeMillis(); + + for (ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) { + var writer = new DynamodbWriter(dynamodbDestinationConfig, amazonDynamodb, configuredStream, uploadTimestamp); + + AirbyteStream stream = configuredStream.getStream(); + AirbyteStreamNameNamespacePair streamNamePair = AirbyteStreamNameNamespacePair + 
.fromAirbyteSteam(stream); + streamNameAndNamespaceToWriters.put(streamNamePair, writer); + } + } + + @Override + protected void acceptTracked(AirbyteMessage airbyteMessage) throws Exception { + if (airbyteMessage.getType() == AirbyteMessage.Type.STATE) { + this.lastStateMessage = airbyteMessage; + return; + } else if (airbyteMessage.getType() != AirbyteMessage.Type.RECORD) { + return; + } + + AirbyteRecordMessage recordMessage = airbyteMessage.getRecord(); + AirbyteStreamNameNamespacePair pair = AirbyteStreamNameNamespacePair + .fromRecordMessage(recordMessage); + + if (!streamNameAndNamespaceToWriters.containsKey(pair)) { + throw new IllegalArgumentException( + String.format( + "Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s", + Jsons.serialize(configuredCatalog), Jsons.serialize(recordMessage))); + } + + streamNameAndNamespaceToWriters.get(pair).write(UUID.randomUUID(), recordMessage); + } + + @Override + protected void close(boolean hasFailed) throws Exception { + for (DynamodbWriter handler : streamNameAndNamespaceToWriters.values()) { + handler.close(hasFailed); + } + // DynamoDB stream uploader is all or nothing if a failure happens in the destination. 
+ if (!hasFailed) { + outputRecordCollector.accept(lastStateMessage); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestination.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestination.java new file mode 100644 index 000000000000..8e6f087e39fe --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestination.java @@ -0,0 +1,69 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.dynamodb; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.BaseConnector; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DynamodbDestination extends BaseConnector implements Destination { + + private static final Logger LOGGER = LoggerFactory.getLogger(DynamodbDestination.class); + + public static void main(String[] args) throws Exception { + new IntegrationRunner(new DynamodbDestination()).run(args); + } + + @Override + public AirbyteConnectionStatus check(JsonNode config) { + try { + DynamodbChecker.attemptDynamodbWriteAndDelete(DynamodbDestinationConfig.getDynamodbDestinationConfig(config)); + return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED); + } catch (Exception e) { + LOGGER.error("Exception attempting to access the DynamoDB table: ", e); + return new AirbyteConnectionStatus() + .withStatus(AirbyteConnectionStatus.Status.FAILED) + .withMessage("Could not connect to the DynamoDB table with the provided configuration. 
\n" + e + .getMessage()); + } + } + + @Override + public AirbyteMessageConsumer getConsumer(JsonNode config, + ConfiguredAirbyteCatalog configuredCatalog, + Consumer outputRecordCollector) { + // TODO + return new DynamodbConsumer(DynamodbDestinationConfig.getDynamodbDestinationConfig(config), configuredCatalog, outputRecordCollector); + } + +} diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationConfig.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationConfig.java new file mode 100644 index 000000000000..310dc530b0aa --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationConfig.java @@ -0,0 +1,79 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.dynamodb; + +import com.fasterxml.jackson.databind.JsonNode; + +public class DynamodbDestinationConfig { + + private final String endpoint; + private final String tableName; + private final String accessKeyId; + private final String secretAccessKey; + private final String region; + + public DynamodbDestinationConfig( + String endpoint, + String tableName, + String region, + String accessKeyId, + String secretAccessKey) { + this.endpoint = endpoint; + this.tableName = tableName; + this.region = region; + this.accessKeyId = accessKeyId; + this.secretAccessKey = secretAccessKey; + } + + public static DynamodbDestinationConfig getDynamodbDestinationConfig(JsonNode config) { + return new DynamodbDestinationConfig( + config.get("dynamodb_endpoint") == null ? 
"" : config.get("dynamodb_endpoint").asText(), + config.get("dynamodb_table_name").asText(), + config.get("dynamodb_region").asText(), + config.get("access_key_id").asText(), + config.get("secret_access_key").asText()); + } + + public String getEndpoint() { + return endpoint; + } + + public String getAccessKeyId() { + return accessKeyId; + } + + public String getSecretAccessKey() { + return secretAccessKey; + } + + public String getRegion() { + return region; + } + + public String getTableName() { + return tableName; + } + +} diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbOutputTableHelper.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbOutputTableHelper.java new file mode 100644 index 000000000000..af540c3d4764 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbOutputTableHelper.java @@ -0,0 +1,54 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.dynamodb; + +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.protocol.models.AirbyteStream; +import java.util.LinkedList; +import java.util.List; + +public class DynamodbOutputTableHelper { + + public static String getOutputTableName(String tableName, AirbyteStream stream) { + return getOutputTableName(tableName, stream.getNamespace(), stream.getName()); + } + + public static String getOutputTableName(String tableName, String namespace, String streamName) { + List paths = new LinkedList<>(); + + if (tableName != null) { + paths.add(tableName); + } + if (namespace != null) { + paths.add(new ExtendedNameTransformer().convertStreamName(namespace)); + } + if (streamName != null) { + paths.add(new ExtendedNameTransformer().convertStreamName(streamName)); + } + + return String.join("_", paths); + } + +} diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java new file mode 100644 index 000000000000..5805f6ca264b --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java @@ -0,0 +1,185 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, 
modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.dynamodb; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.document.*; +import com.amazonaws.services.dynamodbv2.model.*; +import com.amazonaws.services.dynamodbv2.util.TableUtils; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import io.airbyte.commons.jackson.MoreMappers; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.DestinationSyncMode; +import java.io.IOException; +import java.util.Map; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DynamodbWriter { + + protected static final Logger LOGGER = LoggerFactory.getLogger(DynamodbWriter.class); + + private static final ObjectMapper MAPPER = MoreMappers.initMapper(); + private static final ObjectWriter WRITER = MAPPER.writer(); + + private final DynamodbDestinationConfig config; + 
private final DynamoDB dynamodb; + private final ConfiguredAirbyteStream configuredStream; + private final long uploadTimestamp; + private TableWriteItems tableWriteItems; + private final String outputTableName; + private final int batchSize = 25; + + public DynamodbWriter(DynamodbDestinationConfig config, + AmazonDynamoDB amazonDynamodb, + ConfiguredAirbyteStream configuredStream, + long uploadTimestamp) { + + this.config = config; + this.dynamodb = new DynamoDB(amazonDynamodb); + this.configuredStream = configuredStream; + this.uploadTimestamp = uploadTimestamp; + this.outputTableName = DynamodbOutputTableHelper.getOutputTableName(config.getTableName(), configuredStream.getStream()); + + final DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode(); + if (syncMode == null) { + throw new IllegalStateException("Undefined destination sync mode"); + } + + final boolean isAppendMode = syncMode != DestinationSyncMode.OVERWRITE; + boolean tableExist = true; + + try { + if (!isAppendMode) { + Table table = dynamodb.getTable(outputTableName); + + if (isTableExist(table)) { + table.delete(); + table.waitForDelete(); + } + } + + var table = createTableIfNotExists(amazonDynamodb, outputTableName); + table.waitForActive(); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + + this.tableWriteItems = new TableWriteItems(outputTableName); + } + + private static boolean isTableExist(Table table) { + try { + table.describe(); + } catch (ResourceNotFoundException e) { + return false; + } + return true; + } + + private Table createTableIfNotExists(AmazonDynamoDB amazonDynamodb, String tableName) throws Exception { + AttributeDefinition partitionKeyDefinition = new AttributeDefinition() + .withAttributeName(JavaBaseConstants.COLUMN_NAME_AB_ID) + .withAttributeType(ScalarAttributeType.S); + AttributeDefinition sortKeyDefinition = new AttributeDefinition() + .withAttributeName("sync_time") + .withAttributeType(ScalarAttributeType.N); + KeySchemaElement 
partitionKeySchema = new KeySchemaElement() + .withAttributeName(JavaBaseConstants.COLUMN_NAME_AB_ID) + .withKeyType(KeyType.HASH); + KeySchemaElement sortKeySchema = new KeySchemaElement() + .withAttributeName("sync_time") + .withKeyType(KeyType.RANGE); + + TableUtils.createTableIfNotExists(amazonDynamodb, new CreateTableRequest() + .withTableName(tableName) + .withAttributeDefinitions(partitionKeyDefinition) + .withKeySchema(partitionKeySchema) + .withAttributeDefinitions(sortKeyDefinition) + .withKeySchema(sortKeySchema) + .withBillingMode(BillingMode.PAY_PER_REQUEST)); + return new DynamoDB(amazonDynamodb).getTable(tableName); + } + + public void write(UUID id, AirbyteRecordMessage recordMessage) { + + ObjectMapper mapper = new ObjectMapper(); + Map dataMap = mapper.convertValue(recordMessage.getData(), new TypeReference>() {}); + + var item = new Item() + .withPrimaryKey(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString(), "sync_time", uploadTimestamp) + .withMap(JavaBaseConstants.COLUMN_NAME_DATA, dataMap) + .withLong(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt()); + tableWriteItems.addItemToPut(item); + BatchWriteItemOutcome outcome; + if (tableWriteItems.getItemsToPut().size() >= batchSize) { + try { + int maxRetries = 5; + outcome = dynamodb.batchWriteItem(tableWriteItems); + tableWriteItems = new TableWriteItems(this.outputTableName); + + while (outcome.getUnprocessedItems().size() > 0 && maxRetries > 0) { + outcome = dynamodb.batchWriteItemUnprocessed(outcome.getUnprocessedItems()); + maxRetries--; + } + + if (maxRetries == 0) { + LOGGER.warn(String.format("Unprocessed items count after retry %d times: %s", 5, Integer.toString(outcome.getUnprocessedItems().size()))); + } + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } + } + + public void close(boolean hasFailed) throws IOException { + if (hasFailed) { + LOGGER.warn("Failure in writing data to DynamoDB. 
Aborting..."); + } else { + try { + int maxRetries = 5; + if (tableWriteItems.getItemsToPut().size() > 0) { + var outcome = dynamodb.batchWriteItem(tableWriteItems); + while (outcome.getUnprocessedItems().size() > 0 && maxRetries > 0) { + outcome = dynamodb.batchWriteItemUnprocessed(outcome.getUnprocessedItems()); + maxRetries--; + } + if (maxRetries == 0) { + LOGGER.warn(String.format("Unprocessed items count after retry %d times: %s", 5, Integer.toString(outcome.getUnprocessedItems().size()))); + } + } + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + LOGGER.info("Data writing completed for DynamoDB."); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-dynamodb/src/main/resources/spec.json new file mode 100644 index 000000000000..87e9218a7260 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/resources/spec.json @@ -0,0 +1,82 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/dynamodb", + "supportsIncremental": true, + "supportsNormalization": false, + "supportsDBT": false, + "supported_destination_sync_modes": ["overwrite", "append"], + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "DynamoDB Destination Spec", + "type": "object", + "required": [ + "dynamodb_table_name", + "dynamodb_region", + "access_key_id", + "secret_access_key" + ], + "additionalProperties": false, + "properties": { + "dynamodb_endpoint": { + "title": "Endpoint", + "type": "string", + "default": "", + "description": "This is your DynamoDB endpoint url.(if you are working with AWS DynamoDB, just leave empty).", + "examples": ["http://localhost:9000"] + }, + "dynamodb_table_name": { + "title": "DynamoDB Table Name", + "type": "string", + "description": "The name of the DynamoDB table.", + "examples": ["airbyte_sync"] + }, + "dynamodb_region": { + "title": "DynamoDB 
Region", + "type": "string", + "default": "", + "description": "The region of the DynamoDB.", + "enum": [ + "", + "us-east-1", + "us-east-2", + "us-west-1", + "us-west-2", + "af-south-1", + "ap-east-1", + "ap-south-1", + "ap-northeast-1", + "ap-northeast-2", + "ap-northeast-3", + "ap-southeast-1", + "ap-southeast-2", + "ca-central-1", + "cn-north-1", + "cn-northwest-1", + "eu-central-1", + "eu-north-1", + "eu-south-1", + "eu-west-1", + "eu-west-2", + "eu-west-3", + "sa-east-1", + "me-south-1", + "us-gov-east-1", + "us-gov-west-1" + ] + }, + "access_key_id": { + "type": "string", + "description": "The access key id to access the DynamoDB. Airbyte requires Read and Write permissions to the DynamoDB.", + "title": "DynamoDB Key Id", + "airbyte_secret": true, + "examples": ["A012345678910EXAMPLE"] + }, + "secret_access_key": { + "type": "string", + "description": "The corresponding secret to the access key id.", + "title": "DynamoDB Access Key", + "airbyte_secret": true, + "examples": ["a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"] + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java new file mode 100644 index 000000000000..47b128c174c2 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java @@ -0,0 +1,187 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, 
distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.dynamodb; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; +import com.amazonaws.services.dynamodbv2.document.*; +import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.jackson.MoreMappers; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.file.Path; +import java.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DynamodbDestinationAcceptanceTest extends DestinationAcceptanceTest { + + 
private static final Logger LOGGER = LoggerFactory.getLogger(DynamodbDestinationAcceptanceTest.class); + protected static final ObjectMapper MAPPER = MoreMappers.initMapper(); + + protected final String secretFilePath = "secrets/config.json"; + protected JsonNode configJson; + protected DynamodbDestinationConfig config; + protected AmazonDynamoDB client; + + protected JsonNode getBaseConfigJson() { + return Jsons.deserialize(IOs.readFile(Path.of(secretFilePath))); + } + + @Override + protected String getImageName() { + return "airbyte/destination-dynamodb:dev"; + } + + @Override + protected JsonNode getConfig() { + return configJson; + } + + @Override + protected JsonNode getFailCheckConfig() { + JsonNode baseJson = getBaseConfigJson(); + JsonNode failCheckJson = Jsons.clone(baseJson); + // invalid credential + ((ObjectNode) failCheckJson).put("access_key_id", "fake-key"); + ((ObjectNode) failCheckJson).put("secret_access_key", "fake-secret"); + return failCheckJson; + } + + /** + * Helper method to retrieve all synced objects inside the configured bucket path. 
+ */ + protected List getAllSyncedObjects(String streamName, String namespace) { + var dynamodb = new DynamoDB(this.client); + var tableName = DynamodbOutputTableHelper.getOutputTableName(this.config.getTableName(), streamName, namespace); + var table = dynamodb.getTable(tableName); + List items = new ArrayList(); + List resultItems = new ArrayList(); + Long maxSyncTime = 0L; + + try { + ItemCollection scanItems = table.scan(new ScanSpec()); + + Iterator iter = scanItems.iterator(); + while (iter.hasNext()) { + + Item item = iter.next(); + items.add(item); + maxSyncTime = Math.max(maxSyncTime, ((BigDecimal) item.get("sync_time")).longValue()); + } + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + + Long finalMaxSyncTime = maxSyncTime; + items.sort(Comparator.comparingLong(o -> ((BigDecimal) o.get(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)).longValue())); + + return items; + } + + @Override + protected List retrieveRecords(TestDestinationEnv testEnv, + String streamName, + String namespace, + JsonNode streamSchema) + throws IOException { + List items = getAllSyncedObjects(streamName, namespace); + List jsonRecords = new LinkedList<>(); + + for (var item : items) { + var itemJson = item.toJSON(); + jsonRecords.add(Jsons.deserialize(itemJson).get(JavaBaseConstants.COLUMN_NAME_DATA)); + } + + return jsonRecords; + } + + @Override + protected void setup(TestDestinationEnv testEnv) { + JsonNode baseConfigJson = getBaseConfigJson(); + // Set a random s3 bucket path for each integration test + JsonNode configJson = Jsons.clone(baseConfigJson); + this.configJson = configJson; + this.config = DynamodbDestinationConfig.getDynamodbDestinationConfig(configJson); + + var endpoint = config.getEndpoint(); + var region = config.getRegion(); + var accessKeyId = config.getAccessKeyId(); + var secretAccessKey = config.getSecretAccessKey(); + + var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey); + + if (endpoint.isEmpty()) { + this.client = 
AmazonDynamoDBClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .withRegion(config.getRegion()) + .build(); + + } else { + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setSignerOverride("AWSDynamodbSignerType"); + + this.client = AmazonDynamoDBClientBuilder + .standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region)) + .withClientConfiguration(clientConfiguration) + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .build(); + } + } + + @Override + protected void tearDown(TestDestinationEnv testEnv) { + var dynamodb = new DynamoDB(this.client); + List tables = new ArrayList(); + dynamodb.listTables().forEach(o -> { + if (o.getTableName().startsWith(this.config.getTableName())) + tables.add(o.getTableName()); + }); + + try { + for (var tableName : tables) { + Table table = dynamodb.getTable(tableName); + table.delete(); + table.waitForDelete(); + LOGGER.info(String.format("Delete table %s", tableName)); + } + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/test/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationTest.java b/airbyte-integrations/connectors/destination-dynamodb/src/test/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationTest.java new file mode 100644 index 000000000000..3e80a03ca9f6 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/src/test/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationTest.java @@ -0,0 +1,67 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, 
publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.dynamodb; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.protocol.models.*; +import org.junit.jupiter.api.Test; + +class DynamodbDestinationTest { + + @Test + void testGetOutputTableNameWithString() throws Exception { + var actual = DynamodbOutputTableHelper.getOutputTableName("test_table", "test_namespace", "test_stream"); + assertEquals("test_table_test_namespace_test_stream", actual); + } + + @Test + void testGetOutputTableNameWithStream() throws Exception { + var stream = new AirbyteStream(); + stream.setName("test_stream"); + stream.setNamespace("test_namespace"); + var actual = DynamodbOutputTableHelper.getOutputTableName("test_table", stream); + assertEquals("test_table_test_namespace_test_stream", actual); + } + + @Test + void testGetDynamodbDestinationdbConfig() throws Exception { + JsonNode json = Jsons.deserialize("{\n" + + " \"dynamodb_table_name\": \"test_table\",\n" + + " \"dynamodb_region\": \"test_region\",\n" + + " \"access_key_id\": \"test_key_id\",\n" + + " 
\"secret_access_key\": \"test_access_key\"\n" + + "}"); + var config = DynamodbDestinationConfig.getDynamodbDestinationConfig(json); + + assertEquals(config.getTableName(), "test_table"); + assertEquals(config.getRegion(), "test_region"); + assertEquals(config.getAccessKeyId(), "test_key_id"); + assertEquals(config.getSecretAccessKey(), "test_access_key"); + } + +} diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 422e01b8c1e9..2e5855a7e254 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -131,6 +131,7 @@ * [Destinations](integrations/destinations/README.md) * [AzureBlobStorage](integrations/destinations/azureblobstorage.md) * [BigQuery](integrations/destinations/bigquery.md) + * [DynamoDB](integrations/destinations/dynamodb.md) * [Chargify](integrations/destinations/keen.md) * [Google Cloud Storage (GCS)](integrations/destinations/gcs.md) * [Google PubSub](integrations/destinations/pubsub.md) diff --git a/docs/integrations/destinations/dynamodb.md b/docs/integrations/destinations/dynamodb.md new file mode 100644 index 000000000000..dfa230be31f1 --- /dev/null +++ b/docs/integrations/destinations/dynamodb.md @@ -0,0 +1,60 @@ +# Dynamodb + +This destination writes data to AWS DynamoDB. + +The Airbyte DynamoDB destination allows you to sync data to AWS DynamoDB. Each stream is written to its own table under the DynamoDB. + +## Sync overview + +### Output schema + +Each stream will be output into its own DynamoDB table. Each table will a collections of `json` objects containing 4 fields: + +* `_airbyte_ab_id`: a uuid assigned by Airbyte to each event that is processed. +* `_airbyte_emitted_at`: a timestamp representing when the event was pulled from the data source. +* `_airbyte_data`: a json blob representing with the extracted data. +* `sync_time`: a timestamp representing when the sync up task be triggered. 
+ +### Features + +| Feature | Support | Notes | +| :--- | :---: | :--- | +| Full Refresh Sync | ✅ | Warning: this mode deletes all previously synced data in the configured DynamoDB table. | +| Incremental - Append Sync | ✅ | | +| Namespaces | ✅ | Namespace will be used as part of the table name. | + +### Performance considerations + +This connector creates its DynamoDB tables in on-demand \(`PAY_PER_REQUEST`\) billing mode, so it does not provision a fixed number of read or write capacity units. If you run into performance constraints, review the table's capacity settings in the DynamoDB console. + +## Getting started + +### Requirements + +1. Allow connections from Airbyte server to your AWS DynamoDB tables \(if they exist in separate VPCs\). +2. The credentials for AWS DynamoDB \(an access key id and its secret access key\). + +### Setup guide + +* Fill in the DynamoDB info + * **DynamoDB Endpoint** + * Leave empty if using AWS DynamoDB, fill in endpoint URL if using customized endpoint. + * **DynamoDB Table Name** + * The name prefix of the DynamoDB table to store the extracted data. The table name is \<dynamodb table name\>\_\<namespace\>\_\<stream name\>. + * **DynamoDB Region** + * The region of the DynamoDB. + * **Access Key Id** + * See [this](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys) on how to generate an access key. + * We recommend creating an Airbyte-specific user. This user will require [read and write permissions](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_examples_dynamodb_specific-table.html) to the DynamoDB table. + * **Secret Access Key** + * Corresponding key to the above key id. +* Make sure your DynamoDB tables are accessible from the machine running Airbyte. + * This depends on your networking setup. + * You can check AWS DynamoDB documentation with a tutorial on how to properly configure your DynamoDB's access [here](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/access-control-overview.html).
+ * The easiest way to verify if Airbyte is able to connect to your DynamoDB tables is via the check connection tool in the UI. + +## CHANGELOG + +| Version | Date | Pull Request | Subject | +| :--- | :--- | :--- | :--- | +| 0.1.0 | 2021-08-20 | [#5561](https://github.com/airbytehq/airbyte/pull/5561) | Initial release. |