Pipeline buffering and upload in S3 multi-upload #10729
plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3/TrinoS3FileSystem.java
plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3/TrinoS3FileSystem.java
if (aborted) {
    return;
}
aborted = true;
Should `failed` be true now too?
I will drop the `abort` change for now to keep this PR focused.
@@ -1478,6 +1478,7 @@ public synchronized void progressChanged(ProgressEvent progressEvent)
    private int bufferSize;

    private boolean failed;
    private boolean multipartUploadStarted;
There are effectively two threads here:
- the calling thread (a single thread, as it's not meant to be thread-safe)
- the background uploading thread (only one at a time, because we never submit another task to the executor until the previous one has finished)
Let's document, for each field, which thread(s) use it.
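A minimal sketch of what such per-field documentation could look like. This is not the actual `TrinoS3FileSystem` code: the class name `ThreadOwnershipSketch`, the fields `inProgressUpload` and `partsUploaded`, the accessor methods, and the `"hypothetical-upload-id"` value are assumptions made up for illustration. Only `multipartUploadStarted`, `uploadId`, `flushBuffer`, `waitForPreviousUploadFinish`, and `uploadExecutor` are names taken from this conversation.

```java
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the output stream discussed in this review.
class ThreadOwnershipSketch
        implements AutoCloseable
{
    private final ExecutorService uploadExecutor = Executors.newSingleThreadExecutor();

    // Calling thread only: written and read in flushBuffer().
    private boolean multipartUploadStarted;
    // Calling thread only: handle to the single in-flight background task.
    private Future<?> inProgressUpload;

    // Written by the upload thread. The calling thread may read these only
    // after waitForPreviousUploadFinish(), because Future.get() provides the
    // happens-before edge that makes the writes visible.
    private Optional<String> uploadId = Optional.empty();
    private int partsUploaded;

    void flushBuffer()
    {
        // The decision uses state owned by the calling thread, so it does not
        // depend on when the previous background task finished.
        boolean firstPart = !multipartUploadStarted;
        multipartUploadStarted = true;

        waitForPreviousUploadFinish();
        inProgressUpload = uploadExecutor.submit(() -> {
            if (firstPart) {
                uploadId = Optional.of("hypothetical-upload-id");
            }
            partsUploaded++;
        });
    }

    Optional<String> finishedUploadId()
    {
        waitForPreviousUploadFinish();
        return uploadId;
    }

    int finishedParts()
    {
        waitForPreviousUploadFinish();
        return partsUploaded;
    }

    private void waitForPreviousUploadFinish()
    {
        if (inProgressUpload == null) {
            return;
        }
        try {
            inProgressUpload.get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
        finally {
            inProgressUpload = null;
        }
    }

    @Override
    public void close()
    {
        try {
            waitForPreviousUploadFinish();
        }
        finally {
            uploadExecutor.shutdown();
        }
    }
}
```

The comments on each field are the point of the sketch: they state which thread owns the field and under what condition the other thread may touch it.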
Done.
I also dropped the other commits, replacing them with a commit message.
// We use the multipartUploadStarted flag here instead of uploadId.ifPresent() because visibility of the latter is not
// ensured here. It is mutated in another thread spawned by uploadExecutor.submit below
This comment is understandable from a reviewer's and a historical perspective.
We wouldn't write such a comment if we had had the code right from the start, which suggests the comment is more suitable for the commit message than for the code.
With the code restructuring, we needed to add a separate `multipartUploadStarted` flag, which is both written and read from the main thread (not the upload thread). With `waitForPreviousUploadFinish` moved below, we lost the memory barrier between the main and upload threads, and we could no longer use `uploadId` to determine whether the multipart upload had already started.
Co-authored-by: Łukasz Osipiuk <[email protected]>
Rework of: #10180
Original PR description:
It turned out that this PR results in corrupted writes to S3 (#10710).
The problem came from the fact that, with the `waitForPreviousUploadFinish` call moved from the very beginning of the `flushBuffer` method further down, the check that used `uploadId` to determine whether the multipart upload had already started was no longer valid.
One call to `flushBuffer` could initiate the multipart upload, but the next call to `flushBuffer` could still observe `uploadId` as not set (there was no memory barrier between the setting and reading threads).
Thanks @findepi for the chat on this.
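The visibility problem can be illustrated with a small, self-contained sketch. It is not the code from this PR: the class `VisibilitySketch`, the method `initiateAndRead`, and the `"hypothetical-upload-id"` value are assumptions for illustration only. It shows a plain field written by a background task, and relies on the documented guarantee that actions in a task happen-before whatever follows a successful `Future.get()` in another thread.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration of the missing memory barrier described above.
class VisibilitySketch
{
    // Plain (non-volatile) field, written only by the background thread.
    private String uploadId;

    String initiateAndRead()
    {
        ExecutorService uploadExecutor = Executors.newSingleThreadExecutor();
        try {
            Future<?> upload = uploadExecutor.submit(() -> {
                uploadId = "hypothetical-upload-id";
            });

            // Reading uploadId at this point would be a data race: the task
            // may not have run yet, and even if it has, nothing guarantees
            // that its write is visible to this thread.

            // Future.get() establishes the happens-before edge, so the write
            // made by the task is guaranteed to be visible afterwards.
            upload.get();
            return uploadId;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
        finally {
            uploadExecutor.shutdown();
        }
    }
}
```

In the situation described above, the wait was moved later in `flushBuffer`, so a decision made before it could not rely on a field written by the upload thread; a flag owned by the calling thread avoids the need for any barrier at that point.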
Co-authored-by: Łukasz Osipiuk [email protected]
Fixes #10710