Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple Files Transfer Feature #61

Merged
merged 3 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

import com.ionos.edc.extension.s3.connector.ionosapi.TemporaryKey;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.List;

@ExtensionPoint
public interface S3ConnectorApi {
Expand All @@ -27,10 +28,12 @@ public interface S3ConnectorApi {

boolean bucketExists(String bucketName);

void uploadParts(String bucketName, String fileName, ByteArrayInputStream part);

byte[] getFile(String bucketName, String fileName);

void uploadObject(String bucketName, String objectName, InputStream stream);

InputStream getObject(String bucketName, String objectName);

List<S3Object> listObjects(String bucketName, String objectName);

TemporaryKey createTemporaryKey();

void deleteTemporaryKey(String accessKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
import com.ionos.edc.extension.s3.connector.ionosapi.HttpConnector;
import com.ionos.edc.extension.s3.connector.ionosapi.TemporaryKey;

import io.minio.BucketExistsArgs;
import io.minio.GetObjectArgs;
import io.minio.MakeBucketArgs;
import io.minio.MinioClient;
import io.minio.PutObjectArgs;
import io.minio.*;
import org.eclipse.edc.spi.EdcException;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class S3ConnectorApiImpl implements S3ConnectorApi {

Expand All @@ -35,62 +33,103 @@ public class S3ConnectorApiImpl implements S3ConnectorApi {

private MinioClient minioClient;
private final String region;
private final String token;
private String token;
private final Integer maxFiles;


public S3ConnectorApiImpl(String endpoint, String accessKey, String secretKey, String token) {
if (accessKey != null && secretKey != null && endpoint != null)
public S3ConnectorApiImpl(String endpoint, String accessKey, String secretKey, int maxFiles) {
if(accessKey != null && secretKey != null && endpoint != null)
this.minioClient = minConnector.connect(endpoint, accessKey, secretKey);
this.region = getRegion(endpoint);
this.token = "";
this.maxFiles = maxFiles;
}
public S3ConnectorApiImpl(String endpoint, String accessKey, String secretKey, String token, int maxFiles) {
this(endpoint, accessKey, secretKey, maxFiles);
this.token = token;
}


@Override
public void createBucket(String bucketName) {
if (!bucketExists(bucketName.toLowerCase())) {
try {
minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucketName.toLowerCase()).region(region).build());
minioClient.makeBucket(MakeBucketArgs.builder()
.bucket(bucketName.toLowerCase())
.region(region)
.build());
} catch (Exception e) {
throw new EdcException("Creating bucket: " + e.getMessage());
}
}
}

@Override
public void uploadParts(String bucketName, String fileName, ByteArrayInputStream part) {
public boolean bucketExists(String bucketName) {
try {
return minioClient.bucketExists(BucketExistsArgs.builder()
.bucket(bucketName.toLowerCase())
.region(this.region)
.build());
} catch (Exception e) {
throw new EdcException("Verifying if bucket exists - " + e.getMessage());
}
}

@Override
public void uploadObject(String bucketName, String objectName, InputStream stream) {
if (!bucketExists(bucketName.toLowerCase())) {
createBucket(bucketName.toLowerCase());
}

try {
minioClient.putObject(PutObjectArgs.builder().bucket(bucketName.toLowerCase()).region(region).object(fileName).stream(part, part.available(), -1).build());
minioClient.putObject(PutObjectArgs.builder()
.bucket(bucketName.toLowerCase())
.region(region)
.object(objectName)
.stream(stream, stream.available(), -1)
.build());
} catch (Exception e) {
throw new EdcException("Uploading parts: " + e.getMessage());
}
}

@Override
public byte[] getFile(String bucketName, String fileName) {
public InputStream getObject(String bucketName, String objectName) {
if (!bucketExists(bucketName.toLowerCase())) {
throw new EdcException("Bucket not found - " + bucketName);
}

InputStream stream;
try {
stream = minioClient.getObject(GetObjectArgs.builder().bucket(bucketName.toLowerCase()).region(region).object(fileName).build());
return stream.readAllBytes();
return minioClient.getObject(GetObjectArgs.builder()
.bucket(bucketName.toLowerCase())
.region(region)
.object(objectName)
.build());
} catch (Exception e) {
throw new EdcException("Getting file - " + e.getMessage());
}
}

@Override
public boolean bucketExists(String bucketName) {
try {
return minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName.toLowerCase()).region(this.region).build());
} catch (Exception e) {
throw new EdcException("Verifying if bucket exists - " + e.getMessage());
}
public List<S3Object> listObjects(String bucketName, String objectName) {

var objects = minioClient.listObjects(ListObjectsArgs.builder()
.bucket(bucketName.toLowerCase())
.prefix(objectName)
.recursive(true)
.maxKeys(maxFiles)
.build());

return StreamSupport.stream(objects.spliterator(), false)
.map(item -> {
try {
return item.get();
} catch (Exception e) {
throw new EdcException("Error fetching object", e);
}
})
.map(item -> new S3Object(item.objectName(), item.size()))
.collect(Collectors.toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2023 IONOS
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* IONOS
*
*/

package com.ionos.edc.extension.s3.api;

public record S3Object(String objectName, long size) {

public boolean isRootObject(String blobName) {
return (objectName.equals(blobName) || objectName.equals(blobName + "/"));
}

public String shortObjectName(String blobName) {
if (isRootObject(blobName))
return objectName;

var shortObjectName = objectName.replaceFirst(blobName, "");

if (shortObjectName.indexOf("/") == 0)
return shortObjectName.replaceFirst("/", "");
else
return shortObjectName;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;

import static com.ionos.edc.extension.s3.schema.IonosSettingsSchema.IONOS_ACCESS_KEY;
import static com.ionos.edc.extension.s3.schema.IonosSettingsSchema.IONOS_SECRET_KEY;
import static com.ionos.edc.extension.s3.schema.IonosSettingsSchema.IONOS_ENDPOINT;
import static com.ionos.edc.extension.s3.schema.IonosSettingsSchema.IONOS_TOKEN;
import static com.ionos.edc.extension.s3.schema.IonosSettingsSchema.IONOS_MAX_FILES_DEFAULT;

@Provides(S3ConnectorApi.class)
@Extension(value = S3CoreExtension.NAME)
public class S3CoreExtension implements ServiceExtension {

public static final String NAME = "IonosS3";
private static final String IONOS_ACCESS_KEY = "edc.ionos.access.key";
private static final String IONOS_SECRET_KEY = "edc.ionos.secret.key";
private static final String IONOS_ENDPOINT = "edc.ionos.endpoint";
private static final String IONOS_TOKEN = "edc.ionos.token";

@Inject
private Vault vault;
Expand Down Expand Up @@ -61,7 +63,7 @@ public void initialize(ServiceExtensionContext context) {
token = context.getSetting(IONOS_TOKEN, IONOS_TOKEN);
}

var s3Api = new S3ConnectorApiImpl(endPoint, accessKey, secretKey, token);
var s3Api = new S3ConnectorApiImpl(endPoint, accessKey, secretKey, token, IONOS_MAX_FILES_DEFAULT);
context.registerService(S3ConnectorApi.class, s3Api);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public interface IonosBucketSchema {
String STORAGE_NAME = EDC_NAMESPACE + "storage";
String BUCKET_NAME = EDC_NAMESPACE + "bucketName";
String BLOB_NAME = EDC_NAMESPACE + "blobName";
String FILTER_INCLUDES = EDC_NAMESPACE + "filter.includes";
String FILTER_EXCLUDES = EDC_NAMESPACE + "filter.excludes";
String ACCESS_KEY_ID = EDC_NAMESPACE + "accessKey";
String SECRET_ACCESS_KEY = EDC_NAMESPACE + "secretKey";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2022 IONOS
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* IONOS
*
*/

package com.ionos.edc.extension.s3.schema;

public interface IonosSettingsSchema {
String IONOS_ACCESS_KEY = "edc.ionos.access.key";
String IONOS_SECRET_KEY = "edc.ionos.secret.key";
String IONOS_ENDPOINT = "edc.ionos.endpoint";
String IONOS_TOKEN = "edc.ionos.token";
int IONOS_MAX_FILES_DEFAULT = 1000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class DataPlaneIonosS3Extension implements ServiceExtension {

@Inject
private S3ConnectorApi s3Api;

@Inject
private DataTransferExecutorServiceContainer executorContainer;

Expand All @@ -50,15 +50,12 @@ public String name() {

@Override
public void initialize(ServiceExtensionContext context) {


var monitor = context.getMonitor();

var sourceFactory = new IonosDataSourceFactory(s3Api,typeManager);
var sourceFactory = new IonosDataSourceFactory(s3Api, monitor);
pipelineService.registerFactory(sourceFactory);

var sinkFactory = new IonosDataSinkFactory(s3Api, executorContainer.getExecutorService(), monitor, vault,
typeManager);
var sinkFactory = new IonosDataSinkFactory(s3Api, executorContainer.getExecutorService(), monitor, vault, typeManager);
pipelineService.registerFactory(sinkFactory);
context.getMonitor().info("File Transfer Extension initialized!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.eclipse.edc.spi.EdcException;
import org.jetbrains.annotations.NotNull;

import java.io.ByteArrayInputStream;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -50,7 +49,7 @@ protected StreamResult<Object> transferParts(List<DataSource.Part> parts) {
if(input == null) {
throw new EdcException("Error transferring file");
}
s3Api.uploadParts(bucketName, blobName, new ByteArrayInputStream(input.readAllBytes()));
s3Api.uploadObject(bucketName, blobName, input);
} catch (Exception e) {
return uploadFailure(e, blobName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package com.ionos.edc.dataplane.ionos.s3;


import com.ionos.edc.dataplane.ionos.s3.validation.IonosSinkDataAddressValidationRule;
import com.ionos.edc.extension.s3.api.S3ConnectorApi;
import com.ionos.edc.extension.s3.api.S3ConnectorApiImpl;
Expand All @@ -34,6 +33,9 @@
import org.jetbrains.annotations.NotNull;

import java.util.concurrent.ExecutorService;

import static com.ionos.edc.extension.s3.schema.IonosSettingsSchema.IONOS_MAX_FILES_DEFAULT;

public class IonosDataSinkFactory implements DataSinkFactory {

private static final String DEFAULT_STORAGE = "s3-eu-central-1.ionoscloud.com";
Expand Down Expand Up @@ -81,7 +83,11 @@ public DataSink createSink(DataFlowRequest request) {
var token = typeManager.readValue(secret, IonosToken.class);

if (destination.getStringProperty(IonosBucketSchema.STORAGE_NAME) != null) {
var s3ApiTemp = new S3ConnectorApiImpl(destination.getStringProperty(IonosBucketSchema.STORAGE_NAME), token.getAccessKey(), token.getSecretKey(), "");
var s3ApiTemp = new S3ConnectorApiImpl(destination.getStringProperty(IonosBucketSchema.STORAGE_NAME),
token.getAccessKey(),
token.getSecretKey(),
"",
IONOS_MAX_FILES_DEFAULT);
return IonosDataSink.Builder.newInstance()
.bucketName(destination.getStringProperty(IonosBucketSchema.BUCKET_NAME))
.blobName(destination.getStringProperty(IonosBucketSchema.BLOB_NAME))
Expand All @@ -90,7 +96,11 @@ public DataSink createSink(DataFlowRequest request) {
.monitor(monitor).s3Api(s3ApiTemp)
.build();
} else {
var s3ApiTemp = new S3ConnectorApiImpl(DEFAULT_STORAGE, token.getAccessKey(), token.getSecretKey(), "");
var s3ApiTemp = new S3ConnectorApiImpl(DEFAULT_STORAGE,
token.getAccessKey(),
token.getSecretKey(),
"",
IONOS_MAX_FILES_DEFAULT);
return IonosDataSink.Builder.newInstance()
.bucketName(destination.getStringProperty(IonosBucketSchema.BUCKET_NAME))
.blobName(destination.getStringProperty(IonosBucketSchema.BLOB_NAME))
Expand Down
Loading