Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing big files transferring issues #69

Merged
merged 3 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import com.ionos.edc.extension.s3.connector.ionosapi.TemporaryKey;

import java.io.InputStream;
import java.io.ByteArrayInputStream;
import java.util.List;

@ExtensionPoint
Expand All @@ -28,9 +28,11 @@ public interface S3ConnectorApi {

boolean bucketExists(String bucketName);

void uploadObject(String bucketName, String objectName, InputStream stream);
void uploadObject(String bucketName, String objectName, ByteArrayInputStream stream);

InputStream getObject(String bucketName, String objectName);
ByteArrayInputStream getObject(String bucketName, String objectName);

ByteArrayInputStream getObject(String bucketName, String objectName, long offset, long length);

List<S3Object> listObjects(String bucketName, String objectName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,22 @@
import com.ionos.edc.extension.s3.connector.ionosapi.HttpConnector;
import com.ionos.edc.extension.s3.connector.ionosapi.TemporaryKey;

import io.minio.*;
import io.minio.BucketExistsArgs;
import io.minio.GetObjectArgs;
import io.minio.ListObjectsArgs;
import io.minio.MakeBucketArgs;
import io.minio.MinioClient;
import io.minio.PutObjectArgs;
import org.eclipse.edc.spi.EdcException;

import java.io.InputStream;
import java.io.ByteArrayInputStream;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class S3ConnectorApiImpl implements S3ConnectorApi {

MinioConnector minConnector = new MinioConnector();
MinioConnector miniConnector = new MinioConnector();
HttpConnector ionosApi = new HttpConnector();

private MinioClient minioClient;
Expand All @@ -38,11 +43,12 @@ public class S3ConnectorApiImpl implements S3ConnectorApi {

public S3ConnectorApiImpl(String endpoint, String accessKey, String secretKey, int maxFiles) {
if(accessKey != null && secretKey != null && endpoint != null)
this.minioClient = minConnector.connect(endpoint, accessKey, secretKey);
this.minioClient = miniConnector.connect(endpoint, accessKey, secretKey);
this.region = getRegion(endpoint);
this.token = "";
this.maxFiles = maxFiles;
}

public S3ConnectorApiImpl(String endpoint, String accessKey, String secretKey, String token, int maxFiles) {
this(endpoint, accessKey, secretKey, maxFiles);
this.token = token;
Expand Down Expand Up @@ -76,7 +82,7 @@ public boolean bucketExists(String bucketName) {
}

@Override
public void uploadObject(String bucketName, String objectName, InputStream stream) {
public void uploadObject(String bucketName, String objectName, ByteArrayInputStream stream) {
if (!bucketExists(bucketName.toLowerCase())) {
createBucket(bucketName.toLowerCase());
}
Expand All @@ -94,17 +100,40 @@ public void uploadObject(String bucketName, String objectName, InputStream strea
}

@Override
public InputStream getObject(String bucketName, String objectName) {
public ByteArrayInputStream getObject(String bucketName, String objectName) {
if (!bucketExists(bucketName.toLowerCase())) {
throw new EdcException("Bucket not found - " + bucketName);
}

try {
return minioClient.getObject(GetObjectArgs.builder()
.bucket(bucketName.toLowerCase())
.region(region)
.object(objectName)
.build());
var request = GetObjectArgs.builder()
.bucket(bucketName.toLowerCase())
.region(region)
.object(objectName)
.build();

try (var response = minioClient.getObject(request)) {
return new ByteArrayInputStream(response.readAllBytes());
} catch (Exception e) {
throw new EdcException("Getting file - " + e.getMessage());
}
}

@Override
public ByteArrayInputStream getObject(String bucketName, String objectName, long offset, long length) {
if (!bucketExists(bucketName.toLowerCase())) {
throw new EdcException("Bucket not found - " + bucketName);
}

var request = GetObjectArgs.builder()
.bucket(bucketName.toLowerCase())
.region(region)
.object(objectName)
.offset(offset)
.length(length)
.build();

try (var response = minioClient.getObject(request)) {
return new ByteArrayInputStream(response.readAllBytes());
} catch (Exception e) {
throw new EdcException("Getting file - " + e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

public record S3Object(String objectName, long size) {

public boolean isDirectory() {
return objectName.endsWith("/");
}

public boolean isRootObject(String blobName) {
return (objectName.equals(blobName) || objectName.equals(blobName + "/"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@
import io.minio.MinioClient;

public class MinioConnector {


public MinioClient connect(String endpoint, String accessKey, String secretKey) {
MinioClient minioClient = MinioClient.builder().endpoint(endpoint).credentials(accessKey, secretKey).build();

return minioClient;
return MinioClient.builder().endpoint(endpoint).credentials(accessKey, secretKey).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,68 +4,61 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import okhttp3.FormBody;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.eclipse.edc.spi.EdcException;


public class HttpConnector {
OkHttpClient client = new OkHttpClient();
String basicUrl = "https://api.ionos.com/cloudapi/v6/um/users/";

public String retrieveUserID(String token) {
String[] jwtParts = token.split("\\.");
String jwtPayload = new String(java.util.Base64.getDecoder().decode(jwtParts[1]));
String uuid = jwtPayload.split("\"uuid\":\"")[1].split("\"")[0];
OkHttpClient client = new OkHttpClient();
String basicUrl = "https://api.ionos.com/cloudapi/v6/um/users/";

public String retrieveUserID(String token) {
String[] jwtParts = token.split("\\.");
String jwtPayload = new String(java.util.Base64.getDecoder().decode(jwtParts[1]));
String uuid = jwtPayload.split("\"uuid\":\"")[1].split("\"")[0];

return uuid;
}

public TemporaryKey createTemporaryKey(String token) {
String url = basicUrl + retrieveUserID(token) + "/s3keys";

Request request = new Request.Builder().url(url)
//This adds the token to the header.
.addHeader("Authorization", "Bearer " + token)
.post(RequestBody.create(null, new byte[0]))
.build();
try (Response response = client.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new IOException("Unexpected code " + response);
}

return uuid;
}


public TemporaryKey createTemporaryKey(String token) {
String url = basicUrl + retrieveUserID(token) + "/s3keys";


Request request = new Request.Builder()
.url(url)
//This adds the token to the header.
.addHeader("Authorization", "Bearer " + token)
.post(RequestBody.create(null, new byte[0]))
.build();
try (Response response = client.newCall(request).execute()) {
if (!response.isSuccessful()){
throw new IOException("Unexpected code " + response);
}
ObjectMapper objectMapper = new ObjectMapper();
S3Key resp = objectMapper.readValue(response.body().string(), S3Key.class);
TemporaryKey temp = new TemporaryKey(resp.getId().toString(), resp.getProperties().get("secretKey").toString());
return temp;
} catch (IOException e) {
throw new EdcException("Error getting S3 temporary key", e);
}
}

public void deleteTemporaryAccount(String token, String keyID) {
String url = basicUrl + retrieveUserID(token) + "/s3keys/" + keyID;

ObjectMapper objectMapper = new ObjectMapper();
S3Key resp = objectMapper.readValue(response.body().string(), S3Key.class);
TemporaryKey temp = new TemporaryKey(resp.getId().toString(),resp.getProperties().get("secretKey").toString());
return temp;
} catch (IOException e) {
throw new EdcException("Error getting S3 temporary key", e);
}
}

public void deleteTemporaryAccount(String token, String keyID) {
String url = basicUrl + retrieveUserID(token) + "/s3keys/" + keyID;
Request request = new Request.Builder().url(url)
//This adds the token to the header.
.addHeader("Authorization", "Bearer " + token)
.delete()
.build();

Request request = new Request.Builder()
.url(url)
//This adds the token to the header.
.addHeader("Authorization", "Bearer " + token)
.delete()
.build();
try (Response response = client.newCall(request).execute()) {
if (!response.isSuccessful()){
throw new IOException("Unexpected code " + response);
}
} catch (IOException e) {
e.printStackTrace();
}
}

try (Response response = client.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new IOException("Unexpected code " + response + " deleting S3 temporary key");
}
} catch (IOException e) {
throw new EdcException("Error deleting S3 temporary key", e);
}
}
}
5 changes: 5 additions & 0 deletions extensions/data-plane-ionos-s3/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
implementation("org.realityforge.org.jetbrains.annotations:org.jetbrains.annotations:1.7.0")

testImplementation("org.junit.jupiter:junit-jupiter-api:5.9.1")
testImplementation("org.junit.jupiter:junit-jupiter-engine:5.9.1")
testImplementation("org.assertj:assertj-core:3.22.0")
}

Expand All @@ -38,6 +39,10 @@ java {
withSourcesJar()
}

tasks.test {
useJUnitPlatform()
}

publishing {
publications {
create<MavenPublication>("maven") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;

import static com.ionos.edc.extension.s3.schema.IonosSettingsSchema.IONOS_MAX_FILES_DEFAULT;

@Extension(value = DataPlaneIonosS3Extension.NAME)
public class DataPlaneIonosS3Extension implements ServiceExtension {

Expand Down Expand Up @@ -55,7 +57,7 @@ public void initialize(ServiceExtensionContext context) {
var sourceFactory = new IonosDataSourceFactory(s3Api, monitor);
pipelineService.registerFactory(sourceFactory);

var sinkFactory = new IonosDataSinkFactory(s3Api, executorContainer.getExecutorService(), monitor, vault, typeManager);
var sinkFactory = new IonosDataSinkFactory(s3Api, executorContainer.getExecutorService(), monitor, vault, typeManager, IONOS_MAX_FILES_DEFAULT);
pipelineService.registerFactory(sinkFactory);
context.getMonitor().info("File Transfer Extension initialized!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
import org.eclipse.edc.connector.dataplane.util.sink.ParallelSink;
import org.eclipse.edc.spi.EdcException;
import org.jetbrains.annotations.NotNull;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.List;
import java.util.Objects;

Expand All @@ -31,7 +32,7 @@ public class IonosDataSink extends ParallelSink {
private S3ConnectorApi s3Api;
private String bucketName;
private String blobName;

private IonosDataSink() {}

@Override
Expand All @@ -45,8 +46,25 @@ protected StreamResult<Object> transferParts(List<DataSource.Part> parts) {
blobName = part.name();
}

try (var input = part.openStream()) {
s3Api.uploadObject(bucketName, blobName, input);
var streamsOutput = new ByteArrayOutputStream();
var stream = part.openStream();
while (stream != null) {
try {
streamsOutput.write(stream.readAllBytes());
stream.close();

} catch (Exception e) {
return uploadFailure(e, blobName);
}

stream = part.openStream();
}

var byteArray = streamsOutput.toByteArray();
try (var streamsInput = new ByteArrayInputStream(byteArray)) {
s3Api.uploadObject(bucketName, blobName, streamsInput);
streamsOutput.close();

} catch (Exception e) {
return uploadFailure(e, blobName);
}
Expand Down
Loading