Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace AsyncWrite with Upload trait and rename MultiPartStore to MultipartStore (#5458) #5500

Merged
merged 16 commits into from
Mar 19, 2024

Conversation

tustvold
Copy link
Contributor

@tustvold tustvold commented Mar 13, 2024

Which issue does this PR close?

Closes #5458
Closes #5443
Closes #5526

Rationale for this change

What changes are included in this PR?

This changes put_multipart to instead return a Box<dyn MultipartUpload>` that provides an API much closer to that of the stores themselves. This gives users much more control over buffering and concurrency, whilst also opening up new possibilities for integrating object_store into systems such as parquet.

Are there any user-facing changes?

@github-actions github-actions bot added the object-store Object Store Interface label Mar 13, 2024
@tustvold tustvold added the api-change Changes to the arrow API label Mar 13, 2024
Comment on lines -554 to -566
/// <div class="warning">
/// It is recommended applications wait for any in-flight requests to complete by calling `flush`, if
/// there may be a significant gap in time (> ~30s) before the next write.
/// These gaps can include times where the function returns control to the
/// caller while keeping the writer open. If `flush` is not called, futures
/// for in-flight requests may be left unpolled long enough for the requests
/// to time out, causing the write to fail.
/// </div>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This warning is no longer necessary 🎉

async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.abort_multipart(location, multipart_id).await
async fn upload(&self, location: &Path) -> Result<Box<dyn Upload>> {
Copy link
Contributor Author

@tustvold tustvold Mar 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can actually properly limit/throttle uploads now, as the public APIs now mirror the underlying requests 🎉

This will also enable things like reliably running IO on a separate tokio runtime

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you prefer shorter names, but here I recommend calling this multipart_upload and MultipartUpload to align with the term used by the cloud providers. Upload is pretty generic

https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-upload-object.html
https://cloud.google.com/storage/docs/xml-api/post-object-complete

Or maybe keep the existing put_multipart name (though I realize reusing the same name may be more confusing on upgrade)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I was trying to avoid being too specific, as such a concept doesn't exist in LocalFileSystem, and Azure calls it something different

///
/// [`Sink`]: futures::sink::Sink
#[derive(Debug)]
pub struct ChunkedUpload {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not only significantly simpler than the WriteUpload it replaces, but avoids a number of the issues.

I think it is also a nice example of the flexibility of the Upload API, if a downstream wants to handle chunking/concurrency differently they're entirely able to do so.

///
/// Back pressure can optionally be applied to producers by calling
/// [`Self::wait_for_capacity`] prior to calling this method
pub fn write(&mut self, mut buf: &[u8]) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is worth highlighting that this is a synchronous method, and so users could wrap this in a type and feed it directly into a synchronous writer such as parquet's ArrowWriter. If back pressure is required they could potentially call wait_for_capacity between row groups.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this API basically requires copying the data. I wonder if there is some way to allow users to pass in an owned buffer already, like

    pub fn write(&mut self, buf: impl Into<Buffer>) {
    ...
    }

And then internally slicing up the Buffer to ensure the correct sizes

I realize that put currently requires a single contiguous buffer (per part), so maybe this copy isn't that big a problem. However it seems a pity that we require the copy 🤔

    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
        self.put_opts(location, bytes, PutOptions::default()).await
   }

Copy link
Contributor Author

@tustvold tustvold Mar 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will file a ticket for supporting non-contiguous write payloads

Edit: #5514

object_store/src/upload.rs Show resolved Hide resolved
/// ```
///
/// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
fn put_part(&mut self, data: Bytes) -> UploadPart;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The design of this is somewhat subtle.

A part index would run into issues for LocalFilesystem as we would need to know the offset to write the chunk to. If we permit out of order writes, e.g. writing the final partial chunk first, this chunk size would need to be provided at creation time of Upload. Aside from being an unfortunate API, this which would create non-trivial behaviour differences between stores that respect this config and those that ignore it.

Instead by taking a mutable borrow and providing the data to be written, we prohibit out of order writes. The final piece is we return a BoxFuture<'static, Result<()>> instead of this being an async fn, i.e. returning BoxFuture<'_, Result<()>>. This allows multiple UploadPart to be created and polled in parallel, without the borrow checker complaining about concurrent mutable borrows.

This is strictly a more flexible interface than the current AsyncWrite API, and whilst it still doesn't permit out of order writes, use-cases that require this can use MultiPartStore.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API requires having the entire upload in a single contiguous Bytes buffer right? it doesn't seem to allow for incremental streaming writes, the way the current AsyncWrite does.

Maybe we could support both a put_part like this (all data at once) as well as a put_part_stream (that takes data as a steam of Bytes) 🤔

Even if the intention is that most people will use the ChunkedUpload API I think it is still worth considering non-contiguous writes

Copy link
Contributor Author

@tustvold tustvold Mar 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The underlying implementations in fact all require a contiguous buffer, either for the HTTP request or to spawn the blocking IO. I did spend a decent amount of time seeing if we could avoid this, but it requires some pretty intrusive refactoring to basically extract our own Bytes abstraction. Ultimately this is still strictly better than the current API

I'll file a ticket for supporting non-contiguous payloads in ObjectStore

Edit: filed #5514

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead by taking a mutable borrow and providing the data to be written, we prohibit out of order writes. The final piece is we return a BoxFuture<'static, Result<()>> instead of this being an async fn, i.e. returning BoxFuture<'_, Result<()>>. This allows multiple UploadPart to be created and polled in parallel, without the borrow checker complaining about concurrent mutable borrows.

I think this should be part of the docstring for future reference.

Comment on lines 30 to 31
///
/// Returns a stream
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
///
/// Returns a stream

fn put_part(&mut self, data: Bytes) -> UploadPart;

/// Complete the multipart upload
async fn complete(&mut self) -> Result<PutResult>;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlike MultiPartStore we expect Upload to handle the PartId internally, I think this makes for a more intuitive interface and avoids issues relating to the behaviour if the PartId are provided out of order.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any usecase for the PartId needed by the client (e.g. is it something that S3 exposes that someone might want access to?)

Copy link
Contributor Author

@tustvold tustvold Mar 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There isn't an obvious way this information could be used by a user of the Upload trait, but if a user needed this information they could use the lower level MultiPartStore trait

async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()>;
/// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads
/// typically require multiple separate requests. See [`Upload`] for more information
async fn upload(&self, location: &Path) -> Result<Box<dyn Upload>>;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlike before we don't expose a notion of MultipartId. In hindsight this makes for a confusing API. We instead encourage users to configure automatic cleanup of incomplete uploads, which is the more reliable mechanism anyway

@alamb alamb changed the title Replace AsyncWrite with Upload trait (#5458) [ObjectStore] Replace AsyncWrite with Upload trait (#5458) Mar 14, 2024
Comment on lines +33 to +36
/// Most stores require that all parts excluding the last are at least 5 MiB, and some
/// further require that all parts excluding the last be the same size, e.g. [R2].
/// Clients wanting to maximise compatibility should therefore perform writes in
/// fixed size blocks larger than 5 MiB.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we provide an API to read these attributes from a DynObjectStore? Otherwise I don't see how anyone would ever be able to use DynObjectStore and put_part in combination.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could do, or writers could just 5 MiB chunks

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this API looks really nice @tustvold -- thank you 🙏

cc @wjones127 @devinjdangelo @wiedld

/// # async fn test() {
/// #
/// let mut upload: Box<&dyn Upload> = todo!();
/// let mut p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't quite follow the discussion about mut borrows below, but this example seems to demonstrate it is possible to uploads in parallel, which is a key usecase

/// ```
///
/// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
fn put_part(&mut self, data: Bytes) -> UploadPart;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API requires having the entire upload in a single contiguous Bytes buffer right? it doesn't seem to allow for incremental streaming writes, the way the current AsyncWrite does.

Maybe we could support both a put_part like this (all data at once) as well as a put_part_stream (that takes data as a steam of Bytes) 🤔

Even if the intention is that most people will use the ChunkedUpload API I think it is still worth considering non-contiguous writes

fn put_part(&mut self, data: Bytes) -> UploadPart;

/// Complete the multipart upload
async fn complete(&mut self) -> Result<PutResult>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any usecase for the PartId needed by the client (e.g. is it something that S3 exposes that someone might want access to?)

///
/// Back pressure can optionally be applied to producers by calling
/// [`Self::wait_for_capacity`] prior to calling this method
pub fn write(&mut self, mut buf: &[u8]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this API basically requires copying the data. I wonder if there is some way to allow users to pass in an owned buffer already, like

    pub fn write(&mut self, buf: impl Into<Buffer>) {
    ...
    }

And then internally slicing up the Buffer to ensure the correct sizes

I realize that put currently requires a single contiguous buffer (per part), so maybe this copy isn't that big a problem. However it seems a pity that we require the copy 🤔

    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
        self.put_opts(location, bytes, PutOptions::default()).await
   }

pub type UploadPart = BoxFuture<'static, Result<()>>;

#[async_trait]
pub trait Upload: Send + std::fmt::Debug {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For other reviewers, This is the key new API

What is the correct description of this trait? Something like

Suggested change
pub trait Upload: Send + std::fmt::Debug {
/// Represents an inprogress multi-part upload
///
/// (Is this right? Can we actually abort a multi-part upload? )
/// Cancel behavior: On drop, the multi-part upload is aborted
pub trait Upload: Send + std::fmt::Debug {

async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.abort_multipart(location, multipart_id).await
async fn upload(&self, location: &Path) -> Result<Box<dyn Upload>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you prefer shorter names, but here I recommend calling this multipart_upload and MultipartUpload to align with the term used by the cloud providers. Upload is pretty generic

https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-upload-object.html
https://cloud.google.com/storage/docs/xml-api/post-object-complete

Or maybe keep the existing put_multipart name (though I realize reusing the same name may be more confusing on upgrade)

}

/// Flush final chunk, and await completion of all in-flight requests
pub async fn finish(mut self) -> Result<PutResult> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this PutResult is what @ashtuchkin is asking for in #5443

object_store/src/upload.rs Show resolved Hide resolved
@crepererum
Copy link
Contributor

BTW: I really like the overall implementation!

@tustvold tustvold force-pushed the revisit-multipart-upload branch from 97bc03b to 1c8a965 Compare March 18, 2024 02:45
@@ -277,7 +43,7 @@ impl<T: PutPart> std::fmt::Debug for WriteMultiPart<T> {
/// [`ObjectStore::put_multipart`]: crate::ObjectStore::put_multipart
/// [`LocalFileSystem`]: crate::local::LocalFileSystem
#[async_trait]
pub trait MultiPartStore: Send + Sync + 'static {
pub trait MultipartStore: Send + Sync + 'static {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a drive-by fix that has annoyed me for a while, this crate routinely uses multipart as a single word, so this capitalization was inconsistent

@tustvold tustvold changed the title [ObjectStore] Replace AsyncWrite with Upload trait (#5458) Replace AsyncWrite with Upload trait and rename MultiPartStore to MultipartStore (#5458) Mar 18, 2024
@tustvold tustvold marked this pull request as ready for review March 18, 2024 04:09
@tustvold
Copy link
Contributor Author

I think this is now ready to go, I may refine it in subsequent PRs prior to release, but I am happy with where the core API is

///
/// It is implementation defined behaviour to call [`MultipartUpload::abort`]
/// on an already completed or aborted [`MultipartUpload`]
async fn abort(&mut self) -> Result<()>;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am very tempted to just not include this, in favour of just encouraging people to configure appropriate lifecycle rules, but am curious what other people think

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This came up downstream as well in DataFusion apache/datafusion#9648 (comment)

My opinion is that we should include it in the API and appropriately caveat / explain why it is not always possible. I offered a suggestion above

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @tustvold TLDR is I think this PR looks really nice -- thank you for pushing it along. Overall this change looks good to me. I left several suggestions that I think are important to improve the comments, but if necessary we can do them as follow on PRs

///
/// It is implementation defined behaviour to call [`MultipartUpload::abort`]
/// on an already completed or aborted [`MultipartUpload`]
async fn abort(&mut self) -> Result<()>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This came up downstream as well in DataFusion apache/datafusion#9648 (comment)

My opinion is that we should include it in the API and appropriately caveat / explain why it is not always possible. I offered a suggestion above

Comment on lines 76 to 79
/// If a [`MultipartUpload`] is dropped without calling [`MultipartUpload::complete`],
/// some implementations will automatically reap any uploaded parts. However,
/// this is not always possible, e.g. for S3 and GCS. [`MultipartUpload::abort`] can
/// therefore be invoked to perform this cleanup.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the term reap, while more literary, has a greater potential to be misunderstood. I also think it would help to explain why it is not always possible.

Here is a suggestion

Suggested change
/// If a [`MultipartUpload`] is dropped without calling [`MultipartUpload::complete`],
/// some implementations will automatically reap any uploaded parts. However,
/// this is not always possible, e.g. for S3 and GCS. [`MultipartUpload::abort`] can
/// therefore be invoked to perform this cleanup.
/// If a [`MultipartUpload`] is dropped without calling [`MultipartUpload::complete`],
/// some object stores will automatically clean up any previously uploaded parts.
/// However, some stores such as for S3 and GCS do not perform automatic cleanup and
/// in such cases, [`MultipartUpload::abort`] manually invokes this cleanup.

object_store/src/upload.rs Outdated Show resolved Hide resolved
object_store/src/upload.rs Outdated Show resolved Hide resolved
object_store/src/upload.rs Outdated Show resolved Hide resolved
object_store/src/upload.rs Show resolved Hide resolved
object_store/src/upload.rs Outdated Show resolved Hide resolved
@@ -269,12 +269,11 @@
//!
//! # Multipart Upload
//!
//! Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data,
//! with implementations automatically handling parallel, chunked upload where appropriate.
//! Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add an example using BufWriter here (to drive people to use that unless they really need to use the multi part API themselves)? It has the nice property that it does put/put-mulitpart dynamically. '

invalid_state("when writer is already complete.")
}
let s = Arc::clone(&self.state);
maybe_spawn_blocking(move || {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

Ok((id, writer))
let upload = store.put_multipart(&path).await?;
let mut chunked = WriteMultipart::new(upload);
chunked.write(&buffer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this write results in a second buffer copy, right? That double buffering is unfortunate

@tustvold
Copy link
Contributor Author

I intend to follow this up with a PR with some further tweaks to the BufWriter / WriteMultipart structures, but I'd like to get the changes to ObjectStore in, and then iterate on top of this

@tustvold tustvold merged commit 96c4c0b into apache:master Mar 19, 2024
13 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api-change Changes to the arrow API object-store Object Store Interface
Projects
None yet
3 participants