Skip to content

Commit

Permalink
🎉 Destination-gcs handling v1 data protocol (#20635)
Browse files Browse the repository at this point in the history
* Update destination-gcs to handle v1 data protocol

* Review changes
  • Loading branch information
suhomud authored Jan 18, 2023
1 parent daa5bac commit 036c46d
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.airbyte.integrations.destination.s3.avro.AvroConstants;
import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
import io.airbyte.integrations.destination.s3.util.AvroRecordHelper;
import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -96,4 +97,9 @@ protected Map<String, Set<Type>> retrieveDataTypesFromPersistedFiles(final Strin
return resultDataTypes;
}

@Override
public ProtocolVersion getProtocolVersion() {
return ProtocolVersion.V1;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.avro.JsonSchemaType;
import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import io.airbyte.integrations.standardtest.destination.argproviders.NumberDataTypeTestArgumentProvider;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteMessage;
Expand Down Expand Up @@ -39,6 +40,11 @@ public GcsAvroParquetDestinationAcceptanceTest(final S3Format s3Format) {
super(s3Format);
}

@Override
public ProtocolVersion getProtocolVersion() {
return ProtocolVersion.V1;
}

@ParameterizedTest
@ArgumentsSource(NumberDataTypeTestArgumentProvider.class)
public void testNumberDataType(final String catalogFileName, final String messagesFileName) throws Exception {
Expand Down Expand Up @@ -85,7 +91,8 @@ private JsonNode getJsonNode(final AirbyteStream stream, final String name) {
}

private Set<Type> getExpectedSchemaType(final JsonNode fieldDefinition) {
final JsonNode typeProperty = fieldDefinition.get("type");
// The $ref is a migration to V1 data type protocol see well_known_types.yaml
final JsonNode typeProperty = fieldDefinition.get("type") == null ? fieldDefinition.get("$ref") : fieldDefinition.get("type");
final JsonNode airbyteTypeProperty = fieldDefinition.get("airbyte_type");
final String airbyteTypePropertyText = airbyteTypeProperty == null ? null : airbyteTypeProperty.asText();
return Arrays.stream(JsonSchemaType.values())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@

package io.airbyte.integrations.destination.gcs;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import java.nio.charset.StandardCharsets;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Base64;

public class GcsAvroTestDataComparator extends AdvancedTestDataComparator {

@Override
protected boolean compareDateValues(String expectedValue, String actualValue) {
var destinationDate = LocalDate.ofEpochDay(Long.parseLong(actualValue));
var expectedDate = LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATE_FORMAT));
LocalDate destinationDate = LocalDate.ofEpochDay(Long.parseLong(actualValue));
LocalDate expectedDate = LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATE_FORMAT));
return expectedDate.equals(destinationDate);
}

Expand All @@ -28,9 +31,28 @@ protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) {

@Override
protected boolean compareDateTimeValues(String airbyteMessageValue, String destinationValue) {
var format = DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT);
DateTimeFormatter format = DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT);
LocalDateTime dateTime = LocalDateTime.ofInstant(getInstantFromEpoch(destinationValue), ZoneOffset.UTC);
return super.compareDateTimeValues(airbyteMessageValue, format.format(dateTime));
}

@Override
protected boolean compareTime(final String airbyteMessageValue, final String destinationValue) {
LocalTime destinationDate = LocalTime.ofInstant(getInstantFromEpoch(destinationValue), ZoneOffset.UTC);
LocalTime expectedDate = LocalTime.parse(airbyteMessageValue, DateTimeFormatter.ISO_TIME);
return expectedDate.equals(destinationDate);
}

@Override
protected boolean compareString(final JsonNode expectedValue, final JsonNode actualValue) {
// to handle base64 encoded strings
return expectedValue.asText().equals(actualValue.asText())
|| decodeBase64(expectedValue.asText()).equals(actualValue.asText());
}

private String decodeBase64(String string) {
byte[] decoded = Base64.getDecoder().decode(string);
return new String(decoded, StandardCharsets.UTF_8);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening;
import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
Expand All @@ -33,6 +34,11 @@ public GcsCsvDestinationAcceptanceTest() {
super(S3Format.CSV);
}

@Override
public ProtocolVersion getProtocolVersion() {
return ProtocolVersion.V1;
}

@Override
protected JsonNode getFormatConfig() {
return Jsons.jsonNode(Map.of(
Expand All @@ -50,7 +56,9 @@ private static Map<String, String> getFieldTypes(final JsonNode streamSchema) {
final Iterator<Entry<String, JsonNode>> iterator = fieldDefinitions.fields();
while (iterator.hasNext()) {
final Map.Entry<String, JsonNode> entry = iterator.next();
fieldTypes.put(entry.getKey(), entry.getValue().get("type").asText());
JsonNode fieldValue = entry.getValue();
JsonNode typeValue = fieldValue.get("type") == null ? fieldValue.get("$ref") : fieldValue.get("type");
fieldTypes.put(entry.getKey(), typeValue.asText());
}
return fieldTypes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening;
import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
Expand All @@ -17,6 +18,11 @@

public class GcsCsvGzipDestinationAcceptanceTest extends GcsCsvDestinationAcceptanceTest {

@Override
public ProtocolVersion getProtocolVersion() {
return ProtocolVersion.V1;
}

@Override
protected JsonNode getFormatConfig() {
// config without compression defaults to GZIP
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.airbyte.integrations.destination.s3.S3FormatConfig;
import io.airbyte.integrations.destination.s3.S3StorageOperations;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
Expand Down Expand Up @@ -72,6 +73,11 @@ protected JsonNode getBaseConfigJson() {
return Jsons.deserialize(IOs.readFile(Path.of(SECRET_FILE_PATH)));
}

@Override
public ProtocolVersion getProtocolVersion() {
return ProtocolVersion.V1;
}

@Override
protected String getImageName() {
return "airbyte/destination-gcs:dev";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
Expand All @@ -24,6 +25,11 @@ public GcsJsonlDestinationAcceptanceTest() {
super(S3Format.JSONL);
}

@Override
public ProtocolVersion getProtocolVersion() {
return ProtocolVersion.V1;
}

@Override
protected JsonNode getFormatConfig() {
return Jsons.jsonNode(Map.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.amazonaws.services.s3.model.S3Object;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
Expand All @@ -16,6 +17,11 @@

public class GcsJsonlGzipDestinationAcceptanceTest extends GcsJsonlDestinationAcceptanceTest {

@Override
public ProtocolVersion getProtocolVersion() {
return ProtocolVersion.V1;
}

@Override
protected JsonNode getFormatConfig() {
// config without compression defaults to GZIP
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter;
import io.airbyte.integrations.destination.s3.util.AvroRecordHelper;
import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.io.IOException;
import java.net.URI;
Expand All @@ -37,6 +38,11 @@ public GcsParquetDestinationAcceptanceTest() {
super(S3Format.PARQUET);
}

@Override
public ProtocolVersion getProtocolVersion() {
return ProtocolVersion.V1;
}

@Override
protected JsonNode getFormatConfig() {
return Jsons.jsonNode(Map.of(
Expand Down

0 comments on commit 036c46d

Please sign in to comment.