Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

12708: Add an option to use encryption with staging in Redshift Destination #13675

Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@
- name: Redshift
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.40
dockerImageTag: 0.3.41
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
resourceRequirements:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3622,7 +3622,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-redshift:0.3.40"
- dockerImage: "airbyte/destination-redshift:0.3.41"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
connectionSpecification:
Expand Down Expand Up @@ -3773,6 +3773,33 @@
\ the sync. See <a href=\"https://docs.airbyte.com/integrations/destinations/redshift/#:~:text=the%20root%20directory.-,Purge%20Staging%20Data,-Whether%20to%20delete\"\
> docs</a> for details."
default: true
encryption:
title: "Encryption"
description: "How to encrypt the staging data"
oneOf:
- title: "No encryption"
description: "Staging data will be stored in plaintext."
type: "object"
required:
- "encryption_type"
properties:
encryption_type:
type: "string"
const: "none"
- title: "AES-CBC envelope encryption"
description: "Staging data will be encrypted using AES-CBC envelope encryption."
type: "object"
required:
- "encryption_type"
properties:
encryption_type:
type: "string"
const: "aes_cbc_envelope"
key_encrypting_key:
type: "string"
title: "Key"
description: "The key, base64-encoded. Must be either 128, 192, or 256 bits. Leave blank to have Airbyte generate an ephemeral key for each sync."
airbyte_secret: true
supportsIncremental: true
supportsNormalization: true
supportsDBT: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.40
LABEL io.airbyte.version=0.3.41
LABEL io.airbyte.name=airbyte/destination-redshift
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.redshift.operations.RedshiftS3StagingSqlOperations;
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption;
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption.KeyType;
import io.airbyte.integrations.destination.s3.EncryptionConfig;
import io.airbyte.integrations.destination.s3.NoEncryption;
import io.airbyte.integrations.destination.s3.S3Destination;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.S3StorageOperations;
import io.airbyte.integrations.destination.s3.csv.CsvSerializedBuffer;
import io.airbyte.integrations.destination.staging.StagingConsumerFactory;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.Map;
Expand All @@ -47,14 +52,26 @@ public RedshiftStagingS3Destination() {
super(RedshiftInsertDestination.DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations());
}

/**
 * Detects the unsupported combination of an ephemeral key-encrypting key with staging-data
 * retention: an ephemeral key is discarded once the sync finishes, so any staging object left
 * behind in S3 could never be decrypted again.
 */
private boolean isEphemeralKeysAndPurgingStagingData(final JsonNode config, final EncryptionConfig encryptionConfig) {
  final boolean retainsStagingData = !isPurgeStagingData(config);
  return retainsStagingData
      && encryptionConfig instanceof AesCbcEnvelopeEncryption envelopeEncryption
      && envelopeEncryption.keyType() == KeyType.EPHEMERAL;
}

@Override
public AirbyteConnectionStatus check(final JsonNode config) {
final S3DestinationConfig s3Config = getS3DestinationConfig(findS3Options(config));
final EncryptionConfig encryptionConfig = config.has("uploading_method") ?
EncryptionConfig.fromJson(config.get("uploading_method").get("encryption")) : new NoEncryption();
if (isEphemeralKeysAndPurgingStagingData(config, encryptionConfig)) {
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage(
"You cannot use ephemeral keys and disable purging your staging data. This would produce S3 objects that you cannot decrypt.");
}
S3Destination.attemptS3WriteAndDelete(new S3StorageOperations(new RedshiftSQLNameTransformer(), s3Config.getS3Client(), s3Config), s3Config, "");

final NamingConventionTransformer nameTransformer = getNamingResolver();
final RedshiftS3StagingSqlOperations redshiftS3StagingSqlOperations =
new RedshiftS3StagingSqlOperations(nameTransformer, s3Config.getS3Client(), s3Config);
new RedshiftS3StagingSqlOperations(nameTransformer, s3Config.getS3Client(), s3Config, encryptionConfig);
final DataSource dataSource = getDataSource(config);
try {
final JdbcDatabase database = new DefaultJdbcDatabase(dataSource);
Expand Down Expand Up @@ -108,10 +125,12 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final S3DestinationConfig s3Config = getS3DestinationConfig(findS3Options(config));
final EncryptionConfig encryptionConfig = config.has("uploading_method") ?
EncryptionConfig.fromJson(config.get("uploading_method").get("encryption")) : new NoEncryption();
return new StagingConsumerFactory().create(
outputRecordCollector,
getDatabase(getDataSource(config)),
new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config),
new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, encryptionConfig),
getNamingResolver(),
CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX)),
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.redshift.manifest.Entry;
import io.airbyte.integrations.destination.redshift.manifest.Manifest;
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption;
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryptionBlobDecorator;
import io.airbyte.integrations.destination.s3.EncryptionConfig;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.S3StorageOperations;
import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig;
import io.airbyte.integrations.destination.staging.StagingOperations;
import java.util.Base64;
import java.util.Base64.Encoder;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -26,18 +31,27 @@

public class RedshiftS3StagingSqlOperations extends RedshiftSqlOperations implements StagingOperations {

private static final Encoder BASE64_ENCODER = Base64.getEncoder();
private final NamingConventionTransformer nameTransformer;
private final S3StorageOperations s3StorageOperations;
private final S3DestinationConfig s3Config;
private final ObjectMapper objectMapper;
private final byte[] keyEncryptingKey;

/**
 * Builds the staging SQL operations for Redshift with S3 staging.
 *
 * Note: the diff fragment contained both the removed old signature line
 * ({@code S3DestinationConfig s3Config) {}) and the new one; this is the resolved new-version
 * constructor with the stale line dropped.
 *
 * @param nameTransformer naming convention used for staging object paths
 * @param s3Client S3 client used to upload staging files
 * @param s3Config S3 bucket/credential configuration for the staging area
 * @param encryptionConfig how staging data is encrypted; {@code NoEncryption} leaves it plaintext
 */
public RedshiftS3StagingSqlOperations(final NamingConventionTransformer nameTransformer,
                                      final AmazonS3 s3Client,
                                      final S3DestinationConfig s3Config,
                                      final EncryptionConfig encryptionConfig) {
  this.nameTransformer = nameTransformer;
  this.s3StorageOperations = new S3StorageOperations(nameTransformer, s3Client, s3Config);
  this.s3Config = s3Config;
  this.objectMapper = new ObjectMapper();
  // When AES-CBC envelope encryption is requested, install the blob decorator so every staged
  // object is encrypted on upload, and retain the key-encrypting key for the COPY statement.
  if (encryptionConfig instanceof AesCbcEnvelopeEncryption e) {
    this.s3StorageOperations.addBlobDecorator(new AesCbcEnvelopeEncryptionBlobDecorator(e.key()));
    this.keyEncryptingKey = e.key();
  } else {
    // No client-side encryption: staging files are written as-is and COPY needs no key.
    this.keyEncryptingKey = null;
  }
}

@Override
Expand Down Expand Up @@ -99,10 +113,18 @@ public void copyIntoTmpTableFromStage(JdbcDatabase database,

// Runs the Redshift COPY that loads the staged files listed in the manifest into the temporary
// table. (The definition continues past this visible diff fragment — the tail of the statement
// and the method's closing brace are in a collapsed region.)
private void executeCopy(final String manifestPath, JdbcDatabase db, String schemaName, String tmpTableName) {
final S3AccessKeyCredentialConfig credentialConfig = (S3AccessKeyCredentialConfig) s3Config.getS3CredentialConfig();
// Clause appended to COPY so the warehouse can decrypt client-side-encrypted staging objects;
// empty when no key-encrypting key was configured (plaintext staging).
final String encryptionClause;
if (keyEncryptingKey == null) {
encryptionClause = "";
} else {
// NOTE(review): `encryption = (type = 'aws_cse' master_key = ...)` looks like Snowflake COPY
// syntax. Redshift's COPY expects `master_symmetric_key=<base64-key>` inside the CREDENTIALS
// string together with the ENCRYPTED option for client-side-encrypted loads — confirm this
// clause is actually accepted by Redshift before shipping.
encryptionClause = String.format(" encryption = (type = 'aws_cse' master_key = '%s')", BASE64_ENCODER.encodeToString(keyEncryptingKey));
}

// Text block is filled in below with schema/table, manifest path, credentials, the optional
// encryption clause, and the bucket region.
final var copyQuery = String.format(
"""
COPY %s.%s FROM '%s'
CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s'
%s
CSV GZIP
REGION '%s' TIMEFORMAT 'auto'
STATUPDATE OFF
Expand All @@ -112,6 +134,7 @@ private void executeCopy(final String manifestPath, JdbcDatabase db, String sche
getFullS3Path(s3Config.getBucketName(), manifestPath),
credentialConfig.getAccessKeyId(),
credentialConfig.getSecretAccessKey(),
encryptionClause,
s3Config.getBucketRegion());

Exceptions.toRuntime(() -> db.execute(copyQuery));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,49 @@
"type": "boolean",
"description": "Whether to delete the staging files from S3 after completing the sync. See <a href=\"https://docs.airbyte.com/integrations/destinations/redshift/#:~:text=the%20root%20directory.-,Purge%20Staging%20Data,-Whether%20to%20delete\"> docs</a> for details.",
"default": true
},
"encryption": {
"title": "Encryption",
"type": "object",
"description": "How to encrypt the staging data",
"default": { "encryption_type": "none" },
"order": 7,
"oneOf": [
{
"title": "No encryption",
"description": "Staging data will be stored in plaintext.",
"type": "object",
"required": ["encryption_type"],
"properties": {
"encryption_type": {
"type": "string",
"const": "none",
"enum": ["none"],
"default": "none"
}
}
},
{
"title": "AES-CBC envelope encryption",
"description": "Staging data will be encrypted using AES-CBC envelope encryption.",
"type": "object",
"required": ["encryption_type"],
"properties": {
"encryption_type": {
"type": "string",
"const": "aes_cbc_envelope",
"enum": ["aes_cbc_envelope"],
"default": "aes_cbc_envelope"
},
"key_encrypting_key": {
"type": "string",
"title": "Key",
"description": "The key, base64-encoded. Must be either 128, 192, or 256 bits. Leave blank to have Airbyte generate an ephemeral key for each sync.",
"airbyte_secret": true
}
}
}
]
}
}
}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/redshift.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:------------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.3.41 | 2022-06-21 | [\#13675](https://github.com/airbytehq/airbyte/pull/13675) | Add an option to use encryption with staging in Redshift Destination |
| 0.3.40 | 2022-06-17 | [\#13753](https://github.com/airbytehq/airbyte/pull/13753) | Deprecate and remove PART_SIZE_MB fields from connectors based on StreamTransferManager |
| 0.3.39 | 2022-06-02 | [13415](https://github.com/airbytehq/airbyte/pull/13415) | Add dropdown to select Uploading Method. <br /> **PLEASE NOTICE**: After this update your **uploading method** will be set to **Standard**, you will need to reconfigure the method to use **S3 Staging** again. |
| 0.3.37 | 2022-05-23 | [13090](https://github.com/airbytehq/airbyte/pull/13090) | Removed redshiftDataTmpTableMode. Some refactoring. |
Expand Down