Skip to content

Commit

Permalink
Pipeline buffering and upload in S3 multi-upload.
Browse files Browse the repository at this point in the history
With code restructuring we needed to add separate multipartUploadStarted
flag which is both written and read from main thread (not an upload)
thread. With `waitForPreviousUploadFinish` moved below we lost
memory-barrier between main and upload thread and we could no longer use
`uploadId` to determine if multipart upload was already started.

Co-authored-by: Łukasz Osipiuk <[email protected]>
  • Loading branch information
2 people authored and losipiuk committed Jan 24, 2022
1 parent be22531 commit 1b110c5
Showing 1 changed file with 15 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1478,6 +1478,10 @@ private static class TrinoS3StreamingOutputStream
private int bufferSize;

private boolean failed;
// Mutated and read by main thread; mutated just before scheduling upload to background thread (access does not need to be thread safe)
private boolean multipartUploadStarted;
// Mutated by background thread which does the multipart upload; read by both main thread and background thread;
// Visibility ensured by memory barrier via inProgressUploadFuture
private Optional<String> uploadId = Optional.empty();
private Future<UploadPartResult> inProgressUploadFuture;
private final List<UploadPartResult> parts = new ArrayList<>();
Expand Down Expand Up @@ -1571,17 +1575,8 @@ public void close()
private void flushBuffer(boolean finished)
throws IOException
{
try {
waitForPreviousUploadFinish();
}
catch (IOException e) {
failed = true;
abortUploadSuppressed(e);
throw e;
}

// skip multipart upload if there would only be one part
if (finished && uploadId.isEmpty()) {
// Skip multipart upload if there would only be one part
if (finished && !multipartUploadStarted) {
InputStream in = new ByteArrayInputStream(buffer, 0, bufferSize);

ObjectMetadata metadata = new ObjectMetadata();
Expand Down Expand Up @@ -1614,6 +1609,15 @@ private void flushBuffer(boolean finished)
bufferSize = 0;
}

try {
waitForPreviousUploadFinish();
}
catch (IOException e) {
failed = true;
abortUploadSuppressed(e);
throw e;
}
multipartUploadStarted = true;
inProgressUploadFuture = uploadExecutor.submit(() -> uploadPage(data, length));
}
}
Expand Down

0 comments on commit 1b110c5

Please sign in to comment.