Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds transfer multiple S3 objects functionality #104

Merged
merged 6 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Microsoft Corporation
* Copyright (c) 2020-2023 Microsoft Corporation
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
Expand All @@ -9,6 +9,7 @@
*
* Contributors:
* Microsoft Corporation - initial API and implementation
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
*/

Expand All @@ -18,6 +19,7 @@ public interface S3BucketSchema {
String TYPE = "AmazonS3";
String REGION = "region";
String BUCKET_NAME = "bucketName";
String KEY_PREFIX = "keyPrefix";
String ACCESS_KEY_ID = "accessKeyId";
String SECRET_ACCESS_KEY = "secretAccessKey";
String ENDPOINT_OVERRIDE = "endpointOverride";
Expand Down
25 changes: 24 additions & 1 deletion extensions/data-plane/data-plane-aws-s3/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,27 @@

This module contains a Data Plane extension to copy data to and from Aws S3.

When used as a source, it currently only supports copying a single object.
When as a source, it supports copying a single or multiple objects.

#### AmazonS3 DataAddress Configuration

The behavior of object transfers can be customized using `DataAddress` properties.

- When `keyPrefix` is present, transfer all objects with keys that start with the specified prefix.
- When `keyPrefix` is not present, transfer only the object with a key matching the `keyName` property.
- Precedence: `keyPrefix` takes precedence over `keyName` when determining which objects to transfer. It allows for both multiple object transfers and fetching a single object when necessary.

>Note: Using `keyPrefix` introduces an additional step to list all objects whose keys match the specified prefix.

#### Example Usage:

Configuration with `keyPrefix`:

```json
{
"dataAddress": {
"keyName": "my-key-name",
"keyPrefix": "my-key-prefix/"
}
}
```
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
* Copyright (c) 2022-2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
Expand Down Expand Up @@ -52,7 +52,7 @@ protected StreamResult<Object> transferParts(List<DataSource.Part> parts) {

var uploadId = client.createMultipartUpload(CreateMultipartUploadRequest.builder()
.bucket(bucketName)
.key(keyName)
.key(part.name())
.build()).uploadId();

while (true) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
* Copyright (c) 2022-2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
* Copyright (c) 2022-2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
Expand All @@ -15,30 +15,84 @@
package org.eclipse.edc.connector.dataplane.aws.s3;

import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.S3Object;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.GENERAL_ERROR;
import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult.failure;
import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult.success;

class S3DataSource implements DataSource {

private String bucketName;
private String keyName;
private String keyPrefix;
private S3Client client;

private S3DataSource() {
}

@Override
public StreamResult<Stream<Part>> openPartStream() {

if (keyPrefix != null) {

var s3Objects = this.fetchPrefixedS3Objects();

if (s3Objects.isEmpty()) {
return failure(new StreamFailure(List.of("Error listing S3 objects in the bucket: Object not found"), GENERAL_ERROR));
}

var s3PartStream = s3Objects.stream()
.map(S3Object::key)
.map(key -> (Part) new S3Part(client, key, bucketName));

return success(s3PartStream);

}

return success(Stream.of(new S3Part(client, keyName, bucketName)));
}

/**
* Fetches S3 objects with the specified prefix.
*
* @return A list of S3 objects.
*/
private List<S3Object> fetchPrefixedS3Objects() {

String continuationToken = null;
List<S3Object> s3Objects = new ArrayList<>();

do {

var listObjectsRequest = ListObjectsV2Request.builder()
.bucket(bucketName)
.prefix(keyPrefix)
.continuationToken(continuationToken)
.build();

var response = client.listObjectsV2(listObjectsRequest);

s3Objects.addAll(response.contents());

continuationToken = response.nextContinuationToken();

} while (continuationToken != null);

return s3Objects;
}

@Override
public void close() throws Exception {
client.close();
Expand Down Expand Up @@ -94,6 +148,11 @@ public Builder keyName(String keyName) {
return this;
}

public Builder keyPrefix(String keyPrefix) {
source.keyPrefix = keyPrefix;
return this;
}

public Builder client(S3Client client) {
source.client = client;
return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
Expand Down Expand Up @@ -36,6 +36,7 @@
import static org.eclipse.edc.aws.s3.S3BucketSchema.ACCESS_KEY_ID;
import static org.eclipse.edc.aws.s3.S3BucketSchema.BUCKET_NAME;
import static org.eclipse.edc.aws.s3.S3BucketSchema.ENDPOINT_OVERRIDE;
import static org.eclipse.edc.aws.s3.S3BucketSchema.KEY_PREFIX;
import static org.eclipse.edc.aws.s3.S3BucketSchema.REGION;
import static org.eclipse.edc.aws.s3.S3BucketSchema.SECRET_ACCESS_KEY;

Expand Down Expand Up @@ -75,10 +76,11 @@ public DataSource createSource(DataFlowRequest request) {
var source = request.getSourceDataAddress();

return S3DataSource.Builder.newInstance()
.bucketName(source.getStringProperty(BUCKET_NAME))
.keyName(source.getKeyName())
.client(getS3Client(source))
.build();
.bucketName(source.getStringProperty(BUCKET_NAME))
.keyName(source.getKeyName())
.keyPrefix(source.getStringProperty(KEY_PREFIX))
.client(getS3Client(source))
.build();
}

private S3Client getS3Client(DataAddress address) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
*/

package org.eclipse.edc.connector.dataplane.aws.s3;

import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class S3DataSourceTest {

private static final String BUCKET_NAME = "bucketName";
private static final String KEY_NAME = "object-1";
private static final String KEY_PREFIX = "my-prefix/";
private final S3Client s3ClientMock = mock(S3Client.class);

@Test
void should_select_prefixed_objects_case_key_prefix_is_present() {

var mockResponse = ListObjectsV2Response.builder().contents(
S3Object.builder().key("my-prefix/object-1").build(),
S3Object.builder().key("my-prefix/object-2").build()
).build();

var s3Datasource = S3DataSource.Builder.newInstance()
.bucketName(BUCKET_NAME)
.keyName(KEY_NAME)
.keyPrefix(KEY_PREFIX)
.client(s3ClientMock)
.build();

when(s3ClientMock.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(mockResponse);

var result = s3Datasource.openPartStream();

assertThat(result.succeeded()).isTrue();
verify(s3ClientMock, atLeastOnce()).listObjectsV2(any(ListObjectsV2Request.class));
assertThat(result.getContent()).hasSize(2);
}

@Test
void should_fail_case_no_object_is_found() {

var mockResponse = ListObjectsV2Response.builder().build();

var s3Datasource = S3DataSource.Builder.newInstance()
.bucketName(BUCKET_NAME)
.keyName(KEY_NAME)
.keyPrefix(KEY_PREFIX)
.client(s3ClientMock)
.build();

when(s3ClientMock.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(mockResponse);

var result = s3Datasource.openPartStream();

assertThat(result.failed()).isTrue();
assertThat(result.getFailure().getFailureDetail()).isEqualTo("Error listing S3 objects in the bucket: Object not found");
}

@Test
void should_select_single_object_case_key_prefix_is_not_present() {

var s3Datasource = S3DataSource.Builder.newInstance()
.bucketName(BUCKET_NAME)
.keyName(KEY_NAME)
.keyPrefix(null)
.client(s3ClientMock)
.build();

var result = s3Datasource.openPartStream();

assertThat(result.succeeded()).isTrue();
verify(s3ClientMock, never()).listObjectsV2(any(ListObjectsV2Request.class));
assertThat(result.getContent()).hasSize(1);
}

}
Loading