🎉 Destination-snowflake: start using new S3StreamCopier, and expose the purgeStagingData option #9531

Merged

@@ -53,7 +53,7 @@ public abstract class S3StreamCopier implements StreamCopier {
private final int maxPartsPerFile;
// The number of batches inserted into the current file.
private int partsAddedToCurrentFile;
private String currentFile;
protected String currentFile;

/**
* @param maxPartsPerFile The number of "chunks" of requests to add into each file. Each chunk can

@@ -22,7 +22,7 @@
Put the contents of the `Snowflake Integration Test Config` secret on Rippling under the `Engineering` folder into `secrets/config.json` to be able to run integration tests locally.

1. Put the contents of the `destination snowflake - insert test creds` LastPass secret into `secrets/insert_config.json`.
1. Put the contents of the `destination snowflake - insert staging test creds` secret into `insert_staging_config.json`.
1. Put the contents of the `destination snowflake - insert staging test creds` secret into `internal_staging_config.json`.

Contributor
Should this be secrets/internal_staging_config?

Contributor Author @etsybaev (Jan 19, 2022)

Have no idea :) I just aligned the documentation to what we actually have in the codebase and the secret manager.

1. Put the contents of the `destination snowflake - gcs copy test creds` secret into `secrets/copy_gcs_config.json`
1. Put the contents of the `destination snowflake - s3 copy test creds` secret into `secrets/copy_s3_config.json`


@@ -11,6 +11,7 @@
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig;
import io.airbyte.integrations.destination.s3.S3Destination;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteMessage;
@@ -28,7 +29,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
getDatabase(config),
getSqlOperations(),
getNameTransformer(),
getS3DestinationConfig(config),
new S3CopyConfig(S3CopyConfig.shouldPurgeStagingData(config), getS3DestinationConfig(config)),
catalog,
new SnowflakeS3StreamCopierFactory(),
getConfiguredSchema(config));

@@ -5,26 +5,65 @@
package io.airbyte.integrations.destination.snowflake;

import com.amazonaws.services.s3.AmazonS3;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.s3.LegacyS3StreamCopier;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;

public class SnowflakeS3StreamCopier extends LegacyS3StreamCopier {
public class SnowflakeS3StreamCopier extends S3StreamCopier {

// From https://docs.aws.amazon.com/redshift/latest/dg/t_loading-tables-from-s3.html
// "Split your load data files so that the files are about equal size, between 1 MB and 1 GB after
// compression"
public static final int MAX_PARTS_PER_FILE = 4;

public SnowflakeS3StreamCopier(final String stagingFolder,
final DestinationSyncMode destSyncMode,
final String schema,
final String streamName,
final AmazonS3 client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final S3CopyConfig config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations) {
super(stagingFolder, destSyncMode, schema, streamName, client, db, s3Config, nameTransformer, sqlOperations);
final SqlOperations sqlOperations,
final ConfiguredAirbyteStream configuredAirbyteStream) {
this(
stagingFolder,
schema,
client,
db,
config,
nameTransformer,
sqlOperations,
Timestamp.from(Instant.now()),
configuredAirbyteStream);
}

@VisibleForTesting
SnowflakeS3StreamCopier(final String stagingFolder,
final String schema,
final AmazonS3 client,
final JdbcDatabase db,
final S3CopyConfig config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations,
final Timestamp uploadTime,
final ConfiguredAirbyteStream configuredAirbyteStream) {
super(stagingFolder,
schema,
client,
db,
config,
nameTransformer,
sqlOperations,
configuredAirbyteStream,
uploadTime,
MAX_PARTS_PER_FILE);
}

@Override
@@ -47,4 +86,14 @@ public void copyS3CsvFileIntoTable(final JdbcDatabase database,
database.execute(copyQuery);
}

@VisibleForTesting
String getTmpTableName() {
return tmpTableName;
}

@VisibleForTesting
String getCurrentFile() {
return currentFile;
}

}
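
For context on what MAX_PARTS_PER_FILE controls (and why the test near the end of this diff expects two files after calling prepareStagingFile() five times): parts are appended to the current staging file until the per-file limit is reached, at which point a new file is started. The sketch below is a minimal, self-contained illustration of that rotation under that assumption; the class and file names are hypothetical, and this is not the actual S3StreamCopier code.

// Hypothetical sketch of parts-per-file rotation; not the real S3StreamCopier.
// With maxPartsPerFile = 4, five prepareStagingFile() calls produce two files,
// matching the "Generate two files" loop in SnowflakeS3StreamCopierTest below.
public class PartsPerFileRotationSketch {

  private final int maxPartsPerFile;
  private int partsAddedToCurrentFile;
  private int fileCounter;
  private String currentFile;

  public PartsPerFileRotationSketch(final int maxPartsPerFile) {
    this.maxPartsPerFile = maxPartsPerFile;
    // Force a new file on the first call.
    this.partsAddedToCurrentFile = maxPartsPerFile;
  }

  public String prepareStagingFile() {
    if (partsAddedToCurrentFile >= maxPartsPerFile) {
      currentFile = "staging-file-" + fileCounter++ + ".csv";
      partsAddedToCurrentFile = 0;
    }
    partsAddedToCurrentFile++;
    return currentFile;
  }

  public static void main(final String[] args) {
    final PartsPerFileRotationSketch sketch = new PartsPerFileRotationSketch(4);
    for (int i = 0; i < 5; i++) {
      System.out.println(sketch.prepareStagingFile());
    }
    // Prints staging-file-0.csv four times, then staging-file-1.csv once.
  }
}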

@@ -9,24 +9,23 @@
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.jdbc.copy.s3.LegacyS3StreamCopierFactory;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopierFactory;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;

public class SnowflakeS3StreamCopierFactory extends LegacyS3StreamCopierFactory {
public class SnowflakeS3StreamCopierFactory extends S3StreamCopierFactory {

@Override
public StreamCopier create(final String stagingFolder,
final DestinationSyncMode syncMode,
final String schema,
final String streamName,
final AmazonS3 s3Client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations)
protected StreamCopier create(String stagingFolder,
String schema,
AmazonS3 s3Client,
JdbcDatabase db,
S3CopyConfig config,
ExtendedNameTransformer nameTransformer,
SqlOperations sqlOperations,
ConfiguredAirbyteStream configuredStream)
throws Exception {
return new SnowflakeS3StreamCopier(stagingFolder, syncMode, schema, streamName, s3Client, db, s3Config, nameTransformer, sqlOperations);
return new SnowflakeS3StreamCopier(stagingFolder, schema, s3Client, db, config, nameTransformer, sqlOperations, configuredStream);
}

}

@@ -178,6 +178,12 @@
"description": "Optional. Increase this if syncing tables larger than 100GB. Only relevant for COPY. Files are streamed to S3 in parts. This determines the size of each part, in MBs. As S3 has a limit of 10,000 parts per file, part size affects the table size. This is 10MB by default, resulting in a default limit of 100GB tables. Note, a larger part size will result in larger memory requirements. A rule of thumb is to multiply the part size by 10 to get the memory requirement. Modify this with care.",
"title": "Stream Part Size",
"order": 5
},
"purge_staging_data": {
"title": "Purge Staging Files and Tables",
"type": "boolean",
"description": "Whether to delete the staging files from S3 after completing the sync. See the docs for details. Only relevant for COPY. Defaults to true.",
"default": true
}
}
},
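
The purge_staging_data option added to the spec above is wired into the consumer through S3CopyConfig.shouldPurgeStagingData(config) in the SnowflakeCopyS3Destination hunk earlier in this diff. As an illustration only, assuming that helper simply reads the optional boolean and falls back to the spec default of true, it could look like the sketch below; this is a hypothetical stand-in, not the actual S3CopyConfig implementation.

// Hypothetical sketch of reading the optional "purge_staging_data" flag from the
// connector config JSON with a default of true; not the actual S3CopyConfig code.
import com.fasterxml.jackson.databind.JsonNode;

public final class PurgeStagingDataFlagSketch {

  private PurgeStagingDataFlagSketch() {}

  public static boolean shouldPurgeStagingData(final JsonNode config) {
    final JsonNode flag = config.get("purge_staging_data");
    // A missing or null field keeps the default behaviour of purging staging data.
    return flag == null || flag.isNull() || flag.asBoolean(true);
  }
}

With the default of true, existing connections keep the current behaviour (staging files are deleted from S3 after the sync) unless a user explicitly sets the new option to false.
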
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.snowflake;

import static io.airbyte.integrations.destination.snowflake.SnowflakeS3StreamCopier.MAX_PARTS_PER_FILE;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import com.amazonaws.services.s3.AmazonS3Client;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.sql.Timestamp;
import java.time.Instant;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class SnowflakeS3StreamCopierTest {

private static final int PART_SIZE = 5;

// equivalent to Thu, 09 Dec 2021 19:17:54 GMT
private static final Timestamp UPLOAD_TIME = Timestamp.from(Instant.ofEpochMilli(1639077474000L));

private AmazonS3Client s3Client;
private JdbcDatabase db;
private SqlOperations sqlOperations;
private SnowflakeS3StreamCopier copier;

@BeforeEach
public void setup() {
s3Client = mock(AmazonS3Client.class, RETURNS_DEEP_STUBS);
db = mock(JdbcDatabase.class);
sqlOperations = mock(SqlOperations.class);

copier = new SnowflakeS3StreamCopier(
// In reality, this is normally a UUID - see CopyConsumerFactory#createWriteConfigs
"fake-staging-folder",
"fake-schema",
s3Client,
db,
new S3CopyConfig(
true,
new S3DestinationConfig(
"fake-endpoint",
"fake-bucket",
"fake-bucketPath",
"fake-region",
"fake-access-key-id",
"fake-secret-access-key",
PART_SIZE,
null)),
new ExtendedNameTransformer(),
sqlOperations,
UPLOAD_TIME,
new ConfiguredAirbyteStream()
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(new AirbyteStream()
.withName("fake-stream")
.withNamespace("fake-namespace")));
}

@Test
public void copiesCorrectFilesToTable() throws Exception {
// Generate two files
for (int i = 0; i < MAX_PARTS_PER_FILE + 1; i++) {
copier.prepareStagingFile();
}

copier.copyStagingFileToTemporaryTable();

verify(db).execute(String.format("COPY INTO fake-schema.%s FROM "
+ "'s3://fake-bucket/%s'"
+ " CREDENTIALS=(aws_key_id='fake-access-key-id' aws_secret_key='fake-secret-access-key') "
+ "file_format = (type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"');",
copier.getTmpTableName(), copier.getCurrentFile()));
}

}