Skip to content

Commit

Permalink
HADOOP-18708: Support S3 Client Side Encryption(CSE) With AWS SDK V2
Browse files Browse the repository at this point in the history
  • Loading branch information
shameersss1 committed Oct 8, 2024
1 parent dc56fc3 commit 37eead5
Show file tree
Hide file tree
Showing 50 changed files with 2,209 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ CompletableFuture<UploadHandle> startUpload(Path filePath)
* It is possible to have parts uploaded in any order (or in parallel).
* @param uploadId Identifier from {@link #startUpload(Path)}.
* @param partNumber Index of the part relative to others.
* @param isLastPart whether this part is the last part of the upload.
* @param filePath Target path for upload (as {@link #startUpload(Path)}).
* @param inputStream Data for this part. Implementations MUST close this
* stream after reading in the data.
Expand All @@ -67,6 +68,7 @@ CompletableFuture<UploadHandle> startUpload(Path filePath)
CompletableFuture<PartHandle> putPart(
UploadHandle uploadId,
int partNumber,
boolean isLastPart,
Path filePath,
InputStream inputStream,
long lengthInBytes)
Expand All @@ -77,7 +79,7 @@ CompletableFuture<PartHandle> putPart(
* @param uploadId Identifier from {@link #startUpload(Path)}.
* @param filePath Target path for upload (as {@link #startUpload(Path)}.
* @param handles non-empty map of part number to part handle.
* from {@link #putPart(UploadHandle, int, Path, InputStream, long)}.
* from {@link #putPart(UploadHandle, int, boolean, Path, InputStream, long)}.
* @return unique PathHandle identifier for the uploaded file.
* @throws IOException IO failure
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ protected void checkPartHandles(Map<Integer, PartHandle> partHandles) {

/**
* Check all the arguments to the
* {@link MultipartUploader#putPart(UploadHandle, int, Path, InputStream, long)}
* {@link MultipartUploader#putPart(UploadHandle, int, boolean, Path, InputStream, long)}
* operation.
* @param filePath Target path for upload (as {@link #startUpload(Path)}).
* @param inputStream Data for this part. Implementations MUST close this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public CompletableFuture<UploadHandle> startUpload(Path filePath)

@Override
public CompletableFuture<PartHandle> putPart(UploadHandle uploadId,
int partNumber, Path filePath,
int partNumber, boolean isLastPart, Path filePath,
InputStream inputStream,
long lengthInBytes)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public void testSingleUpload() throws Exception {
// was interpreted as an inconsistent write.
MultipartUploader completer = uploader0;
// and upload with uploader 1 to validate cross-uploader uploads
PartHandle partHandle = putPart(file, uploadHandle, 1, payload);
PartHandle partHandle = putPart(file, uploadHandle, 1, true, payload);
partHandles.put(1, partHandle);
PathHandle fd = complete(completer, uploadHandle, file,
partHandles);
Expand Down Expand Up @@ -317,12 +317,13 @@ protected PartHandle buildAndPutPart(
final Path file,
final UploadHandle uploadHandle,
final int index,
final boolean isLastPart,
final MessageDigest origDigest) throws IOException {
byte[] payload = generatePayload(index);
if (origDigest != null) {
origDigest.update(payload);
}
return putPart(file, uploadHandle, index, payload);
return putPart(file, uploadHandle, index, isLastPart, payload);
}

/**
Expand All @@ -331,13 +332,15 @@ protected PartHandle buildAndPutPart(
* @param file destination
* @param uploadHandle handle
* @param index index of part
* @param isLastPart whether this part is the last part of the upload.
* @param payload byte array of payload
* @return the part handle
* @throws IOException IO failure.
*/
protected PartHandle putPart(final Path file,
final UploadHandle uploadHandle,
final int index,
final boolean isLastPart,
final byte[] payload) throws IOException {
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
PartHandle partHandle;
Expand All @@ -347,7 +350,7 @@ protected PartHandle putPart(final Path file,
payload.length,
file)) {
partHandle = awaitFuture(getUploader(index)
.putPart(uploadHandle, index, file,
.putPart(uploadHandle, index, isLastPart, file,
new ByteArrayInputStream(payload),
payload.length));
}
Expand Down Expand Up @@ -488,7 +491,7 @@ public void testMultipartUpload() throws Exception {
MessageDigest origDigest = DigestUtils.getMd5Digest();
int payloadCount = getTestPayloadCount();
for (int i = 1; i <= payloadCount; ++i) {
PartHandle partHandle = buildAndPutPart(file, uploadHandle, i,
PartHandle partHandle = buildAndPutPart(file, uploadHandle, i, i == payloadCount,
origDigest);
partHandles.put(i, partHandle);
}
Expand All @@ -515,7 +518,7 @@ public void testMultipartUploadEmptyPart() throws Exception {
origDigest.update(payload);
InputStream is = new ByteArrayInputStream(payload);
PartHandle partHandle = awaitFuture(
uploader.putPart(uploadHandle, 1, file, is, payload.length));
uploader.putPart(uploadHandle, 1, true, file, is, payload.length));
partHandles.put(1, partHandle);
completeUpload(file, uploadHandle, partHandles, origDigest, 0);
}
Expand All @@ -530,7 +533,7 @@ public void testUploadEmptyBlock() throws Exception {
Path file = methodPath();
UploadHandle uploadHandle = startUpload(file);
Map<Integer, PartHandle> partHandles = new HashMap<>();
partHandles.put(1, putPart(file, uploadHandle, 1, new byte[0]));
partHandles.put(1, putPart(file, uploadHandle, 1, true, new byte[0]));
completeUpload(file, uploadHandle, partHandles, null, 0);
}

Expand All @@ -550,7 +553,8 @@ public void testMultipartUploadReverseOrder() throws Exception {
origDigest.update(payload);
}
for (int i = payloadCount; i > 0; --i) {
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, i == payloadCount,
null));
}
completeUpload(file, uploadHandle, partHandles, origDigest,
payloadCount * partSizeInBytes());
Expand All @@ -574,7 +578,8 @@ public void testMultipartUploadReverseOrderNonContiguousPartNumbers()
}
Map<Integer, PartHandle> partHandles = new HashMap<>();
for (int i = payloadCount; i > 0; i -= 2) {
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, i == payloadCount,
null));
}
completeUpload(file, uploadHandle, partHandles, origDigest,
getTestPayloadCount() * partSizeInBytes());
Expand All @@ -591,7 +596,7 @@ public void testMultipartUploadAbort() throws Exception {
UploadHandle uploadHandle = startUpload(file);
Map<Integer, PartHandle> partHandles = new HashMap<>();
for (int i = 12; i > 10; i--) {
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, i == 12, null));
}
abortUpload(uploadHandle, file);

Expand All @@ -601,7 +606,7 @@ public void testMultipartUploadAbort() throws Exception {

intercept(IOException.class,
() -> awaitFuture(
uploader0.putPart(uploadHandle, 49, file, is, len)));
uploader0.putPart(uploadHandle, 49, true, file, is, len)));
intercept(IOException.class,
() -> complete(uploader0, uploadHandle, file, partHandles));

Expand Down Expand Up @@ -701,7 +706,8 @@ public void testPutPartEmptyUploadID() throws Exception {
byte[] payload = generatePayload(1);
InputStream is = new ByteArrayInputStream(payload);
intercept(IllegalArgumentException.class,
() -> uploader0.putPart(emptyHandle, 1, dest, is, payload.length));
() -> uploader0.putPart(emptyHandle, 1, true, dest, is,
payload.length));
}

/**
Expand All @@ -715,7 +721,7 @@ public void testCompleteEmptyUploadID() throws Exception {
UploadHandle emptyHandle =
BBUploadHandle.from(ByteBuffer.wrap(new byte[0]));
Map<Integer, PartHandle> partHandles = new HashMap<>();
PartHandle partHandle = putPart(dest, realHandle, 1,
PartHandle partHandle = putPart(dest, realHandle, 1, true,
generatePayload(1, SMALL_FILE));
partHandles.put(1, partHandle);

Expand Down Expand Up @@ -743,7 +749,7 @@ public void testDirectoryInTheWay() throws Exception {
UploadHandle uploadHandle = startUpload(file);
Map<Integer, PartHandle> partHandles = new HashMap<>();
int size = SMALL_FILE;
PartHandle partHandle = putPart(file, uploadHandle, 1,
PartHandle partHandle = putPart(file, uploadHandle, 1, true,
generatePayload(1, size));
partHandles.put(1, partHandle);

Expand Down Expand Up @@ -802,10 +808,10 @@ public void testConcurrentUploads() throws Throwable {
assertNotEquals("Upload handles match", upload1, upload2);

// put part 1
partHandles1.put(partId1, putPart(file, upload1, partId1, payload1));
partHandles1.put(partId1, putPart(file, upload1, partId1, false, payload1));

// put part2
partHandles2.put(partId2, putPart(file, upload2, partId2, payload2));
partHandles2.put(partId2, putPart(file, upload2, partId2, true, payload2));

// complete part u1. expect its size and digest to
// be as expected.
Expand Down
12 changes: 12 additions & 0 deletions hadoop-project/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@
<surefire.fork.timeout>900</surefire.fork.timeout>
<aws-java-sdk.version>1.12.720</aws-java-sdk.version>
<aws-java-sdk-v2.version>2.25.53</aws-java-sdk-v2.version>
<amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
<aws.eventstream.version>1.0.1</aws.eventstream.version>
<hsqldb.version>2.7.1</hsqldb.version>
<frontend-maven-plugin.version>1.11.2</frontend-maven-plugin.version>
Expand Down Expand Up @@ -1169,6 +1170,17 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.encryption.s3</groupId>
<artifactId>amazon-s3-encryption-client-java</artifactId>
<version>${amazon-s3-encryption-client-java.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
Expand Down
15 changes: 15 additions & 0 deletions hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,16 @@
<bannedImport>org.apache.hadoop.mapred.**</bannedImport>
</bannedImports>
</restrictImports>
<restrictImports>
<includeTestCode>false</includeTestCode>
<reason>Restrict encryption client imports to encryption client factory</reason>
<exclusions>
<exclusion>org.apache.hadoop.fs.s3a.impl.EncryptionS3ClientFactory</exclusion>
</exclusions>
<bannedImports>
<bannedImport>software.amazon.encryption.s3.**</bannedImport>
</bannedImports>
</restrictImports>
</rules>
</configuration>
</execution>
Expand Down Expand Up @@ -510,6 +520,11 @@
<artifactId>bundle</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>software.amazon.encryption.s3</groupId>
<artifactId>amazon-s3-encryption-client-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,40 @@ private Constants() {
public static final String S3_ENCRYPTION_CONTEXT =
"fs.s3a.encryption.context";

/**
* Client side encryption (CSE-CUSTOM): class name of the custom keyring
* (cryptographic materials provider) to use.
* value:{@value}
*/
public static final String S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME =
"fs.s3a.encryption.cse.custom.keyring.class.name";

/**
* Config to provide backward compatibility with V1 encryption client.
* Enabling this configuration will do the following:
* 1. Unencrypted S3 objects will be read using the unencrypted/base S3 client when CSE is enabled.
* 2. Size of encrypted object will be calculated using ranged S3 calls.
* 3. While listing s3 objects, encryption metadata file with suffix
* {@link #S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX} will be skipped.
* This is to provide backward compatibility with V1 client.
* value:{@value}
*/
public static final String S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED =
"fs.s3a.encryption.cse.v1.compatibility.enabled";

/**
* Default value : {@value}.
*/
public static final boolean S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED_DEFAULT = false;

/**
* Suffix of instruction file : {@value}.
*/
public static final String S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX = ".instruction";




/**
* List of custom Signers. The signer class will be loaded, and the signer
* name will be associated with this signer class in the S3 SDK.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.s3accessgrants.plugin.S3AccessGrantsPlugin;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3BaseClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;
Expand Down Expand Up @@ -160,11 +161,17 @@ public S3AsyncClient createS3AsyncClient(
.thresholdInBytes(parameters.getMultiPartThreshold())
.build();

return configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket)
.httpClientBuilder(httpClientBuilder)
.multipartConfiguration(multipartConfiguration)
.multipartEnabled(parameters.isMultipartCopy())
.build();
S3AsyncClientBuilder s3AsyncClientBuilder =
configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket)
.httpClientBuilder(httpClientBuilder);

// TODO: Enable multipart upload with CSE once it is supported by the encryption client.
if (!parameters.isClientSideEncryptionEnabled()) {
s3AsyncClientBuilder.multipartConfiguration(multipartConfiguration)
.multipartEnabled(parameters.isMultipartCopy());
}

return s3AsyncClientBuilder.build();
}

@Override
Expand Down Expand Up @@ -373,7 +380,7 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> void
* @param conf config to build the URI from.
* @return an endpoint uri
*/
private static URI getS3Endpoint(String endpoint, final Configuration conf) {
public static URI getS3Endpoint(String endpoint, final Configuration conf) {

boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS);

Expand Down
Loading

0 comments on commit 37eead5

Please sign in to comment.