Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
alexjo2144 committed Jul 14, 2023
1 parent 4b7197f commit 0406c54
Show file tree
Hide file tree
Showing 8 changed files with 561 additions and 226 deletions.
5 changes: 5 additions & 0 deletions lib/trino-filesystem-s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@
</exclusions>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-crt-client</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoOutputFile;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.InputSerialization;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.OutputSerialization;
import software.amazon.awssdk.services.s3.model.RequestPayer;
import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
Expand All @@ -44,16 +47,18 @@
import static com.google.common.collect.Multimaps.toMultimap;
import static java.util.Objects.requireNonNull;

final class S3FileSystem
public final class S3FileSystem
implements TrinoFileSystem
{
private final S3Client client;
private final S3AsyncClient asyncClient;
private final S3Context context;
private final RequestPayer requestPayer;

public S3FileSystem(S3Client client, S3Context context)
public S3FileSystem(S3Client client, S3AsyncClient asyncClient, S3Context context)
{
this.client = requireNonNull(client, "client is null");
this.asyncClient = requireNonNull(asyncClient, "asyncClient is null");
this.context = requireNonNull(context, "context is null");
this.requestPayer = context.requestPayer();
}
Expand All @@ -70,6 +75,11 @@ public TrinoInputFile newInputFile(Location location, long length)
return new S3InputFile(client, context, new S3Location(location), length);
}

public TrinoInputFile newS3SelectInputFile(Location location, String query, boolean enableScanRange, InputSerialization inputSerialization, OutputSerialization outputSerialization)
{
return new S3SelectInputFile(client, asyncClient, context, new S3Location(location), query, enableScanRange, inputSerialization, outputSerialization);
}

@Override
public TrinoOutputFile newOutputFile(Location location)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,15 @@
import jakarta.annotation.PreDestroy;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.apache.ProxyConfiguration;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3BaseClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.sts.StsClient;
Expand All @@ -38,13 +44,33 @@ public final class S3FileSystemFactory
implements TrinoFileSystemFactory
{
private final S3Client client;
private final S3AsyncClient asyncClient;
private final S3Context context;

@Inject
public S3FileSystemFactory(S3FileSystemConfig config)
{
S3ClientBuilder s3 = S3Client.builder();
applyS3Properties(s3, config);
s3.httpClient(buildHttpClient(config));

S3AsyncClientBuilder asyncS3 = S3AsyncClient.builder();
applyS3Properties(asyncS3, config);
asyncS3.httpClient(buildAsyncHttpClient(config));

this.client = s3.build();
this.asyncClient = asyncS3.build();

context = new S3Context(
toIntExact(config.getStreamingPartSize().toBytes()),
config.isRequesterPays(),
config.getSseType(),
config.getSseKmsKeyId());

}

private static void applyS3Properties(S3BaseClientBuilder<?, ?> s3, S3FileSystemConfig config)
{
if ((config.getAwsAccessKey() != null) && (config.getAwsSecretKey() != null)) {
s3.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create(config.getAwsAccessKey(), config.getAwsSecretKey())));
Expand All @@ -70,7 +96,10 @@ public S3FileSystemFactory(S3FileSystemConfig config)
.asyncCredentialUpdateEnabled(true)
.build());
}
}

private static SdkHttpClient buildHttpClient(S3FileSystemConfig config)
{
ApacheHttpClient.Builder httpClient = ApacheHttpClient.builder()
.maxConnections(config.getMaxConnections());

Expand All @@ -83,26 +112,34 @@ public S3FileSystemFactory(S3FileSystemConfig config)
.build());
}

s3.httpClientBuilder(httpClient);
return httpClient.build();
}

this.client = s3.build();
private static SdkAsyncHttpClient buildAsyncHttpClient(S3FileSystemConfig config)
{
AwsCrtAsyncHttpClient.Builder httpClient = AwsCrtAsyncHttpClient.builder();
if (config.getHttpProxy() != null) {
String scheme = config.isHttpProxySecure() ? "https" : "http";
httpClient.proxyConfiguration(software.amazon.awssdk.http.crt.ProxyConfiguration.builder()
.scheme(scheme)
.host(config.getHttpProxy().getHost())
.port(config.getHttpProxy().getPort())
.build());
}

context = new S3Context(
toIntExact(config.getStreamingPartSize().toBytes()),
config.isRequesterPays(),
config.getSseType(),
config.getSseKmsKeyId());
return httpClient.build();
}

@PreDestroy
public void destroy()
{
client.close();
asyncClient.close();
}

@Override
public TrinoFileSystem create(ConnectorIdentity identity)
{
return new S3FileSystem(client, context);
return new S3FileSystem(client, asyncClient, context);
}
}
Loading

0 comments on commit 0406c54

Please sign in to comment.