From cd7e6bddc363e86fc6beb16aead4b0c2eaef5d98 Mon Sep 17 00:00:00 2001 From: Yuri Silva <121974740+yurimssilva@users.noreply.github.com> Date: Tue, 10 Oct 2023 10:06:46 +0100 Subject: [PATCH] feat: adds transfer multiple S3 objects functionality (#104) * feat: adds transfer multiple S3 objects functionality * fix: docs, exception handling and headers * fix: imports * fix: kept license header history and exception handling * fix: license header history format --- .../eclipse/edc/aws/s3/S3BucketSchema.java | 4 +- .../data-plane/data-plane-aws-s3/README.md | 25 ++++- .../dataplane/aws/s3/S3DataSink.java | 4 +- .../dataplane/aws/s3/S3DataSinkFactory.java | 2 +- .../dataplane/aws/s3/S3DataSource.java | 61 +++++++++++- .../dataplane/aws/s3/S3DataSourceFactory.java | 12 ++- .../dataplane/aws/s3/S3DataSourceTest.java | 99 +++++++++++++++++++ 7 files changed, 196 insertions(+), 11 deletions(-) create mode 100644 extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java diff --git a/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java b/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java index 57da8104..ce919d23 100644 --- a/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java +++ b/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Microsoft Corporation + * Copyright (c) 2020 - 2023 Microsoft Corporation * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at @@ -9,6 +9,7 @@ * * Contributors: * Microsoft Corporation - initial API and implementation + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * */ @@ -18,6 +19,7 @@ public interface S3BucketSchema { String TYPE = "AmazonS3"; String REGION = "region"; String BUCKET_NAME = "bucketName"; + String KEY_PREFIX = "keyPrefix"; String ACCESS_KEY_ID = "accessKeyId"; String SECRET_ACCESS_KEY = "secretAccessKey"; String ENDPOINT_OVERRIDE = "endpointOverride"; diff --git a/extensions/data-plane/data-plane-aws-s3/README.md b/extensions/data-plane/data-plane-aws-s3/README.md index 194beb4a..07daf0f7 100644 --- a/extensions/data-plane/data-plane-aws-s3/README.md +++ b/extensions/data-plane/data-plane-aws-s3/README.md @@ -4,4 +4,27 @@ This module contains a Data Plane extension to copy data to and from Aws S3. -When used as a source, it currently only supports copying a single object. \ No newline at end of file +When as a source, it supports copying a single or multiple objects. + +#### AmazonS3 DataAddress Configuration + +The behavior of object transfers can be customized using `DataAddress` properties. + +- When `keyPrefix` is present, transfer all objects with keys that start with the specified prefix. +- When `keyPrefix` is not present, transfer only the object with a key matching the `keyName` property. +- Precedence: `keyPrefix` takes precedence over `keyName` when determining which objects to transfer. It allows for both multiple object transfers and fetching a single object when necessary. + +>Note: Using `keyPrefix` introduces an additional step to list all objects whose keys match the specified prefix. + +#### Example Usage: + +Configuration with `keyPrefix`: + +```json +{ + "dataAddress": { + "keyName": "my-key-name", + "keyPrefix": "my-key-prefix/" + } +} +``` \ No newline at end of file diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java index 7e283a76..36a41415 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2022 - 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at @@ -52,7 +52,7 @@ protected StreamResult transferParts(List parts) { var uploadId = client.createMultipartUpload(CreateMultipartUploadRequest.builder() .bucket(bucketName) - .key(keyName) + .key(part.name()) .build()).uploadId(); while (true) { diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSinkFactory.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSinkFactory.java index a3d4d7ec..febc0b85 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSinkFactory.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSinkFactory.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2022 - 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java index b6bf5320..3547ade1 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2022 - 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at @@ -15,20 +15,28 @@ package org.eclipse.edc.connector.dataplane.aws.s3; import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; +import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure; import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.S3Object; import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; import java.util.stream.Stream; +import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.GENERAL_ERROR; +import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult.failure; import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult.success; class S3DataSource implements DataSource { private String bucketName; private String keyName; + private String keyPrefix; private S3Client client; private S3DataSource() { @@ -36,9 +44,55 @@ private S3DataSource() { @Override public StreamResult> openPartStream() { + + if (keyPrefix != null) { + + var s3Objects = this.fetchPrefixedS3Objects(); + + if (s3Objects.isEmpty()) { + return failure(new StreamFailure(List.of("Error listing S3 objects in the bucket: Object not found"), GENERAL_ERROR)); + } + + var s3PartStream = s3Objects.stream() + .map(S3Object::key) + .map(key -> (Part) new S3Part(client, key, bucketName)); + + return success(s3PartStream); + + } + return success(Stream.of(new S3Part(client, keyName, bucketName))); } + /** + * Fetches S3 objects with the specified prefix. + * + * @return A list of S3 objects. + */ + private List fetchPrefixedS3Objects() { + + String continuationToken = null; + List s3Objects = new ArrayList<>(); + + do { + + var listObjectsRequest = ListObjectsV2Request.builder() + .bucket(bucketName) + .prefix(keyPrefix) + .continuationToken(continuationToken) + .build(); + + var response = client.listObjectsV2(listObjectsRequest); + + s3Objects.addAll(response.contents()); + + continuationToken = response.nextContinuationToken(); + + } while (continuationToken != null); + + return s3Objects; + } + @Override public void close() throws Exception { client.close(); @@ -94,6 +148,11 @@ public Builder keyName(String keyName) { return this; } + public Builder keyPrefix(String keyPrefix) { + source.keyPrefix = keyPrefix; + return this; + } + public Builder client(S3Client client) { source.client = client; return this; diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceFactory.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceFactory.java index fdbec07d..12d80f93 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceFactory.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceFactory.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at @@ -36,6 +36,7 @@ import static org.eclipse.edc.aws.s3.S3BucketSchema.ACCESS_KEY_ID; import static org.eclipse.edc.aws.s3.S3BucketSchema.BUCKET_NAME; import static org.eclipse.edc.aws.s3.S3BucketSchema.ENDPOINT_OVERRIDE; +import static org.eclipse.edc.aws.s3.S3BucketSchema.KEY_PREFIX; import static org.eclipse.edc.aws.s3.S3BucketSchema.REGION; import static org.eclipse.edc.aws.s3.S3BucketSchema.SECRET_ACCESS_KEY; @@ -75,10 +76,11 @@ public DataSource createSource(DataFlowRequest request) { var source = request.getSourceDataAddress(); return S3DataSource.Builder.newInstance() - .bucketName(source.getStringProperty(BUCKET_NAME)) - .keyName(source.getKeyName()) - .client(getS3Client(source)) - .build(); + .bucketName(source.getStringProperty(BUCKET_NAME)) + .keyName(source.getKeyName()) + .keyPrefix(source.getStringProperty(KEY_PREFIX)) + .client(getS3Client(source)) + .build(); } private S3Client getS3Client(DataAddress address) { diff --git a/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java b/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java new file mode 100644 index 00000000..b51ae9d9 --- /dev/null +++ b/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + */ + +package org.eclipse.edc.connector.dataplane.aws.s3; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.S3Object; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class S3DataSourceTest { + + private static final String BUCKET_NAME = "bucketName"; + private static final String KEY_NAME = "object-1"; + private static final String KEY_PREFIX = "my-prefix/"; + private final S3Client s3ClientMock = mock(S3Client.class); + + @Test + void should_select_prefixed_objects_case_key_prefix_is_present() { + + var mockResponse = ListObjectsV2Response.builder().contents( + S3Object.builder().key("my-prefix/object-1").build(), + S3Object.builder().key("my-prefix/object-2").build() + ).build(); + + var s3Datasource = S3DataSource.Builder.newInstance() + .bucketName(BUCKET_NAME) + .keyName(KEY_NAME) + .keyPrefix(KEY_PREFIX) + .client(s3ClientMock) + .build(); + + when(s3ClientMock.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(mockResponse); + + var result = s3Datasource.openPartStream(); + + assertThat(result.succeeded()).isTrue(); + verify(s3ClientMock, atLeastOnce()).listObjectsV2(any(ListObjectsV2Request.class)); + assertThat(result.getContent()).hasSize(2); + } + + @Test + void should_fail_case_no_object_is_found() { + + var mockResponse = ListObjectsV2Response.builder().build(); + + var s3Datasource = S3DataSource.Builder.newInstance() + .bucketName(BUCKET_NAME) + .keyName(KEY_NAME) + .keyPrefix(KEY_PREFIX) + .client(s3ClientMock) + .build(); + + when(s3ClientMock.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(mockResponse); + + var result = s3Datasource.openPartStream(); + + assertThat(result.failed()).isTrue(); + assertThat(result.getFailure().getFailureDetail()).isEqualTo("Error listing S3 objects in the bucket: Object not found"); + } + + @Test + void should_select_single_object_case_key_prefix_is_not_present() { + + var s3Datasource = S3DataSource.Builder.newInstance() + .bucketName(BUCKET_NAME) + .keyName(KEY_NAME) + .keyPrefix(null) + .client(s3ClientMock) + .build(); + + var result = s3Datasource.openPartStream(); + + assertThat(result.succeeded()).isTrue(); + verify(s3ClientMock, never()).listObjectsV2(any(ListObjectsV2Request.class)); + assertThat(result.getContent()).hasSize(1); + } + +}