-
Notifications
You must be signed in to change notification settings - Fork 841
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support non-contiguous put payloads / vectored writes (#5514) #5538
Support non-contiguous put payloads / vectored writes (#5514) #5538
Conversation
@@ -438,16 +443,8 @@ impl S3Client { | |||
|
|||
let mut builder = self.client.request(Method::POST, url); | |||
|
|||
// Compute checksum - S3 *requires* this for DeleteObjects requests, so we default to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cleanup given SHA256 is the only checksum we support and are ever likely to
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(and to be clear for anyone else reading this PR -- this code is not exposed publicly so it can be changed in the future if needed as well)
object_store/src/client/retry.rs
Outdated
async move { | ||
let mut retries = 0; | ||
let now = Instant::now(); | ||
|
||
loop { | ||
let s = req.try_clone().expect("request body must be cloneable"); | ||
match client.execute(s).await { | ||
let mut s = self.try_clone().expect("request body must be cloneable"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We defer setting the body until this point when we have a streaming body, as otherwise this would fail
3640a8c
to
4e20f2d
Compare
4e20f2d
to
70de4ee
Compare
@@ -468,8 +468,8 @@ impl S3Client { | |||
let response = builder | |||
.header(CONTENT_TYPE, "application/xml") | |||
.body(body) | |||
.with_aws_sigv4(credential.authorizer(), payload_sha256.as_deref()) | |||
.send_retry_with_idempotency(&self.config.retry_config, false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
false
is the default for a POST
request
@@ -186,11 +186,7 @@ impl DynamoCommit { | |||
to: &Path, | |||
) -> Result<()> { | |||
self.conditional_op(client, to, None, || async { | |||
client | |||
.copy_request(from, to) | |||
.set_idempotent(false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
false
is the default
@@ -237,21 +242,23 @@ impl AzureClient { | |||
builder = builder.header(CONTENT_TYPE, value); | |||
} | |||
|
|||
builder = builder |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is moved into send
above
object_store/src/client/retry.rs
Outdated
.expect("request body must be cloneable"); | ||
|
||
if let Some(x) = &self.payload { | ||
*s.body_mut() = Some(x.body()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Setting this here sidesteps issues around clone-ability of stream based request bodies
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a function call like s.set_body(Some(x.body())
instead might be easier to understand the intent
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an upstream type unfortunately
@@ -327,20 +335,24 @@ impl GoogleCloudStorageClient { | |||
let builder = self | |||
.client | |||
.request(Method::PUT, url) | |||
.header(header::CONTENT_TYPE, content_type) | |||
.header(header::CONTENT_LENGTH, payload.len()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved into send
@@ -231,7 +231,7 @@ impl std::fmt::Debug for BufWriter { | |||
|
|||
enum BufWriterState { | |||
/// Buffer up to capacity bytes | |||
Buffer(Path, Vec<u8>), | |||
Buffer(Path, PutPayloadMut), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change means that we no longer bump allocate 🎉
@@ -121,7 +122,8 @@ impl WriteMultipart { | |||
pub fn new_with_chunk_size(upload: Box<dyn MultipartUpload>, chunk_size: usize) -> Self { | |||
Self { | |||
upload, | |||
buffer: Vec::with_capacity(chunk_size), | |||
chunk_size, | |||
buffer: PutPayloadMut::new(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change means we avoid bump allocating but also don't allocate an entire 10MB buffer up-front only to potentially use some fraction of it
|
||
impl FromIterator<Bytes> for PutPayload { | ||
fn from_iter<T: IntoIterator<Item = Bytes>>(iter: T) -> Self { | ||
Self(iter.into_iter().collect()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use Arc::from_iter()
directly to reduce one alloaction (after TrustedLen
stablized in the future).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this does use Arc::from_iter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Arc::from_iter
has special logic for TrustedLen
that can avoid the collect()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you look at the definition of collect
it just invokes from_iter
for the collection being collected into, in this case Arc
fn collect<B: FromIterator<Self::Item>>(self) -> B
where
Self: Sized,
{
FromIterator::from_iter(self)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, yes!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank yoU @tustvold -- I think the structure of this code / API is quite nice and elegant 👌 👏
I do think it is a non trivial API change (not complicated, but will require a bunch of mechanical changes) and so marked the PR as API change. To that end I suggest we add some better documentation to help users map types they are familiar with to the new PutPayload.
Here is a suggestion: tustvold#81
Other than that I had a few suggestions / nits, the only one I feel somewhat strongly about is adding a new put
rather than updating the existing write
API.
@@ -266,7 +270,8 @@ pub(crate) struct Request<'a> { | |||
path: &'a Path, | |||
config: &'a S3Config, | |||
builder: RequestBuilder, | |||
payload_sha256: Option<Vec<u8>>, | |||
payload_sha256: Option<digest::Digest>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are unrelated cleanups right? Or is there a reason digest::Digest
is preferred over Vec<u8>
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It avoids an allocation, but yes an unrelated cleanup
@@ -438,16 +443,8 @@ impl S3Client { | |||
|
|||
let mut builder = self.client.request(Method::POST, url); | |||
|
|||
// Compute checksum - S3 *requires* this for DeleteObjects requests, so we default to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(and to be clear for anyone else reading this PR -- this code is not exposed publicly so it can be changed in the future if needed as well)
/// should be able to observe a partially written object | ||
async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> { | ||
self.put_opts(location, bytes, PutOptions::default()).await | ||
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For anyone else reviewing this PR. I think this is an (and the key) API change -- to take a slice of [Bytes]
bytes rather than a Bytes
directly
} | ||
|
||
/// Append a [`Bytes`] to this [`PutPayloadMut`] | ||
pub fn push(&mut self, bytes: Bytes) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please document that this is a zero copy API -- specifically that the progress is closed off and the underlying data is not copied.
if !self.in_progress.is_empty() { | ||
let completed = std::mem::take(&mut self.in_progress).into(); | ||
self.completed.push(completed); | ||
} | ||
PutPayload(self.completed.into()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think you can save typing and allocating another Vec with something like
if !self.in_progress.is_empty() { | |
let completed = std::mem::take(&mut self.in_progress).into(); | |
self.completed.push(completed); | |
} | |
PutPayload(self.completed.into()) | |
let Self { in_progress, completed, ..} = self; | |
if !in_progress.is_empty() { | |
completed.push(in_progress); | |
} | |
PutPayload(completed.into()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Vec::default doesn't allocate
assert_eq!(chunks[3].len(), 23); | ||
assert_eq!(chunks[4].len(), 20); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be good to add a test for push
as well (esp to document the non copying behavior)
/// Put a chunk of data into this [`WriteMultipart`] | ||
/// | ||
/// See [`Self::write`] for information on backpressure | ||
pub fn put(&mut self, mut bytes: Bytes) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we are changing the API anyways, is there a reason to keep both write
and put
? As in why not change write
to take a Bytes
as one can could then call write
via write(my_payload.into())
🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are circumstances where having the writer perform the buffering might be advantageous, e.g. when integrating with the Write
trait
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this would be great to include in the docs as I found it non obvious.
Update the docs were updated (thanks 🙏 )
/// Put a chunk of data into this [`WriteMultipart`] without copying
///
/// Data is buffered using [`PutPayloadMut::push`]. Implementations looking to
/// perform writes from non-owned buffers should prefer [`Self::write`] as this
/// will allow multiple calls to share the same underlying allocation.
object_store/src/client/retry.rs
Outdated
.expect("request body must be cloneable"); | ||
|
||
if let Some(x) = &self.payload { | ||
*s.body_mut() = Some(x.body()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a function call like s.set_body(Some(x.body())
instead might be easier to understand the intent
} | ||
|
||
impl RetryableRequest { | ||
/// Set whether this request is idempotent |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we document what idempotent
means in this context? Like it means the entire request can be retried?
object_store/src/client/retry.rs
Outdated
loop { | ||
let s = req.try_clone().expect("request body must be cloneable"); | ||
match client.execute(s).await { | ||
let mut s = self |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know you like brevity but one character variable names that are different than the first letter of the things they are holding are pretty hard for me to grok. (I realize you can't use r
b/c there is another variable below named r
).
How about we rename s
to request
and x
to payload
?
Which issue does this PR close?
Closes #5514
Rationale for this change
Allowing non-contiguous payloads avoids needing to bump allocate buffers when writing data where the size isn't known up front. This can be significantly more efficient.
It will also open up interesting possibilities for the parquet writer, where the data is already buffered in chunks
What changes are included in this PR?
Are there any user-facing changes?
FYI @roeap @wjones127 @alamb