Skip to content

Commit

Permalink
feat: adds transfer multiple S3 objects functionality (#104)
Browse files Browse the repository at this point in the history
* feat: adds transfer multiple S3 objects functionality

* fix: docs, exception handling and headers

* fix: imports

* fix: kept license header history and exception handling

* fix: license header history format
  • Loading branch information
yurimssilva authored Oct 10, 2023
1 parent d030ae7 commit cd7e6bd
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Microsoft Corporation
* Copyright (c) 2020 - 2023 Microsoft Corporation
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
Expand All @@ -9,6 +9,7 @@
*
* Contributors:
* Microsoft Corporation - initial API and implementation
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
*/

Expand All @@ -18,6 +19,7 @@ public interface S3BucketSchema {
String TYPE = "AmazonS3";
String REGION = "region";
String BUCKET_NAME = "bucketName";
String KEY_PREFIX = "keyPrefix";
String ACCESS_KEY_ID = "accessKeyId";
String SECRET_ACCESS_KEY = "secretAccessKey";
String ENDPOINT_OVERRIDE = "endpointOverride";
Expand Down
25 changes: 24 additions & 1 deletion extensions/data-plane/data-plane-aws-s3/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,27 @@

This module contains a Data Plane extension to copy data to and from Aws S3.

When used as a source, it currently only supports copying a single object.
When used as a source, it supports copying a single object or multiple objects.

#### AmazonS3 DataAddress Configuration

The behavior of object transfers can be customized using `DataAddress` properties.

- When `keyPrefix` is present, transfer all objects with keys that start with the specified prefix.
- When `keyPrefix` is not present, transfer only the object with a key matching the `keyName` property.
- Precedence: `keyPrefix` takes precedence over `keyName` when determining which objects to transfer. It allows for both multiple object transfers and fetching a single object when necessary.

> Note: Using `keyPrefix` introduces an additional step to list all objects whose keys match the specified prefix.

#### Example Usage:

Configuration with `keyPrefix`:

```json
{
"dataAddress": {
"keyName": "my-key-name",
"keyPrefix": "my-key-prefix/"
}
}
```
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
* Copyright (c) 2022 - 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
Expand Down Expand Up @@ -52,7 +52,7 @@ protected StreamResult<Object> transferParts(List<DataSource.Part> parts) {

var uploadId = client.createMultipartUpload(CreateMultipartUploadRequest.builder()
.bucket(bucketName)
.key(keyName)
.key(part.name())
.build()).uploadId();

while (true) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
* Copyright (c) 2022 - 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
* Copyright (c) 2022 - 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
Expand All @@ -15,30 +15,84 @@
package org.eclipse.edc.connector.dataplane.aws.s3;

import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.S3Object;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.GENERAL_ERROR;
import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult.failure;
import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult.success;

class S3DataSource implements DataSource {

private String bucketName;
private String keyName;
private String keyPrefix;
private S3Client client;

private S3DataSource() {
}

/**
 * Opens the stream of S3 parts to transfer.
 * <p>
 * If a {@code keyPrefix} is configured, every object whose key starts with that
 * prefix becomes one part; listing an empty result is reported as a failure.
 * Without a prefix, exactly the single object identified by {@code keyName} is
 * streamed. {@code keyPrefix} therefore takes precedence over {@code keyName}.
 *
 * @return a successful result carrying the parts, or a failure when the prefix
 *         listing matches no objects.
 */
@Override
public StreamResult<Stream<Part>> openPartStream() {
    // Single-object transfer: no prefix configured.
    if (keyPrefix == null) {
        return success(Stream.of(new S3Part(client, keyName, bucketName)));
    }

    var prefixedObjects = fetchPrefixedS3Objects();
    if (prefixedObjects.isEmpty()) {
        return failure(new StreamFailure(List.of("Error listing S3 objects in the bucket: Object not found"), GENERAL_ERROR));
    }

    var parts = prefixedObjects.stream()
            .map(object -> (Part) new S3Part(client, object.key(), bucketName));
    return success(parts);
}

/**
 * Lists every object in the bucket whose key starts with the configured
 * {@code keyPrefix}, following S3 continuation tokens until the listing is
 * exhausted (ListObjectsV2 pages results).
 *
 * @return all matching S3 objects; empty when nothing matches the prefix.
 */
private List<S3Object> fetchPrefixedS3Objects() {
    var collected = new ArrayList<S3Object>();
    String token = null;

    while (true) {
        var request = ListObjectsV2Request.builder()
                .bucket(bucketName)
                .prefix(keyPrefix)
                .continuationToken(token)
                .build();

        var response = client.listObjectsV2(request);
        collected.addAll(response.contents());

        token = response.nextContinuationToken();
        // A null continuation token marks the final page of the listing.
        if (token == null) {
            return collected;
        }
    }
}

@Override
public void close() throws Exception {
client.close();
Expand Down Expand Up @@ -94,6 +148,11 @@ public Builder keyName(String keyName) {
return this;
}

/**
 * Sets the optional key prefix used to select multiple objects. When present,
 * the prefix takes precedence over the single {@code keyName} (see
 * {@code openPartStream}); pass {@code null} for single-object transfers.
 *
 * @param keyPrefix the S3 key prefix, or {@code null}.
 * @return this builder.
 */
public Builder keyPrefix(String keyPrefix) {
source.keyPrefix = keyPrefix;
return this;
}

public Builder client(S3Client client) {
source.client = client;
return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
Expand Down Expand Up @@ -36,6 +36,7 @@
import static org.eclipse.edc.aws.s3.S3BucketSchema.ACCESS_KEY_ID;
import static org.eclipse.edc.aws.s3.S3BucketSchema.BUCKET_NAME;
import static org.eclipse.edc.aws.s3.S3BucketSchema.ENDPOINT_OVERRIDE;
import static org.eclipse.edc.aws.s3.S3BucketSchema.KEY_PREFIX;
import static org.eclipse.edc.aws.s3.S3BucketSchema.REGION;
import static org.eclipse.edc.aws.s3.S3BucketSchema.SECRET_ACCESS_KEY;

Expand Down Expand Up @@ -75,10 +76,11 @@ public DataSource createSource(DataFlowRequest request) {
var source = request.getSourceDataAddress();

return S3DataSource.Builder.newInstance()
.bucketName(source.getStringProperty(BUCKET_NAME))
.keyName(source.getKeyName())
.client(getS3Client(source))
.build();
.bucketName(source.getStringProperty(BUCKET_NAME))
.keyName(source.getKeyName())
.keyPrefix(source.getStringProperty(KEY_PREFIX))
.client(getS3Client(source))
.build();
}

private S3Client getS3Client(DataAddress address) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
*/

package org.eclipse.edc.connector.dataplane.aws.s3;

import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class S3DataSourceTest {

    private static final String BUCKET_NAME = "bucketName";
    private static final String KEY_NAME = "object-1";
    private static final String KEY_PREFIX = "my-prefix/";
    private final S3Client s3ClientMock = mock(S3Client.class);

    @Test
    void should_select_prefixed_objects_case_key_prefix_is_present() {
        // The stubbed listing returns two objects under the configured prefix.
        var listingResponse = ListObjectsV2Response.builder().contents(
                S3Object.builder().key("my-prefix/object-1").build(),
                S3Object.builder().key("my-prefix/object-2").build()
        ).build();
        when(s3ClientMock.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listingResponse);

        var dataSource = S3DataSource.Builder.newInstance()
                .bucketName(BUCKET_NAME)
                .keyName(KEY_NAME)
                .keyPrefix(KEY_PREFIX)
                .client(s3ClientMock)
                .build();

        var result = dataSource.openPartStream();

        // Prefix mode must list the bucket and yield one part per listed object.
        assertThat(result.succeeded()).isTrue();
        verify(s3ClientMock, atLeastOnce()).listObjectsV2(any(ListObjectsV2Request.class));
        assertThat(result.getContent()).hasSize(2);
    }

    @Test
    void should_fail_case_no_object_is_found() {
        // An empty listing must surface as a stream failure, not an empty stream.
        when(s3ClientMock.listObjectsV2(any(ListObjectsV2Request.class)))
                .thenReturn(ListObjectsV2Response.builder().build());

        var dataSource = S3DataSource.Builder.newInstance()
                .bucketName(BUCKET_NAME)
                .keyName(KEY_NAME)
                .keyPrefix(KEY_PREFIX)
                .client(s3ClientMock)
                .build();

        var result = dataSource.openPartStream();

        assertThat(result.failed()).isTrue();
        assertThat(result.getFailure().getFailureDetail()).isEqualTo("Error listing S3 objects in the bucket: Object not found");
    }

    @Test
    void should_select_single_object_case_key_prefix_is_not_present() {
        // Without a prefix the source must not list the bucket at all.
        var dataSource = S3DataSource.Builder.newInstance()
                .bucketName(BUCKET_NAME)
                .keyName(KEY_NAME)
                .keyPrefix(null)
                .client(s3ClientMock)
                .build();

        var result = dataSource.openPartStream();

        assertThat(result.succeeded()).isTrue();
        verify(s3ClientMock, never()).listObjectsV2(any(ListObjectsV2Request.class));
        assertThat(result.getContent()).hasSize(1);
    }

}

0 comments on commit cd7e6bd

Please sign in to comment.