From cde55a348290817c422a72a9f91af622a620941c Mon Sep 17 00:00:00 2001 From: yurisilvapjd Date: Fri, 6 Oct 2023 13:10:37 +0100 Subject: [PATCH 1/5] feat: adds transfer multiple S3 objects functionality --- .../eclipse/edc/aws/s3/S3BucketSchema.java | 2 + .../data-plane/data-plane-aws-s3/README.md | 26 ++++- .../dataplane/aws/s3/S3DataSink.java | 2 +- .../dataplane/aws/s3/S3DataSource.java | 67 ++++++++++++- .../dataplane/aws/s3/S3DataSourceFactory.java | 10 +- .../exception/S3ObjectNotFoundException.java | 22 +++++ .../dataplane/aws/s3/S3DataSourceTest.java | 94 +++++++++++++++++++ 7 files changed, 216 insertions(+), 7 deletions(-) create mode 100644 extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/exception/S3ObjectNotFoundException.java create mode 100644 extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java diff --git a/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java b/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java index 57da8104..aec5f235 100644 --- a/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java +++ b/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java @@ -9,6 +9,7 @@ * * Contributors: * Microsoft Corporation - initial API and implementation + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * */ @@ -18,6 +19,7 @@ public interface S3BucketSchema { String TYPE = "AmazonS3"; String REGION = "region"; String BUCKET_NAME = "bucketName"; + String KEY_PREFIX = "keyPrefix"; String ACCESS_KEY_ID = "accessKeyId"; String SECRET_ACCESS_KEY = "secretAccessKey"; String ENDPOINT_OVERRIDE = "endpointOverride"; diff --git a/extensions/data-plane/data-plane-aws-s3/README.md b/extensions/data-plane/data-plane-aws-s3/README.md index 194beb4a..5cffec7e 100644 --- a/extensions/data-plane/data-plane-aws-s3/README.md +++ b/extensions/data-plane/data-plane-aws-s3/README.md @@ -4,4 +4,28 @@ This module contains a Data Plane extension to copy data to and from Aws S3. -When used as a source, it currently only supports copying a single object. \ No newline at end of file +When as a source, it supports copying a single or multiple objects. + +#### AmazonS3 DataAddress Configuration + +The behavior of object transfers can be customized using `DataAddress` properties. + +- When `keyPrefix` is present, transfer all objects with keys that start with the specified prefix. + +- When `keyPrefix` is not present, transfer only the object with a key matching the `keyName` property. + +- Precedence: `keyPrefix` takes precedence over `keyName` when determining which objects to transfer. It allows for both multiple object transfers and fetching a single object when necessary. + +>Note: Using `keyPrefix` introduces an additional step to list all objects whose keys match the specified prefix. + +#### Example Usage: + +#### Configuration with `keyPrefix`: + +```json +{ + "dataAddress": { + "keyPrefix": "my-prefix/" + } +} +``` \ No newline at end of file diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java index 92bcefaa..26af3d1a 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java @@ -52,7 +52,7 @@ protected StreamResult transferParts(List parts) { var uploadId = client.createMultipartUpload(CreateMultipartUploadRequest.builder() .bucket(bucketName) - .key(keyName) + .key(part.name()) .build()).uploadId(); while (true) { diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java index 45331305..64a16554 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java @@ -14,13 +14,19 @@ package org.eclipse.edc.connector.dataplane.aws.s3; +import org.eclipse.edc.connector.dataplane.aws.s3.exception.S3ObjectNotFoundException; import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.S3Exception; import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; import java.util.stream.Stream; import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult.success; @@ -29,15 +35,69 @@ class S3DataSource implements DataSource { private String bucketName; private String keyName; + private String keyPrefix; private S3Client client; - private S3DataSource() { } + private S3DataSource() { + } @Override public StreamResult> openPartStream() { + + if (keyPrefix != null) { + try { + + var s3PartStream = this.fetchPrefixedS3Objects().stream() + .map(S3Object::key) + .map(key -> (Part) new S3Part(client, key, bucketName)); + + return success(s3PartStream); + + } catch (S3Exception e) { + throw new RuntimeException("Error listing objects in the bucket: " + e.getMessage()); + } + } + return success(Stream.of(new S3Part(client, keyName, bucketName))); } + /** + * Fetches S3 objects with the specified prefix. + * + * @return A list of S3 objects. + * @throws S3ObjectNotFoundException if no objects are found. + */ + private List fetchPrefixedS3Objects() { + + String continuationToken = null; + List s3Objects = new ArrayList<>(); + + do { + + var listObjectsRequest = ListObjectsV2Request.builder() + .bucket(bucketName) + .prefix(keyPrefix) + .continuationToken(continuationToken) + .build(); + + var response = client.listObjectsV2(listObjectsRequest); + + s3Objects.addAll(response.contents()); + + continuationToken = response.nextContinuationToken(); + + } while (continuationToken != null); + + if (s3Objects.isEmpty()) throw new S3ObjectNotFoundException("Object not found"); + + return s3Objects; + } + + @Override + public void close() { + // no-op + } + private static class S3Part implements Part { private final S3Client client; private final String keyName; @@ -88,6 +148,11 @@ public Builder keyName(String keyName) { return this; } + public Builder keyPrefix(String keyPrefix) { + source.keyPrefix = keyPrefix; + return this; + } + public Builder client(S3Client client) { source.client = client; return this; diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceFactory.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceFactory.java index fdbec07d..4d57d0b7 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceFactory.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceFactory.java @@ -38,6 +38,7 @@ import static org.eclipse.edc.aws.s3.S3BucketSchema.ENDPOINT_OVERRIDE; import static org.eclipse.edc.aws.s3.S3BucketSchema.REGION; import static org.eclipse.edc.aws.s3.S3BucketSchema.SECRET_ACCESS_KEY; +import static org.eclipse.edc.aws.s3.S3BucketSchema.KEY_PREFIX; public class S3DataSourceFactory implements DataSourceFactory { @@ -75,10 +76,11 @@ public DataSource createSource(DataFlowRequest request) { var source = request.getSourceDataAddress(); return S3DataSource.Builder.newInstance() - .bucketName(source.getStringProperty(BUCKET_NAME)) - .keyName(source.getKeyName()) - .client(getS3Client(source)) - .build(); + .bucketName(source.getStringProperty(BUCKET_NAME)) + .keyName(source.getKeyName()) + .keyPrefix(source.getStringProperty(KEY_PREFIX)) + .client(getS3Client(source)) + .build(); } private S3Client getS3Client(DataAddress address) { diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/exception/S3ObjectNotFoundException.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/exception/S3ObjectNotFoundException.java new file mode 100644 index 00000000..e752ff97 --- /dev/null +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/exception/S3ObjectNotFoundException.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + */ + +package org.eclipse.edc.connector.dataplane.aws.s3.exception; + +public class S3ObjectNotFoundException extends RuntimeException { + + public S3ObjectNotFoundException(String message) { + super(message); + } +} diff --git a/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java b/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java new file mode 100644 index 00000000..7db012fa --- /dev/null +++ b/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + */ + +package org.eclipse.edc.connector.dataplane.aws.s3; + +import org.eclipse.edc.connector.dataplane.aws.s3.exception.S3ObjectNotFoundException; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.S3Object; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +public class S3DataSourceTest { + + private static final String BUCKET_NAME = "bucketName"; + private static final String KEY_NAME = "object-1"; + private static final String KEY_PREFIX = "my-prefix/"; + private final S3Client s3ClientMock = mock(S3Client.class); + + @Test + void should_select_prefixed_objects_case_key_prefix_is_present() { + + var mockResponse = ListObjectsV2Response.builder().contents( + S3Object.builder().key("my-prefix/object-1").build(), + S3Object.builder().key("my-prefix/object-2").build() + ).build(); + + var s3Datasource = S3DataSource.Builder.newInstance() + .bucketName(BUCKET_NAME) + .keyName(KEY_NAME) + .keyPrefix(KEY_PREFIX) + .client(s3ClientMock) + .build(); + + when(s3ClientMock.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(mockResponse); + + var content = s3Datasource.openPartStream().getContent(); + + verify(s3ClientMock, atLeastOnce()).listObjectsV2(any(ListObjectsV2Request.class)); + assertThat(content).hasSize(2); + } + + @Test + void should_throw_exception_case_no_object_is_found() { + + var mockResponse = ListObjectsV2Response.builder().build(); + + var s3Datasource = S3DataSource.Builder.newInstance() + .bucketName(BUCKET_NAME) + .keyName(KEY_NAME) + .keyPrefix(KEY_PREFIX) + .client(s3ClientMock) + .build(); + + when(s3ClientMock.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(mockResponse); + + assertThatThrownBy(s3Datasource::openPartStream) + .isInstanceOf(S3ObjectNotFoundException.class) + .hasMessage("Object not found"); + } + + @Test + void should_select_single_object_case_key_prefix_is_not_present() { + + var s3Datasource = S3DataSource.Builder.newInstance() + .bucketName(BUCKET_NAME) + .keyName(KEY_NAME) + .keyPrefix(null) + .client(s3ClientMock) + .build(); + + var content = s3Datasource.openPartStream().getContent(); + + verify(s3ClientMock, never()).listObjectsV2(any(ListObjectsV2Request.class)); + assertThat(content).hasSize(1); + } + +} From 9843dac21d83c8687a913b4f1b3d4358332d59dc Mon Sep 17 00:00:00 2001 From: yurisilvapjd Date: Mon, 9 Oct 2023 11:54:29 +0100 Subject: [PATCH 2/5] fix: docs, exception handling and headers --- .../eclipse/edc/aws/s3/S3BucketSchema.java | 2 +- .../data-plane/data-plane-aws-s3/README.md | 7 +++--- .../dataplane/aws/s3/S3DataSink.java | 8 +++---- .../dataplane/aws/s3/S3DataSource.java | 22 +++++++++++------- .../dataplane/aws/s3/S3DataSourceFactory.java | 2 +- .../exception/S3ObjectNotFoundException.java | 2 +- .../dataplane/aws/s3/S3DataSourceTest.java | 23 ++++++++++--------- 7 files changed, 36 insertions(+), 30 deletions(-) diff --git a/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java b/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java index aec5f235..38370dcf 100644 --- a/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java +++ b/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Microsoft Corporation + * Copyright (c) 2023 Microsoft Corporation * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at diff --git a/extensions/data-plane/data-plane-aws-s3/README.md b/extensions/data-plane/data-plane-aws-s3/README.md index 5cffec7e..07daf0f7 100644 --- a/extensions/data-plane/data-plane-aws-s3/README.md +++ b/extensions/data-plane/data-plane-aws-s3/README.md @@ -11,21 +11,20 @@ When as a source, it supports copying a single or multiple objects. The behavior of object transfers can be customized using `DataAddress` properties. - When `keyPrefix` is present, transfer all objects with keys that start with the specified prefix. - - When `keyPrefix` is not present, transfer only the object with a key matching the `keyName` property. - - Precedence: `keyPrefix` takes precedence over `keyName` when determining which objects to transfer. It allows for both multiple object transfers and fetching a single object when necessary. >Note: Using `keyPrefix` introduces an additional step to list all objects whose keys match the specified prefix. #### Example Usage: -#### Configuration with `keyPrefix`: +Configuration with `keyPrefix`: ```json { "dataAddress": { - "keyPrefix": "my-prefix/" + "keyName": "my-key-name", + "keyPrefix": "my-key-prefix/" } } ``` \ No newline at end of file diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java index 26af3d1a..a83b455c 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at @@ -43,7 +43,7 @@ class S3DataSink extends ParallelSink { private S3DataSink() {} @Override - protected StreamResult transferParts(List parts) { + protected StreamResult transferParts(List parts) { for (var part : parts) { try (var input = part.openStream()) { @@ -90,7 +90,7 @@ protected StreamResult transferParts(List parts) { } @Override - protected StreamResult complete() { + protected StreamResult complete() { var completeKeyName = keyName + ".complete"; var request = PutObjectRequest.builder().bucket(bucketName).key(completeKeyName).build(); try { @@ -103,7 +103,7 @@ protected StreamResult complete() { } @NotNull - private StreamResult uploadFailure(Exception e, String keyName) { + private StreamResult uploadFailure(Exception e, String keyName) { var message = format("Error writing the %s object on the %s bucket: %s", keyName, bucketName, e.getMessage()); monitor.severe(message, e); return StreamResult.error(message); diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java index 64a16554..05b172b6 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at @@ -16,19 +16,21 @@ import org.eclipse.edc.connector.dataplane.aws.s3.exception.S3ObjectNotFoundException; import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; +import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure; import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; -import software.amazon.awssdk.services.s3.model.S3Object; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; -import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.S3Object; import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.stream.Stream; +import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.GENERAL_ERROR; +import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult.failure; import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult.success; class S3DataSource implements DataSource { @@ -47,14 +49,20 @@ public StreamResult> openPartStream() { if (keyPrefix != null) { try { - var s3PartStream = this.fetchPrefixedS3Objects().stream() + var s3Objects = this.fetchPrefixedS3Objects(); + + if (s3Objects.isEmpty()) { + throw new S3ObjectNotFoundException("Object not found"); + } + + var s3PartStream = s3Objects.stream() .map(S3Object::key) .map(key -> (Part) new S3Part(client, key, bucketName)); return success(s3PartStream); - } catch (S3Exception e) { - throw new RuntimeException("Error listing objects in the bucket: " + e.getMessage()); + } catch (Exception e) { + return failure(new StreamFailure(List.of("Error listing objects in the bucket: " + e.getMessage()), GENERAL_ERROR)); } } @@ -88,8 +96,6 @@ private List fetchPrefixedS3Objects() { } while (continuationToken != null); - if (s3Objects.isEmpty()) throw new S3ObjectNotFoundException("Object not found"); - return s3Objects; } diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceFactory.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceFactory.java index 4d57d0b7..5c466c82 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceFactory.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceFactory.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/exception/S3ObjectNotFoundException.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/exception/S3ObjectNotFoundException.java index e752ff97..bc5fc005 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/exception/S3ObjectNotFoundException.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/exception/S3ObjectNotFoundException.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at diff --git a/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java b/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java index 7db012fa..937ee36b 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java +++ b/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at @@ -14,7 +14,6 @@ package org.eclipse.edc.connector.dataplane.aws.s3; -import org.eclipse.edc.connector.dataplane.aws.s3.exception.S3ObjectNotFoundException; import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; @@ -22,7 +21,6 @@ import software.amazon.awssdk.services.s3.model.S3Object; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @@ -50,14 +48,15 @@ void should_select_prefixed_objects_case_key_prefix_is_present() { when(s3ClientMock.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(mockResponse); - var content = s3Datasource.openPartStream().getContent(); + var result = s3Datasource.openPartStream(); + assertThat(result.succeeded()).isTrue(); verify(s3ClientMock, atLeastOnce()).listObjectsV2(any(ListObjectsV2Request.class)); - assertThat(content).hasSize(2); + assertThat(result.getContent()).hasSize(2); } @Test - void should_throw_exception_case_no_object_is_found() { + void should_fail_case_no_object_is_found() { var mockResponse = ListObjectsV2Response.builder().build(); @@ -70,9 +69,10 @@ void should_throw_exception_case_no_object_is_found() { when(s3ClientMock.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(mockResponse); - assertThatThrownBy(s3Datasource::openPartStream) - .isInstanceOf(S3ObjectNotFoundException.class) - .hasMessage("Object not found"); + var result = s3Datasource.openPartStream(); + + assertThat(result.failed()).isTrue(); + assertThat(result.getFailure().getFailureDetail()).isEqualTo("Error listing objects in the bucket: Object not found"); } @Test @@ -85,10 +85,11 @@ void should_select_single_object_case_key_prefix_is_not_present() { .client(s3ClientMock) .build(); - var content = s3Datasource.openPartStream().getContent(); + var result = s3Datasource.openPartStream(); + assertThat(result.succeeded()).isTrue(); verify(s3ClientMock, never()).listObjectsV2(any(ListObjectsV2Request.class)); - assertThat(content).hasSize(1); + assertThat(result.getContent()).hasSize(1); } } From 9c0b7def603a6e30eb3f19f0917d6d109c735ed7 Mon Sep 17 00:00:00 2001 From: yurisilvapjd Date: Mon, 9 Oct 2023 14:02:18 +0100 Subject: [PATCH 3/5] fix: imports --- .../edc/connector/dataplane/aws/s3/S3DataSource.java | 1 - .../edc/connector/dataplane/aws/s3/S3DataSourceFactory.java | 2 +- .../edc/connector/dataplane/aws/s3/S3DataSourceTest.java | 6 +++++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java index 811c0af3..1aecfe0e 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java @@ -78,7 +78,6 @@ public void close() throws Exception { * Fetches S3 objects with the specified prefix. * * @return A list of S3 objects. - * @throws S3ObjectNotFoundException if no objects are found. */ private List fetchPrefixedS3Objects() { diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceFactory.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceFactory.java index 5c466c82..12d80f93 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceFactory.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceFactory.java @@ -36,9 +36,9 @@ import static org.eclipse.edc.aws.s3.S3BucketSchema.ACCESS_KEY_ID; import static org.eclipse.edc.aws.s3.S3BucketSchema.BUCKET_NAME; import static org.eclipse.edc.aws.s3.S3BucketSchema.ENDPOINT_OVERRIDE; +import static org.eclipse.edc.aws.s3.S3BucketSchema.KEY_PREFIX; import static org.eclipse.edc.aws.s3.S3BucketSchema.REGION; import static org.eclipse.edc.aws.s3.S3BucketSchema.SECRET_ACCESS_KEY; -import static org.eclipse.edc.aws.s3.S3BucketSchema.KEY_PREFIX; public class S3DataSourceFactory implements DataSourceFactory { diff --git a/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java b/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java index 937ee36b..14736467 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java +++ b/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java @@ -22,7 +22,11 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class S3DataSourceTest { From ed65528a5041f73ad8ae839f3fa25bfbaa028dcb Mon Sep 17 00:00:00 2001 From: yurisilvapjd Date: Mon, 9 Oct 2023 16:34:17 +0100 Subject: [PATCH 4/5] fix: kept license header history and exception handling --- .../eclipse/edc/aws/s3/S3BucketSchema.java | 2 +- .../dataplane/aws/s3/S3DataSink.java | 2 +- .../dataplane/aws/s3/S3DataSinkFactory.java | 2 +- .../dataplane/aws/s3/S3DataSource.java | 33 ++++++++----------- .../exception/S3ObjectNotFoundException.java | 22 ------------- .../dataplane/aws/s3/S3DataSourceTest.java | 2 +- 6 files changed, 18 insertions(+), 45 deletions(-) delete mode 100644 extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/exception/S3ObjectNotFoundException.java diff --git a/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java b/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java index 38370dcf..79c74858 100644 --- a/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java +++ b/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Microsoft Corporation + * Copyright (c) 2020-2023 Microsoft Corporation * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java index a83b455c..94fd496d 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2022-2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSinkFactory.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSinkFactory.java index a3d4d7ec..c5049f27 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSinkFactory.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSinkFactory.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2022-2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java index 1aecfe0e..636d24f3 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2022-2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at @@ -14,7 +14,6 @@ package org.eclipse.edc.connector.dataplane.aws.s3; -import org.eclipse.edc.connector.dataplane.aws.s3.exception.S3ObjectNotFoundException; import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure; import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; @@ -47,33 +46,24 @@ private S3DataSource() { public StreamResult> openPartStream() { if (keyPrefix != null) { - try { - var s3Objects = this.fetchPrefixedS3Objects(); + var s3Objects = this.fetchPrefixedS3Objects(); - if (s3Objects.isEmpty()) { - throw new S3ObjectNotFoundException("Object not found"); - } + if (s3Objects.isEmpty()) { + return failure(new StreamFailure(List.of("Error listing S3 objects in the bucket: Object not found"), GENERAL_ERROR)); + } - var s3PartStream = s3Objects.stream() - .map(S3Object::key) - .map(key -> (Part) new S3Part(client, key, bucketName)); + var s3PartStream = s3Objects.stream() + .map(S3Object::key) + .map(key -> (Part) new S3Part(client, key, bucketName)); - return success(s3PartStream); + return success(s3PartStream); - } catch (Exception e) { - return failure(new StreamFailure(List.of("Error listing objects in the bucket: " + e.getMessage()), GENERAL_ERROR)); - } } return success(Stream.of(new S3Part(client, keyName, bucketName))); } - @Override - public void close() throws Exception { - client.close(); - } - /** * Fetches S3 objects with the specified prefix. * @@ -103,6 +93,11 @@ private List fetchPrefixedS3Objects() { return s3Objects; } + @Override + public void close() throws Exception { + client.close(); + } + private static class S3Part implements Part { private final S3Client client; private final String keyName; diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/exception/S3ObjectNotFoundException.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/exception/S3ObjectNotFoundException.java deleted file mode 100644 index bc5fc005..00000000 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/exception/S3ObjectNotFoundException.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - */ - -package org.eclipse.edc.connector.dataplane.aws.s3.exception; - -public class S3ObjectNotFoundException extends RuntimeException { - - public S3ObjectNotFoundException(String message) { - super(message); - } -} diff --git a/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java b/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java index 14736467..b51ae9d9 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java +++ b/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSourceTest.java @@ -76,7 +76,7 @@ void should_fail_case_no_object_is_found() { var result = s3Datasource.openPartStream(); assertThat(result.failed()).isTrue(); - assertThat(result.getFailure().getFailureDetail()).isEqualTo("Error listing objects in the bucket: Object not found"); + assertThat(result.getFailure().getFailureDetail()).isEqualTo("Error listing S3 objects in the bucket: Object not found"); } @Test From c3302fc618a09d0cf17946a426c3cb0f8b932c91 Mon Sep 17 00:00:00 2001 From: yurisilvapjd Date: Tue, 10 Oct 2023 09:51:04 +0100 Subject: [PATCH 5/5] fix: license header history format --- .../src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java | 2 +- .../org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java | 2 +- .../edc/connector/dataplane/aws/s3/S3DataSinkFactory.java | 2 +- .../eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java b/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java index 79c74858..ce919d23 100644 --- a/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java +++ b/extensions/common/aws/aws-s3-core/src/main/java/org/eclipse/edc/aws/s3/S3BucketSchema.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023 Microsoft Corporation + * Copyright (c) 2020 - 2023 Microsoft Corporation * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java index 94fd496d..36a41415 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2022 - 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSinkFactory.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSinkFactory.java index c5049f27..febc0b85 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSinkFactory.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSinkFactory.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2022 - 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java index 636d24f3..3547ade1 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2022 - 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at