diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index b11f4513b6df..b33771de9a86 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -17,17 +17,14 @@ //! An object store implementation for S3 //! -//! ## Multi-part uploads +//! ## Multipart uploads //! -//! Multi-part uploads can be initiated with the [ObjectStore::put_multipart] method. -//! Data passed to the writer is automatically buffered to meet the minimum size -//! requirements for a part. Multiple parts are uploaded concurrently. +//! Multipart uploads can be initiated with the [ObjectStore::put_multipart] method. //! //! If the writer fails for any reason, you may have parts uploaded to AWS but not -//! used that you may be charged for. Use the [ObjectStore::abort_multipart] method -//! to abort the upload and drop those unneeded parts. In addition, you may wish to -//! consider implementing [automatic cleanup] of unused parts that are older than one -//! week. +//! used that you will be charged for. [`MultipartUpload::abort`] may be invoked to drop +//! these unneeded parts, however, it is recommended that you consider implementing +//! [automatic cleanup] of unused parts that are older than some threshold. //! //! 
[automatic cleanup]: https://aws.amazon.com/blogs/aws/s3-lifecycle-management-update-support-for-multipart-uploads-and-delete-markers/ @@ -38,18 +35,17 @@ use futures::{StreamExt, TryStreamExt}; use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH}; use reqwest::{Method, StatusCode}; use std::{sync::Arc, time::Duration}; -use tokio::io::AsyncWrite; use url::Url; use crate::aws::client::{RequestError, S3Client}; use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; use crate::client::CredentialProvider; -use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart}; +use crate::multipart::{MultipartStore, PartId}; use crate::signer::Signer; use crate::{ - Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, PutMode, - PutOptions, PutResult, Result, + Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, + ObjectStore, Path, PutMode, PutOptions, PutResult, Result, UploadPart, }; static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging"); @@ -85,6 +81,7 @@ const STORE: &str = "S3"; /// [`CredentialProvider`] for [`AmazonS3`] pub type AwsCredentialProvider = Arc>; +use crate::client::parts::Parts; pub use credential::{AwsAuthorizer, AwsCredential}; /// Interface for [Amazon S3](https://aws.amazon.com/s3/). 
@@ -211,25 +208,18 @@ impl ObjectStore for AmazonS3 { } } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - let id = self.client.create_multipart(location).await?; - - let upload = S3MultiPartUpload { - location: location.clone(), - upload_id: id.clone(), - client: Arc::clone(&self.client), - }; - - Ok((id, Box::new(WriteMultiPart::new(upload, 8)))) - } - - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> { - self.client - .delete_request(location, &[("uploadId", multipart_id)]) - .await + async fn put_multipart(&self, location: &Path) -> Result> { + let upload_id = self.client.create_multipart(location).await?; + + Ok(Box::new(S3MultiPartUpload { + part_idx: 0, + state: Arc::new(UploadState { + client: Arc::clone(&self.client), + location: location.clone(), + upload_id: upload_id.clone(), + parts: Default::default(), + }), + })) } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { @@ -319,30 +309,55 @@ impl ObjectStore for AmazonS3 { } } +#[derive(Debug)] struct S3MultiPartUpload { + part_idx: usize, + state: Arc, +} + +#[derive(Debug)] +struct UploadState { + parts: Parts, location: Path, upload_id: String, client: Arc, } #[async_trait] -impl PutPart for S3MultiPartUpload { - async fn put_part(&self, buf: Vec, part_idx: usize) -> Result { - self.client - .put_part(&self.location, &self.upload_id, part_idx, buf.into()) +impl MultipartUpload for S3MultiPartUpload { + fn put_part(&mut self, data: Bytes) -> UploadPart { + let idx = self.part_idx; + self.part_idx += 1; + let state = Arc::clone(&self.state); + Box::pin(async move { + let part = state + .client + .put_part(&state.location, &state.upload_id, idx, data) + .await?; + state.parts.put(idx, part); + Ok(()) + }) + } + + async fn complete(&mut self) -> Result { + let parts = self.state.parts.finish(self.part_idx)?; + + self.state + .client + .complete_multipart(&self.state.location, &self.state.upload_id, 
parts) .await } - async fn complete(&self, completed_parts: Vec) -> Result<()> { - self.client - .complete_multipart(&self.location, &self.upload_id, completed_parts) - .await?; - Ok(()) + async fn abort(&mut self) -> Result<()> { + self.state + .client + .delete_request(&self.state.location, &[("uploadId", &self.state.upload_id)]) + .await } } #[async_trait] -impl MultiPartStore for AmazonS3 { +impl MultipartStore for AmazonS3 { async fn create_multipart(&self, path: &Path) -> Result { self.client.create_multipart(path).await } @@ -377,7 +392,6 @@ mod tests { use crate::{client::get::GetClient, tests::*}; use bytes::Bytes; use hyper::HeaderMap; - use tokio::io::AsyncWriteExt; const NON_EXISTENT_NAME: &str = "nonexistentname"; @@ -542,9 +556,9 @@ mod tests { store.put(&locations[0], data.clone()).await.unwrap(); store.copy(&locations[0], &locations[1]).await.unwrap(); - let (_, mut writer) = store.put_multipart(&locations[2]).await.unwrap(); - writer.write_all(&data).await.unwrap(); - writer.shutdown().await.unwrap(); + let mut upload = store.put_multipart(&locations[2]).await.unwrap(); + upload.put_part(data.clone()).await.unwrap(); + upload.complete().await.unwrap(); for location in &locations { let res = store diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 712b7a36c56a..5d3a405ccc93 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -19,19 +19,15 @@ //! //! ## Streaming uploads //! -//! [ObjectStore::put_multipart] will upload data in blocks and write a blob from those -//! blocks. Data is buffered internally to make blocks of at least 5MB and blocks -//! are uploaded concurrently. +//! [ObjectStore::put_multipart] will upload data in blocks and write a blob from those blocks. //! -//! [ObjectStore::abort_multipart] is a no-op, since Azure Blob Store doesn't provide -//! a way to drop old blocks. Instead unused blocks are automatically cleaned up -//! after 7 days. +//! 
Unused blocks will automatically be dropped after 7 days. use crate::{ - multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart}, + multipart::{MultipartStore, PartId}, path::Path, signer::Signer, - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, - Result, + GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore, + PutOptions, PutResult, Result, UploadPart, }; use async_trait::async_trait; use bytes::Bytes; @@ -40,7 +36,6 @@ use reqwest::Method; use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; -use tokio::io::AsyncWrite; use url::Url; use crate::client::get::GetClientExt; @@ -54,6 +49,8 @@ mod credential; /// [`CredentialProvider`] for [`MicrosoftAzure`] pub type AzureCredentialProvider = Arc>; +use crate::azure::client::AzureClient; +use crate::client::parts::Parts; pub use builder::{AzureConfigKey, MicrosoftAzureBuilder}; pub use credential::AzureCredential; @@ -94,21 +91,15 @@ impl ObjectStore for MicrosoftAzure { self.client.put_blob(location, bytes, opts).await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - let inner = AzureMultiPartUpload { - client: Arc::clone(&self.client), - location: location.to_owned(), - }; - Ok((String::new(), Box::new(WriteMultiPart::new(inner, 8)))) - } - - async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> { - // There is no way to drop blocks that have been uploaded. Instead, they simply - // expire in 7 days. 
- Ok(()) + async fn put_multipart(&self, location: &Path) -> Result> { + Ok(Box::new(AzureMultiPartUpload { + part_idx: 0, + state: Arc::new(UploadState { + client: Arc::clone(&self.client), + location: location.clone(), + parts: Default::default(), + }), + })) } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { @@ -197,26 +188,49 @@ impl Signer for MicrosoftAzure { /// put_multipart_part -> PUT block /// complete -> PUT block list /// abort -> No equivalent; blocks are simply dropped after 7 days -#[derive(Debug, Clone)] +#[derive(Debug)] struct AzureMultiPartUpload { - client: Arc, + part_idx: usize, + state: Arc, +} + +#[derive(Debug)] +struct UploadState { location: Path, + parts: Parts, + client: Arc, } #[async_trait] -impl PutPart for AzureMultiPartUpload { - async fn put_part(&self, buf: Vec, idx: usize) -> Result { - self.client.put_block(&self.location, idx, buf.into()).await +impl MultipartUpload for AzureMultiPartUpload { + fn put_part(&mut self, data: Bytes) -> UploadPart { + let idx = self.part_idx; + self.part_idx += 1; + let state = Arc::clone(&self.state); + Box::pin(async move { + let part = state.client.put_block(&state.location, idx, data).await?; + state.parts.put(idx, part); + Ok(()) + }) + } + + async fn complete(&mut self) -> Result { + let parts = self.state.parts.finish(self.part_idx)?; + + self.state + .client + .put_block_list(&self.state.location, parts) + .await } - async fn complete(&self, parts: Vec) -> Result<()> { - self.client.put_block_list(&self.location, parts).await?; + async fn abort(&mut self) -> Result<()> { + // Nothing to do Ok(()) } } #[async_trait] -impl MultiPartStore for MicrosoftAzure { +impl MultipartStore for MicrosoftAzure { async fn create_multipart(&self, _: &Path) -> Result { Ok(String::new()) } diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index 9299e1147bc1..39f8eafbef7e 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -18,7 
+18,7 @@ //! Utilities for performing tokio-style buffered IO use crate::path::Path; -use crate::{MultipartId, ObjectMeta, ObjectStore}; +use crate::{ObjectMeta, ObjectStore, WriteMultipart}; use bytes::Bytes; use futures::future::{BoxFuture, FutureExt}; use futures::ready; @@ -27,7 +27,7 @@ use std::io::{Error, ErrorKind, SeekFrom}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, AsyncWriteExt, ReadBuf}; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; /// The default buffer size used by [`BufReader`] pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024; @@ -217,7 +217,6 @@ impl AsyncBufRead for BufReader { pub struct BufWriter { capacity: usize, state: BufWriterState, - multipart_id: Option, store: Arc, } @@ -225,22 +224,19 @@ impl std::fmt::Debug for BufWriter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("BufWriter") .field("capacity", &self.capacity) - .field("multipart_id", &self.multipart_id) .finish() } } -type MultipartResult = (MultipartId, Box); - enum BufWriterState { /// Buffer up to capacity bytes Buffer(Path, Vec), /// [`ObjectStore::put_multipart`] - Prepare(BoxFuture<'static, std::io::Result>), + Prepare(BoxFuture<'static, std::io::Result>), /// Write to a multipart upload - Write(Box), + Write(Option), /// [`ObjectStore::put`] - Put(BoxFuture<'static, std::io::Result<()>>), + Flush(BoxFuture<'static, std::io::Result<()>>), } impl BufWriter { @@ -255,14 +251,20 @@ impl BufWriter { capacity, store, state: BufWriterState::Buffer(path, Vec::new()), - multipart_id: None, } } - /// Returns the [`MultipartId`] of the multipart upload created by this - /// writer, if any. 
- pub fn multipart_id(&self) -> Option<&MultipartId> { - self.multipart_id.as_ref() + /// Abort this writer, cleaning up any partially uploaded state + /// + /// # Panic + /// + /// Panics if this writer has already been shutdown or aborted + pub async fn abort(&mut self) -> crate::Result<()> { + match &mut self.state { + BufWriterState::Buffer(_, _) | BufWriterState::Prepare(_) => Ok(()), + BufWriterState::Flush(_) => panic!("Already shut down"), + BufWriterState::Write(x) => x.take().unwrap().abort().await, + } } } @@ -275,12 +277,15 @@ impl AsyncWrite for BufWriter { let cap = self.capacity; loop { return match &mut self.state { - BufWriterState::Write(write) => Pin::new(write).poll_write(cx, buf), - BufWriterState::Put(_) => panic!("Already shut down"), + BufWriterState::Write(Some(write)) => { + write.write(buf); + Poll::Ready(Ok(buf.len())) + } + BufWriterState::Write(None) | BufWriterState::Flush(_) => { + panic!("Already shut down") + } BufWriterState::Prepare(f) => { - let (id, w) = ready!(f.poll_unpin(cx)?); - self.state = BufWriterState::Write(w); - self.multipart_id = Some(id); + self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into()); continue; } BufWriterState::Buffer(path, b) => { @@ -289,9 +294,10 @@ impl AsyncWrite for BufWriter { let path = std::mem::take(path); let store = Arc::clone(&self.store); self.state = BufWriterState::Prepare(Box::pin(async move { - let (id, mut writer) = store.put_multipart(&path).await?; - writer.write_all(&buffer).await?; - Ok((id, writer)) + let upload = store.put_multipart(&path).await?; + let mut chunked = WriteMultipart::new(upload); + chunked.write(&buffer); + Ok(chunked) })); continue; } @@ -305,13 +311,10 @@ impl AsyncWrite for BufWriter { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { return match &mut self.state { - BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())), - BufWriterState::Write(write) => Pin::new(write).poll_flush(cx), - BufWriterState::Put(_) => 
panic!("Already shut down"), + BufWriterState::Write(_) | BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())), + BufWriterState::Flush(_) => panic!("Already shut down"), BufWriterState::Prepare(f) => { - let (id, w) = ready!(f.poll_unpin(cx)?); - self.state = BufWriterState::Write(w); - self.multipart_id = Some(id); + self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into()); continue; } }; @@ -322,21 +325,28 @@ impl AsyncWrite for BufWriter { loop { match &mut self.state { BufWriterState::Prepare(f) => { - let (id, w) = ready!(f.poll_unpin(cx)?); - self.state = BufWriterState::Write(w); - self.multipart_id = Some(id); + self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into()); } BufWriterState::Buffer(p, b) => { let buf = std::mem::take(b); let path = std::mem::take(p); let store = Arc::clone(&self.store); - self.state = BufWriterState::Put(Box::pin(async move { + self.state = BufWriterState::Flush(Box::pin(async move { store.put(&path, buf.into()).await?; Ok(()) })); } - BufWriterState::Put(f) => return f.poll_unpin(cx), - BufWriterState::Write(w) => return Pin::new(w).poll_shutdown(cx), + BufWriterState::Flush(f) => return f.poll_unpin(cx), + BufWriterState::Write(x) => { + let upload = x.take().unwrap(); + self.state = BufWriterState::Flush( + async move { + upload.finish().await?; + Ok(()) + } + .boxed(), + ) + } } } } @@ -357,7 +367,7 @@ mod tests { use super::*; use crate::memory::InMemory; use crate::path::Path; - use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt}; + use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; #[tokio::test] async fn test_buf_reader() { @@ -448,9 +458,7 @@ mod tests { writer.write_all(&[0; 20]).await.unwrap(); writer.flush().await.unwrap(); writer.write_all(&[0; 5]).await.unwrap(); - assert!(writer.multipart_id().is_none()); writer.shutdown().await.unwrap(); - assert!(writer.multipart_id().is_none()); assert_eq!(store.head(&path).await.unwrap().size, 25); // Test multipart @@ 
-458,9 +466,7 @@ mod tests { writer.write_all(&[0; 20]).await.unwrap(); writer.flush().await.unwrap(); writer.write_all(&[0; 20]).await.unwrap(); - assert!(writer.multipart_id().is_some()); writer.shutdown().await.unwrap(); - assert!(writer.multipart_id().is_some()); assert_eq!(store.head(&path).await.unwrap().size, 40); } diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs index d33556f4b12e..6db7f4b35e24 100644 --- a/object_store/src/chunked.rs +++ b/object_store/src/chunked.rs @@ -25,14 +25,13 @@ use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; use futures::stream::BoxStream; use futures::StreamExt; -use tokio::io::AsyncWrite; use crate::path::Path; +use crate::Result; use crate::{ - GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutOptions, - PutResult, + GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + PutOptions, PutResult, }; -use crate::{MultipartId, Result}; /// Wraps a [`ObjectStore`] and makes its get response return chunks /// in a controllable manner. 
@@ -67,17 +66,10 @@ impl ObjectStore for ChunkedStore { self.inner.put_opts(location, bytes, opts).await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { + async fn put_multipart(&self, location: &Path) -> Result> { self.inner.put_multipart(location).await } - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> { - self.inner.abort_multipart(location, multipart_id).await - } - async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { let r = self.inner.get_opts(location, options).await?; let stream = match r.payload { diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index 252e9fdcadf5..7728f38954f9 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -40,6 +40,9 @@ pub mod header; #[cfg(any(feature = "aws", feature = "gcp"))] pub mod s3; +#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] +pub mod parts; + use async_trait::async_trait; use std::collections::HashMap; use std::str::FromStr; diff --git a/object_store/src/client/parts.rs b/object_store/src/client/parts.rs new file mode 100644 index 000000000000..9fc301edcf81 --- /dev/null +++ b/object_store/src/client/parts.rs @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::multipart::PartId; +use parking_lot::Mutex; + +/// An interior mutable collection of upload parts and their corresponding part index +#[derive(Debug, Default)] +pub(crate) struct Parts(Mutex>); + +impl Parts { + /// Record the [`PartId`] for a given index + /// + /// Note: calling this method multiple times with the same `part_idx` + /// will result in multiple [`PartId`] in the final output + pub(crate) fn put(&self, part_idx: usize, id: PartId) { + self.0.lock().push((part_idx, id)) + } + + /// Produce the final list of [`PartId`] ordered by `part_idx` + /// + /// `expected` is the number of parts expected in the final result + pub(crate) fn finish(&self, expected: usize) -> crate::Result> { + let mut parts = self.0.lock(); + if parts.len() != expected { + return Err(crate::Error::Generic { + store: "Parts", + source: "Missing part".to_string().into(), + }); + } + parts.sort_unstable_by_key(|(idx, _)| *idx); + Ok(parts.drain(..).map(|(_, v)| v).collect()) + } +} diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index e4b0f9af7d15..def53beefe78 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -272,7 +272,7 @@ impl GoogleCloudStorageClient { }) } - /// Initiate a multi-part upload + /// Initiate a multipart upload pub async fn multipart_initiate(&self, path: &Path) -> Result { let credential = self.get_credential().await?; let url = self.object_url(path); diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 8633abbfb4dc..2058d1f8055b 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -17,18 +17,14 @@ //! An object store implementation for Google Cloud Storage //! -//! ## Multi-part uploads +//! ## Multipart uploads //! -//! [Multi-part uploads](https://cloud.google.com/storage/docs/multipart-uploads) -//! 
can be initiated with the [ObjectStore::put_multipart] method. -//! Data passed to the writer is automatically buffered to meet the minimum size -//! requirements for a part. Multiple parts are uploaded concurrently. -//! -//! If the writer fails for any reason, you may have parts uploaded to GCS but not -//! used that you may be charged for. Use the [ObjectStore::abort_multipart] method -//! to abort the upload and drop those unneeded parts. In addition, you may wish to -//! consider implementing automatic clean up of unused parts that are older than one -//! week. +//! [Multipart uploads](https://cloud.google.com/storage/docs/multipart-uploads) +//! can be initiated with the [ObjectStore::put_multipart] method. If neither +//! [`MultipartUpload::complete`] nor [`MultipartUpload::abort`] is invoked, you may +//! have parts uploaded to GCS but not used, that you will be charged for. It is recommended +//! you configure a [lifecycle rule] to abort incomplete multipart uploads after a certain +//! period of time to avoid being charged for storing partial uploads. //! //! ## Using HTTP/2 //! @@ -36,24 +32,24 @@ //! because it allows much higher throughput in our benchmarks (see //! [#5194](https://github.com/apache/arrow-rs/issues/5194)). HTTP/2 can be //! enabled by setting [crate::ClientConfigKey::Http1Only] to false. +//! +//! 
[lifecycle rule]: https://cloud.google.com/storage/docs/lifecycle#abort-mpu use std::sync::Arc; use crate::client::CredentialProvider; use crate::{ - multipart::{PartId, PutPart, WriteMultiPart}, - path::Path, - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, - Result, + multipart::PartId, path::Path, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, + ObjectMeta, ObjectStore, PutOptions, PutResult, Result, UploadPart, }; use async_trait::async_trait; use bytes::Bytes; use client::GoogleCloudStorageClient; use futures::stream::BoxStream; -use tokio::io::AsyncWrite; use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; -use crate::multipart::MultiPartStore; +use crate::client::parts::Parts; +use crate::multipart::MultipartStore; pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey}; pub use credential::GcpCredential; @@ -89,27 +85,50 @@ impl GoogleCloudStorage { } } +#[derive(Debug)] struct GCSMultipartUpload { + state: Arc, + part_idx: usize, +} + +#[derive(Debug)] +struct UploadState { client: Arc, path: Path, multipart_id: MultipartId, + parts: Parts, } #[async_trait] -impl PutPart for GCSMultipartUpload { - /// Upload an object part - async fn put_part(&self, buf: Vec, part_idx: usize) -> Result { - self.client - .put_part(&self.path, &self.multipart_id, part_idx, buf.into()) +impl MultipartUpload for GCSMultipartUpload { + fn put_part(&mut self, data: Bytes) -> UploadPart { + let idx = self.part_idx; + self.part_idx += 1; + let state = Arc::clone(&self.state); + Box::pin(async move { + let part = state + .client + .put_part(&state.path, &state.multipart_id, idx, data) + .await?; + state.parts.put(idx, part); + Ok(()) + }) + } + + async fn complete(&mut self) -> Result { + let parts = self.state.parts.finish(self.part_idx)?; + + self.state + .client + .multipart_complete(&self.state.path, &self.state.multipart_id, parts) .await } - /// Complete a multipart upload - 
async fn complete(&self, completed_parts: Vec) -> Result<()> { - self.client - .multipart_complete(&self.path, &self.multipart_id, completed_parts) - .await?; - Ok(()) + async fn abort(&mut self) -> Result<()> { + self.state + .client + .multipart_cleanup(&self.state.path, &self.state.multipart_id) + .await } } @@ -119,27 +138,18 @@ impl ObjectStore for GoogleCloudStorage { self.client.put(location, bytes, opts).await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { + async fn put_multipart(&self, location: &Path) -> Result> { let upload_id = self.client.multipart_initiate(location).await?; - let inner = GCSMultipartUpload { - client: Arc::clone(&self.client), - path: location.clone(), - multipart_id: upload_id.clone(), - }; - - Ok((upload_id, Box::new(WriteMultiPart::new(inner, 8)))) - } - - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> { - self.client - .multipart_cleanup(location, multipart_id) - .await?; - - Ok(()) + Ok(Box::new(GCSMultipartUpload { + part_idx: 0, + state: Arc::new(UploadState { + client: Arc::clone(&self.client), + path: location.clone(), + multipart_id: upload_id.clone(), + parts: Default::default(), + }), + })) } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { @@ -176,7 +186,7 @@ impl ObjectStore for GoogleCloudStorage { } #[async_trait] -impl MultiPartStore for GoogleCloudStorage { +impl MultipartStore for GoogleCloudStorage { async fn create_multipart(&self, path: &Path) -> Result { self.client.multipart_initiate(path).await } diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs index f1d11db4762c..626337df27f9 100644 --- a/object_store/src/http/mod.rs +++ b/object_store/src/http/mod.rs @@ -37,7 +37,6 @@ use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use snafu::{OptionExt, ResultExt, Snafu}; -use tokio::io::AsyncWrite; use url::Url; use 
crate::client::get::GetClientExt; @@ -45,7 +44,7 @@ use crate::client::header::get_etag; use crate::http::client::Client; use crate::path::Path; use crate::{ - ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, + ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult, Result, RetryConfig, }; @@ -115,15 +114,8 @@ impl ObjectStore for HttpStore { }) } - async fn put_multipart( - &self, - _location: &Path, - ) -> Result<(MultipartId, Box)> { - Err(super::Error::NotImplemented) - } - - async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> { - Err(super::Error::NotImplemented) + async fn put_multipart(&self, _location: &Path) -> Result> { + Err(crate::Error::NotImplemented) } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 4960f3ba390a..e02675d88abe 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -269,12 +269,11 @@ //! //! # Multipart Upload //! -//! Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data, -//! with implementations automatically handling parallel, chunked upload where appropriate. +//! Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data //! //! ``` //! # use object_store::local::LocalFileSystem; -//! # use object_store::ObjectStore; +//! # use object_store::{ObjectStore, WriteMultipart}; //! # use std::sync::Arc; //! # use bytes::Bytes; //! # use tokio::io::AsyncWriteExt; @@ -286,12 +285,10 @@ //! # //! let object_store: Arc = get_object_store(); //! let path = Path::from("data/large_file"); -//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap(); -//! -//! let bytes = Bytes::from_static(b"hello"); -//! writer.write_all(&bytes).await.unwrap(); -//! writer.flush().await.unwrap(); -//! 
writer.shutdown().await.unwrap(); +//! let upload = object_store.put_multipart(&path).await.unwrap(); +//! let mut write = WriteMultipart::new(upload); +//! write.write(b"hello"); +//! write.finish().await.unwrap(); //! # } //! ``` //! @@ -501,9 +498,11 @@ pub use tags::TagSet; pub mod multipart; mod parse; +mod upload; mod util; pub use parse::{parse_url, parse_url_opts}; +pub use upload::*; pub use util::GetRange; use crate::path::Path; @@ -520,12 +519,11 @@ use std::fmt::{Debug, Formatter}; use std::io::{Read, Seek, SeekFrom}; use std::ops::Range; use std::sync::Arc; -use tokio::io::AsyncWrite; /// An alias for a dynamically dispatched object store implementation. pub type DynObjectStore = dyn ObjectStore; -/// Id type for multi-part uploads. +/// Id type for multipart uploads. pub type MultipartId = String; /// Universal API to multiple object store services. @@ -543,48 +541,11 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// Save the provided bytes to the specified location with the given options async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result; - /// Get a multi-part upload that allows writing data in chunks. - /// - /// Most cloud-based uploads will buffer and upload parts in parallel. - /// - /// To complete the upload, [AsyncWrite::poll_shutdown] must be called - /// to completion. This operation is guaranteed to be atomic, it will either - /// make all the written data available at `location`, or fail. No clients - /// should be able to observe a partially written object. - /// - /// For some object stores (S3, GCS, and local in particular), if the - /// writer fails or panics, you must call [ObjectStore::abort_multipart] - /// to clean up partially written data. - /// - ///
- /// It is recommended applications wait for any in-flight requests to complete by calling `flush`, if - /// there may be a significant gap in time (> ~30s) before the next write. - /// These gaps can include times where the function returns control to the - /// caller while keeping the writer open. If `flush` is not called, futures - /// for in-flight requests may be left unpolled long enough for the requests - /// to time out, causing the write to fail. - ///
- /// - /// For applications requiring fine-grained control of multipart uploads - /// see [`MultiPartStore`], although note that this interface cannot be - /// supported by all [`ObjectStore`] backends. - /// - /// For applications looking to implement this interface for a custom - /// multipart API, see [`WriteMultiPart`] which handles the complexities - /// of performing parallel uploads of fixed size parts. - /// - /// [`WriteMultiPart`]: multipart::WriteMultiPart - /// [`MultiPartStore`]: multipart::MultiPartStore - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)>; - - /// Cleanup an aborted upload. + /// Perform a multipart upload /// - /// See documentation for individual stores for exact behavior, as capabilities - /// vary by object store. - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()>; + /// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads + /// typically require multiple separate requests. See [`MultipartUpload`] for more information + async fn put_multipart(&self, location: &Path) -> Result>; /// Return the bytes that are stored at the specified location. async fn get(&self, location: &Path) -> Result { @@ -769,21 +730,10 @@ macro_rules! 
as_ref_impl { self.as_ref().put_opts(location, bytes, opts).await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { + async fn put_multipart(&self, location: &Path) -> Result> { self.as_ref().put_multipart(location).await } - async fn abort_multipart( - &self, - location: &Path, - multipart_id: &MultipartId, - ) -> Result<()> { - self.as_ref().abort_multipart(location, multipart_id).await - } - async fn get(&self, location: &Path) -> Result { self.as_ref().get(location).await } @@ -1246,14 +1196,12 @@ mod test_util { #[cfg(test)] mod tests { use super::*; - use crate::multipart::MultiPartStore; + use crate::multipart::MultipartStore; use crate::test_util::flatten_list_stream; use chrono::TimeZone; use futures::stream::FuturesUnordered; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; - use std::future::Future; - use tokio::io::AsyncWriteExt; pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) { put_get_delete_list_opts(storage).await @@ -1928,12 +1876,11 @@ mod tests { let location = Path::from("test_dir/test_upload_file.txt"); // Can write to storage - let data = get_chunks(5_000, 10); + let data = get_chunks(5 * 1024 * 1024, 3); let bytes_expected = data.concat(); - let (_, mut writer) = storage.put_multipart(&location).await.unwrap(); - for chunk in &data { - writer.write_all(chunk).await.unwrap(); - } + let mut upload = storage.put_multipart(&location).await.unwrap(); + let uploads = data.into_iter().map(|x| upload.put_part(x)); + futures::future::try_join_all(uploads).await.unwrap(); // Object should not yet exist in store let meta_res = storage.head(&location).await; @@ -1949,7 +1896,8 @@ mod tests { let result = storage.list_with_delimiter(None).await.unwrap(); assert_eq!(&result.objects, &[]); - writer.shutdown().await.unwrap(); + upload.complete().await.unwrap(); + let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap(); assert_eq!(bytes_expected, 
bytes_written); @@ -1957,22 +1905,19 @@ mod tests { // Sizes chosen to ensure we write three parts let data = get_chunks(3_200_000, 7); let bytes_expected = data.concat(); - let (_, mut writer) = storage.put_multipart(&location).await.unwrap(); + let upload = storage.put_multipart(&location).await.unwrap(); + let mut writer = WriteMultipart::new(upload); for chunk in &data { - writer.write_all(chunk).await.unwrap(); + writer.write(chunk) } - writer.shutdown().await.unwrap(); + writer.finish().await.unwrap(); let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap(); assert_eq!(bytes_expected, bytes_written); // We can abort an empty write let location = Path::from("test_dir/test_abort_upload.txt"); - let (upload_id, writer) = storage.put_multipart(&location).await.unwrap(); - drop(writer); - storage - .abort_multipart(&location, &upload_id) - .await - .unwrap(); + let mut upload = storage.put_multipart(&location).await.unwrap(); + upload.abort().await.unwrap(); let get_res = storage.get(&location).await; assert!(get_res.is_err()); assert!(matches!( @@ -1981,17 +1926,13 @@ mod tests { )); // We can abort an in-progress write - let (upload_id, mut writer) = storage.put_multipart(&location).await.unwrap(); - if let Some(chunk) = data.first() { - writer.write_all(chunk).await.unwrap(); - let _ = writer.write(chunk).await.unwrap(); - } - drop(writer); - - storage - .abort_multipart(&location, &upload_id) + let mut upload = storage.put_multipart(&location).await.unwrap(); + upload + .put_part(data.first().unwrap().clone()) .await .unwrap(); + + upload.abort().await.unwrap(); let get_res = storage.get(&location).await; assert!(get_res.is_err()); assert!(matches!( @@ -2186,7 +2127,7 @@ mod tests { storage.delete(&path2).await.unwrap(); } - pub(crate) async fn multipart(storage: &dyn ObjectStore, multipart: &dyn MultiPartStore) { + pub(crate) async fn multipart(storage: &dyn ObjectStore, multipart: &dyn MultipartStore) { let path = 
Path::from("test_multipart"); let chunk_size = 5 * 1024 * 1024; @@ -2253,7 +2194,7 @@ mod tests { pub(crate) async fn tagging(storage: &dyn ObjectStore, validate: bool, get_tags: F) where F: Fn(Path) -> Fut + Send + Sync, - Fut: Future> + Send, + Fut: std::future::Future> + Send, { use bytes::Buf; use serde::Deserialize; diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs index d1363d9a4d46..e5f6841638e1 100644 --- a/object_store/src/limit.rs +++ b/object_store/src/limit.rs @@ -18,18 +18,16 @@ //! An object store that limits the maximum concurrency of the wrapped implementation use crate::{ - BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, - ObjectStore, Path, PutOptions, PutResult, Result, StreamExt, + BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, + ObjectStore, Path, PutOptions, PutResult, Result, StreamExt, UploadPart, }; use async_trait::async_trait; use bytes::Bytes; use futures::{FutureExt, Stream}; -use std::io::{Error, IoSlice}; use std::ops::Range; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use tokio::io::AsyncWrite; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; /// Store wrapper that wraps an inner store and limits the maximum number of concurrent @@ -81,18 +79,12 @@ impl ObjectStore for LimitStore { let _permit = self.semaphore.acquire().await.unwrap(); self.inner.put_opts(location, bytes, opts).await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap(); - let (id, write) = self.inner.put_multipart(location).await?; - Ok((id, Box::new(PermitWrapper::new(write, permit)))) - } - - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> { - let _permit = self.semaphore.acquire().await.unwrap(); - self.inner.abort_multipart(location, multipart_id).await + async fn 
put_multipart(&self, location: &Path) -> Result> { + let upload = self.inner.put_multipart(location).await?; + Ok(Box::new(LimitUpload { + semaphore: Arc::clone(&self.semaphore), + upload, + })) } async fn get(&self, location: &Path) -> Result { let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap(); @@ -221,39 +213,42 @@ impl Stream for PermitWrapper { } } -impl AsyncWrite for PermitWrapper { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.inner).poll_write(cx, buf) - } +/// An [`MultipartUpload`] wrapper that limits the maximum number of concurrent requests +#[derive(Debug)] +pub struct LimitUpload { + upload: Box, + semaphore: Arc, +} - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.inner).poll_flush(cx) +impl LimitUpload { + /// Create a new [`LimitUpload`] limiting `upload` to `max_concurrency` concurrent requests + pub fn new(upload: Box, max_concurrency: usize) -> Self { + Self { + upload, + semaphore: Arc::new(Semaphore::new(max_concurrency)), + } } +} - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.inner).poll_shutdown(cx) +#[async_trait] +impl MultipartUpload for LimitUpload { + fn put_part(&mut self, data: Bytes) -> UploadPart { + let upload = self.upload.put_part(data); + let s = Arc::clone(&self.semaphore); + Box::pin(async move { + let _permit = s.acquire().await.unwrap(); + upload.await + }) } - fn poll_write_vectored( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - bufs: &[IoSlice<'_>], - ) -> Poll> { - Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) + async fn complete(&mut self) -> Result { + let _permit = self.semaphore.acquire().await.unwrap(); + self.upload.complete().await } - fn is_write_vectored(&self) -> bool { - self.inner.is_write_vectored() + async fn abort(&mut self) -> Result<()> { + let _permit = 
self.semaphore.acquire().await.unwrap(); + self.upload.abort().await } } diff --git a/object_store/src/local.rs b/object_store/src/local.rs index d631771778db..a7eb4661f686 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -16,34 +16,32 @@ // under the License. //! An object store implementation for a local filesystem -use crate::{ - maybe_spawn_blocking, - path::{absolute_path_to_url, Path}, - util::InvalidGetRange, - GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore, - PutMode, PutOptions, PutResult, Result, -}; -use async_trait::async_trait; -use bytes::Bytes; -use chrono::{DateTime, Utc}; -use futures::future::BoxFuture; -use futures::ready; -use futures::{stream::BoxStream, StreamExt}; -use futures::{FutureExt, TryStreamExt}; -use snafu::{ensure, ResultExt, Snafu}; use std::fs::{metadata, symlink_metadata, File, Metadata, OpenOptions}; use std::io::{ErrorKind, Read, Seek, SeekFrom, Write}; use std::ops::Range; -use std::pin::Pin; use std::sync::Arc; -use std::task::Poll; use std::time::SystemTime; use std::{collections::BTreeSet, convert::TryFrom, io}; use std::{collections::VecDeque, path::PathBuf}; -use tokio::io::AsyncWrite; + +use async_trait::async_trait; +use bytes::Bytes; +use chrono::{DateTime, Utc}; +use futures::{stream::BoxStream, StreamExt}; +use futures::{FutureExt, TryStreamExt}; +use parking_lot::Mutex; +use snafu::{ensure, OptionExt, ResultExt, Snafu}; use url::Url; use walkdir::{DirEntry, WalkDir}; +use crate::{ + maybe_spawn_blocking, + path::{absolute_path_to_url, Path}, + util::InvalidGetRange, + GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + PutMode, PutOptions, PutResult, Result, UploadPart, +}; + /// A specialized `Error` for filesystem object store-related errors #[derive(Debug, Snafu)] #[allow(missing_docs)] @@ -155,6 +153,9 @@ pub(crate) enum Error { InvalidPath { path: String, }, + + #[snafu(display("Upload aborted"))] + 
Aborted, } impl From for super::Error { @@ -342,8 +343,7 @@ impl ObjectStore for LocalFileSystem { let path = self.path_to_filesystem(location)?; maybe_spawn_blocking(move || { - let (mut file, suffix) = new_staged_upload(&path)?; - let staging_path = staged_upload_path(&path, &suffix); + let (mut file, staging_path) = new_staged_upload(&path)?; let mut e_tag = None; let err = match file.write_all(&bytes) { @@ -395,31 +395,10 @@ impl ObjectStore for LocalFileSystem { .await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - let dest = self.path_to_filesystem(location)?; - - let (file, suffix) = new_staged_upload(&dest)?; - Ok(( - suffix.clone(), - Box::new(LocalUpload::new(dest, suffix, Arc::new(file))), - )) - } - - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> { + async fn put_multipart(&self, location: &Path) -> Result> { let dest = self.path_to_filesystem(location)?; - let path: PathBuf = staged_upload_path(&dest, multipart_id); - - maybe_spawn_blocking(move || match std::fs::remove_file(&path) { - Ok(_) => Ok(()), - Err(source) => match source.kind() { - ErrorKind::NotFound => Ok(()), // Already deleted - _ => Err(Error::UnableToDeleteFile { path, source }.into()), - }, - }) - .await + let (file, src) = new_staged_upload(&dest)?; + Ok(Box::new(LocalUpload::new(src, dest, file))) } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { @@ -677,17 +656,17 @@ fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> { Ok(()) } -/// Generates a unique file path `{base}#{suffix}`, returning the opened `File` and `suffix` +/// Generates a unique file path `{base}#{suffix}`, returning the opened `File` and `path` /// /// Creates any directories if necessary -fn new_staged_upload(base: &std::path::Path) -> Result<(File, String)> { +fn new_staged_upload(base: &std::path::Path) -> Result<(File, PathBuf)> { let mut multipart_id = 1; loop { let 
suffix = multipart_id.to_string(); let path = staged_upload_path(base, &suffix); let mut options = OpenOptions::new(); match options.read(true).write(true).create_new(true).open(&path) { - Ok(f) => return Ok((f, suffix)), + Ok(f) => return Ok((f, path)), Err(source) => match source.kind() { ErrorKind::AlreadyExists => multipart_id += 1, ErrorKind::NotFound => create_parent_dirs(&path, source)?, @@ -705,194 +684,91 @@ fn staged_upload_path(dest: &std::path::Path, suffix: &str) -> PathBuf { staging_path.into() } -enum LocalUploadState { - /// Upload is ready to send new data - Idle(Arc), - /// In the middle of a write - Writing(Arc, BoxFuture<'static, Result>), - /// In the middle of syncing data and closing file. - /// - /// Future will contain last reference to file, so it will call drop on completion. - ShuttingDown(BoxFuture<'static, Result<(), io::Error>>), - /// File is being moved from it's temporary location to the final location - Committing(BoxFuture<'static, Result<(), io::Error>>), - /// Upload is complete - Complete, +#[derive(Debug)] +struct LocalUpload { + /// The upload state + state: Arc, + /// The location of the temporary file + src: Option, + /// The next offset to write into the file + offset: u64, } -struct LocalUpload { - inner_state: LocalUploadState, +#[derive(Debug)] +struct UploadState { dest: PathBuf, - multipart_id: MultipartId, + file: Mutex>, } impl LocalUpload { - pub fn new(dest: PathBuf, multipart_id: MultipartId, file: Arc) -> Self { + pub fn new(src: PathBuf, dest: PathBuf, file: File) -> Self { Self { - inner_state: LocalUploadState::Idle(file), - dest, - multipart_id, + state: Arc::new(UploadState { + dest, + file: Mutex::new(Some(file)), + }), + src: Some(src), + offset: 0, } } } -impl AsyncWrite for LocalUpload { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll> { - let invalid_state = |condition: &str| -> Poll> { - Poll::Ready(Err(io::Error::new( - 
ErrorKind::InvalidInput, - format!("Tried to write to file {condition}."), - ))) - }; +#[async_trait] +impl MultipartUpload for LocalUpload { + fn put_part(&mut self, data: Bytes) -> UploadPart { + let offset = self.offset; + self.offset += data.len() as u64; - if let Ok(runtime) = tokio::runtime::Handle::try_current() { - let mut data: Vec = buf.to_vec(); - let data_len = data.len(); - - loop { - match &mut self.inner_state { - LocalUploadState::Idle(file) => { - let file = Arc::clone(file); - let file2 = Arc::clone(&file); - let data: Vec = std::mem::take(&mut data); - self.inner_state = LocalUploadState::Writing( - file, - Box::pin( - runtime - .spawn_blocking(move || (&*file2).write_all(&data)) - .map(move |res| match res { - Err(err) => Err(io::Error::new(ErrorKind::Other, err)), - Ok(res) => res.map(move |_| data_len), - }), - ), - ); - } - LocalUploadState::Writing(file, inner_write) => { - let res = ready!(inner_write.poll_unpin(cx)); - self.inner_state = LocalUploadState::Idle(Arc::clone(file)); - return Poll::Ready(res); - } - LocalUploadState::ShuttingDown(_) => { - return invalid_state("when writer is shutting down"); - } - LocalUploadState::Committing(_) => { - return invalid_state("when writer is committing data"); - } - LocalUploadState::Complete => { - return invalid_state("when writer is complete"); - } - } - } - } else if let LocalUploadState::Idle(file) = &self.inner_state { - let file = Arc::clone(file); - (&*file).write_all(buf)?; - Poll::Ready(Ok(buf.len())) - } else { - // If we are running on this thread, then only possible states are Idle and Complete. 
- invalid_state("when writer is already complete.") - } + let s = Arc::clone(&self.state); + maybe_spawn_blocking(move || { + let mut f = s.file.lock(); + let file = f.as_mut().context(AbortedSnafu)?; + file.seek(SeekFrom::Start(offset)) + .context(SeekSnafu { path: &s.dest })?; + file.write_all(&data).context(UnableToCopyDataToFileSnafu)?; + Ok(()) + }) + .boxed() } - fn poll_flush( - self: Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) + async fn complete(&mut self) -> Result { + let src = self.src.take().context(AbortedSnafu)?; + let s = Arc::clone(&self.state); + maybe_spawn_blocking(move || { + // Ensure no inflight writes + let f = s.file.lock().take().context(AbortedSnafu)?; + std::fs::rename(&src, &s.dest).context(UnableToRenameFileSnafu)?; + let metadata = f.metadata().map_err(|e| Error::Metadata { + source: e.into(), + path: src.to_string_lossy().to_string(), + })?; + + Ok(PutResult { + e_tag: Some(get_etag(&metadata)), + version: None, + }) + }) + .await } - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - if let Ok(runtime) = tokio::runtime::Handle::try_current() { - loop { - match &mut self.inner_state { - LocalUploadState::Idle(file) => { - // We are moving file into the future, and it will be dropped on it's completion, closing the file. 
- let file = Arc::clone(file); - self.inner_state = LocalUploadState::ShuttingDown(Box::pin( - runtime - .spawn_blocking(move || (*file).sync_all()) - .map(move |res| match res { - Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)), - Ok(res) => res, - }), - )); - } - LocalUploadState::ShuttingDown(fut) => match fut.poll_unpin(cx) { - Poll::Ready(res) => { - res?; - let staging_path = staged_upload_path(&self.dest, &self.multipart_id); - let dest = self.dest.clone(); - self.inner_state = LocalUploadState::Committing(Box::pin( - runtime - .spawn_blocking(move || std::fs::rename(&staging_path, &dest)) - .map(move |res| match res { - Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)), - Ok(res) => res, - }), - )); - } - Poll::Pending => { - return Poll::Pending; - } - }, - LocalUploadState::Writing(_, _) => { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Tried to commit a file where a write is in progress.", - ))); - } - LocalUploadState::Committing(fut) => { - let res = ready!(fut.poll_unpin(cx)); - self.inner_state = LocalUploadState::Complete; - return Poll::Ready(res); - } - LocalUploadState::Complete => { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::Other, - "Already complete", - ))) - } - } - } - } else { - let staging_path = staged_upload_path(&self.dest, &self.multipart_id); - match &mut self.inner_state { - LocalUploadState::Idle(file) => { - let file = Arc::clone(file); - self.inner_state = LocalUploadState::Complete; - file.sync_all()?; - drop(file); - std::fs::rename(staging_path, &self.dest)?; - Poll::Ready(Ok(())) - } - _ => { - // If we are running on this thread, then only possible states are Idle and Complete. 
- Poll::Ready(Err(io::Error::new(ErrorKind::Other, "Already complete"))) - } - } - } + async fn abort(&mut self) -> Result<()> { + let src = self.src.take().context(AbortedSnafu)?; + maybe_spawn_blocking(move || { + std::fs::remove_file(&src).context(UnableToDeleteFileSnafu { path: &src })?; + Ok(()) + }) + .await } } impl Drop for LocalUpload { fn drop(&mut self) { - match self.inner_state { - LocalUploadState::Complete => (), - _ => { - self.inner_state = LocalUploadState::Complete; - let path = staged_upload_path(&self.dest, &self.multipart_id); - // Try to cleanup intermediate file ignoring any error - match tokio::runtime::Handle::try_current() { - Ok(r) => drop(r.spawn_blocking(move || std::fs::remove_file(path))), - Err(_) => drop(std::fs::remove_file(path)), - }; - } + if let Some(src) = self.src.take() { + // Try to clean up intermediate file ignoring any error + match tokio::runtime::Handle::try_current() { + Ok(r) => drop(r.spawn_blocking(move || std::fs::remove_file(src))), + Err(_) => drop(std::fs::remove_file(src)), + }; } } } @@ -1095,12 +971,13 @@ fn convert_walkdir_result( #[cfg(test)] mod tests { - use super::*; - use crate::test_util::flatten_list_stream; - use crate::tests::*; use futures::TryStreamExt; use tempfile::{NamedTempFile, TempDir}; - use tokio::io::AsyncWriteExt; + + use crate::test_util::flatten_list_stream; + use crate::tests::*; + + use super::*; #[tokio::test] async fn file_test() { @@ -1125,7 +1002,18 @@ mod tests { put_get_delete_list(&integration).await; list_uses_directories_correctly(&integration).await; list_with_delimiter(&integration).await; - stream_get(&integration).await; + + // Can't use stream_get test as WriteMultipart uses a tokio JoinSet + let p = Path::from("manual_upload"); + let mut upload = integration.put_multipart(&p).await.unwrap(); + upload.put_part(Bytes::from_static(b"123")).await.unwrap(); + upload.put_part(Bytes::from_static(b"45678")).await.unwrap(); + let r = upload.complete().await.unwrap(); + + let 
get = integration.get(&p).await.unwrap(); + assert_eq!(get.meta.e_tag.as_ref().unwrap(), r.e_tag.as_ref().unwrap()); + let actual = get.bytes().await.unwrap(); + assert_eq!(actual.as_ref(), b"12345678"); }); } @@ -1422,12 +1310,11 @@ mod tests { let location = Path::from("some_file"); let data = Bytes::from("arbitrary data"); - let (multipart_id, mut writer) = integration.put_multipart(&location).await.unwrap(); - writer.write_all(&data).await.unwrap(); + let mut u1 = integration.put_multipart(&location).await.unwrap(); + u1.put_part(data.clone()).await.unwrap(); - let (multipart_id_2, mut writer_2) = integration.put_multipart(&location).await.unwrap(); - assert_ne!(multipart_id, multipart_id_2); - writer_2.write_all(&data).await.unwrap(); + let mut u2 = integration.put_multipart(&location).await.unwrap(); + u2.put_part(data).await.unwrap(); let list = flatten_list_stream(&integration, None).await.unwrap(); assert_eq!(list.len(), 0); @@ -1520,11 +1407,13 @@ mod tests { #[cfg(not(target_arch = "wasm32"))] #[cfg(test)] mod not_wasm_tests { - use crate::local::LocalFileSystem; - use crate::{ObjectStore, Path}; use std::time::Duration; + + use bytes::Bytes; use tempfile::TempDir; - use tokio::io::AsyncWriteExt; + + use crate::local::LocalFileSystem; + use crate::{ObjectStore, Path}; #[tokio::test] async fn test_cleanup_intermediate_files() { @@ -1532,12 +1421,13 @@ mod not_wasm_tests { let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); let location = Path::from("some_file"); - let (_, mut writer) = integration.put_multipart(&location).await.unwrap(); - writer.write_all(b"hello").await.unwrap(); + let data = Bytes::from_static(b"hello"); + let mut upload = integration.put_multipart(&location).await.unwrap(); + upload.put_part(data).await.unwrap(); let file_count = std::fs::read_dir(root.path()).unwrap().count(); assert_eq!(file_count, 1); - drop(writer); + drop(upload); tokio::time::sleep(Duration::from_millis(1)).await; @@ -1549,13 +1439,15 @@ 
mod not_wasm_tests { #[cfg(target_family = "unix")] #[cfg(test)] mod unix_test { - use crate::local::LocalFileSystem; - use crate::{ObjectStore, Path}; + use std::fs::OpenOptions; + use nix::sys::stat; use nix::unistd; - use std::fs::OpenOptions; use tempfile::TempDir; + use crate::local::LocalFileSystem; + use crate::{ObjectStore, Path}; + #[tokio::test] async fn test_fifo() { let filename = "some_file"; diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 41ee1091a3b2..6c960d4f24fb 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -16,27 +16,24 @@ // under the License. //! An in-memory object store implementation -use crate::multipart::{MultiPartStore, PartId}; -use crate::util::InvalidGetRange; -use crate::{ - path::Path, GetRange, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, - PutMode, PutOptions, PutResult, Result, UpdateVersion, -}; -use crate::{GetOptions, MultipartId}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; +use std::ops::Range; +use std::sync::Arc; + use async_trait::async_trait; use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::{stream::BoxStream, StreamExt}; use parking_lot::RwLock; use snafu::{OptionExt, ResultExt, Snafu}; -use std::collections::BTreeSet; -use std::collections::{BTreeMap, HashMap}; -use std::io; -use std::ops::Range; -use std::pin::Pin; -use std::sync::Arc; -use std::task::Poll; -use tokio::io::AsyncWrite; + +use crate::multipart::{MultipartStore, PartId}; +use crate::util::InvalidGetRange; +use crate::GetOptions; +use crate::{ + path::Path, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, + ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult, Result, UpdateVersion, UploadPart, +}; /// A specialized `Error` for in-memory object store-related errors #[derive(Debug, Snafu)] @@ -213,23 +210,12 @@ impl ObjectStore for InMemory { }) } - async fn put_multipart( - &self, - location: &Path, - ) -> 
Result<(MultipartId, Box)> { - Ok(( - String::new(), - Box::new(InMemoryUpload { - location: location.clone(), - data: Vec::new(), - storage: Arc::clone(&self.storage), - }), - )) - } - - async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> { - // Nothing to clean up - Ok(()) + async fn put_multipart(&self, location: &Path) -> Result> { + Ok(Box::new(InMemoryUpload { + location: location.clone(), + parts: vec![], + storage: Arc::clone(&self.storage), + })) } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { @@ -391,7 +377,7 @@ impl ObjectStore for InMemory { } #[async_trait] -impl MultiPartStore for InMemory { +impl MultipartStore for InMemory { async fn create_multipart(&self, _path: &Path) -> Result { let mut storage = self.storage.write(); let etag = storage.next_etag; @@ -482,45 +468,42 @@ impl InMemory { } } +#[derive(Debug)] struct InMemoryUpload { location: Path, - data: Vec, + parts: Vec, storage: Arc>, } -impl AsyncWrite for InMemoryUpload { - fn poll_write( - mut self: Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll> { - self.data.extend_from_slice(buf); - Poll::Ready(Ok(buf.len())) +#[async_trait] +impl MultipartUpload for InMemoryUpload { + fn put_part(&mut self, data: Bytes) -> UploadPart { + self.parts.push(data); + Box::pin(futures::future::ready(Ok(()))) } - fn poll_flush( - self: Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) + async fn complete(&mut self) -> Result { + let cap = self.parts.iter().map(|x| x.len()).sum(); + let mut buf = Vec::with_capacity(cap); + self.parts.iter().for_each(|x| buf.extend_from_slice(x)); + let etag = self.storage.write().insert(&self.location, buf.into()); + Ok(PutResult { + e_tag: Some(etag.to_string()), + version: None, + }) } - fn poll_shutdown( - mut self: Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> Poll> { - let data = Bytes::from(std::mem::take(&mut 
self.data)); - self.storage.write().insert(&self.location, data); - Poll::Ready(Ok(())) + async fn abort(&mut self) -> Result<()> { + Ok(()) } } #[cfg(test)] mod tests { - use super::*; - use crate::tests::*; + use super::*; + #[tokio::test] async fn in_memory_test() { let integration = InMemory::new(); diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs index 1dcd5a6f4960..26cce3936244 100644 --- a/object_store/src/multipart.rs +++ b/object_store/src/multipart.rs @@ -17,34 +17,16 @@ //! Cloud Multipart Upload //! -//! This crate provides an asynchronous interface for multipart file uploads to cloud storage services. -//! It's designed to offer efficient, non-blocking operations, +//! This crate provides an asynchronous interface for multipart file uploads to +//! cloud storage services. It's designed to offer efficient, non-blocking operations, //! especially useful when dealing with large files or high-throughput systems. use async_trait::async_trait; use bytes::Bytes; -use futures::{stream::FuturesUnordered, Future, StreamExt}; -use std::{io, pin::Pin, sync::Arc, task::Poll}; -use tokio::io::AsyncWrite; use crate::path::Path; use crate::{MultipartId, PutResult, Result}; -type BoxedTryFuture = Pin> + Send>>; - -/// A trait used in combination with [`WriteMultiPart`] to implement -/// [`AsyncWrite`] on top of an API for multipart upload -#[async_trait] -pub trait PutPart: Send + Sync + 'static { - /// Upload a single part - async fn put_part(&self, buf: Vec, part_idx: usize) -> Result; - - /// Complete the upload with the provided parts - /// - /// `completed_parts` is in order of part number - async fn complete(&self, completed_parts: Vec) -> Result<()>; -} - /// Represents a part of a file that has been successfully uploaded in a multipart upload process. 
#[derive(Debug, Clone)] pub struct PartId { @@ -52,222 +34,6 @@ pub struct PartId { pub content_id: String, } -/// Wrapper around a [`PutPart`] that implements [`AsyncWrite`] -/// -/// Data will be uploaded in fixed size chunks of 10 MiB in parallel, -/// up to the configured maximum concurrency -pub struct WriteMultiPart { - inner: Arc, - /// A list of completed parts, in sequential order. - completed_parts: Vec>, - /// Part upload tasks currently running - tasks: FuturesUnordered>, - /// Maximum number of upload tasks to run concurrently - max_concurrency: usize, - /// Buffer that will be sent in next upload. - current_buffer: Vec, - /// Size of each part. - /// - /// While S3 and Minio support variable part sizes, R2 requires they all be - /// exactly the same size. - part_size: usize, - /// Index of current part - current_part_idx: usize, - /// The completion task - completion_task: Option>, -} - -impl WriteMultiPart { - /// Create a new multipart upload with the implementation and the given maximum concurrency - pub fn new(inner: T, max_concurrency: usize) -> Self { - Self { - inner: Arc::new(inner), - completed_parts: Vec::new(), - tasks: FuturesUnordered::new(), - max_concurrency, - current_buffer: Vec::new(), - // TODO: Should self vary by provider? - // TODO: Should we automatically increase then when part index gets large? 
- - // Minimum size of 5 MiB - // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html - // https://cloud.google.com/storage/quotas#requests - part_size: 10 * 1024 * 1024, - current_part_idx: 0, - completion_task: None, - } - } - - // Add data to the current buffer, returning the number of bytes added - fn add_to_buffer(mut self: Pin<&mut Self>, buf: &[u8], offset: usize) -> usize { - let remaining_capacity = self.part_size - self.current_buffer.len(); - let to_copy = std::cmp::min(remaining_capacity, buf.len() - offset); - self.current_buffer - .extend_from_slice(&buf[offset..offset + to_copy]); - to_copy - } - - /// Poll current tasks - fn poll_tasks( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Result<(), io::Error> { - if self.tasks.is_empty() { - return Ok(()); - } - while let Poll::Ready(Some(res)) = self.tasks.poll_next_unpin(cx) { - let (part_idx, part) = res?; - let total_parts = self.completed_parts.len(); - self.completed_parts - .resize(std::cmp::max(part_idx + 1, total_parts), None); - self.completed_parts[part_idx] = Some(part); - } - Ok(()) - } - - // The `poll_flush` function will only flush the in-progress tasks. - // The `final_flush` method called during `poll_shutdown` will flush - // the `current_buffer` along with in-progress tasks. - // Please see https://github.com/apache/arrow-rs/issues/3390 for more details. 
- fn final_flush( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - // Poll current tasks - self.as_mut().poll_tasks(cx)?; - - // If current_buffer is not empty, see if it can be submitted - if !self.current_buffer.is_empty() && self.tasks.len() < self.max_concurrency { - let out_buffer: Vec = std::mem::take(&mut self.current_buffer); - let inner = Arc::clone(&self.inner); - let part_idx = self.current_part_idx; - self.tasks.push(Box::pin(async move { - let upload_part = inner.put_part(out_buffer, part_idx).await?; - Ok((part_idx, upload_part)) - })); - } - - self.as_mut().poll_tasks(cx)?; - - // If tasks and current_buffer are empty, return Ready - if self.tasks.is_empty() && self.current_buffer.is_empty() { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } - } -} - -impl AsyncWrite for WriteMultiPart { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll> { - // Poll current tasks - self.as_mut().poll_tasks(cx)?; - - let mut offset = 0; - - loop { - // Fill up current buffer - offset += self.as_mut().add_to_buffer(buf, offset); - - // If we don't have a full buffer or we have too many tasks, break - if self.current_buffer.len() < self.part_size - || self.tasks.len() >= self.max_concurrency - { - break; - } - - let new_buffer = Vec::with_capacity(self.part_size); - let out_buffer = std::mem::replace(&mut self.current_buffer, new_buffer); - let inner = Arc::clone(&self.inner); - let part_idx = self.current_part_idx; - self.tasks.push(Box::pin(async move { - let upload_part = inner.put_part(out_buffer, part_idx).await?; - Ok((part_idx, upload_part)) - })); - self.current_part_idx += 1; - - // We need to poll immediately after adding to setup waker - self.as_mut().poll_tasks(cx)?; - } - - // If offset is zero, then we didn't write anything because we didn't - // have capacity for more tasks and our buffer is full. 
- if offset == 0 && !buf.is_empty() { - Poll::Pending - } else { - Poll::Ready(Ok(offset)) - } - } - - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - // Poll current tasks - self.as_mut().poll_tasks(cx)?; - - // If tasks is empty, return Ready - if self.tasks.is_empty() { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - // First, poll flush - match self.as_mut().final_flush(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(res) => res?, - }; - - // If shutdown task is not set, set it - let parts = std::mem::take(&mut self.completed_parts); - let parts = parts - .into_iter() - .enumerate() - .map(|(idx, part)| { - part.ok_or_else(|| { - io::Error::new( - io::ErrorKind::Other, - format!("Missing information for upload part {idx}"), - ) - }) - }) - .collect::>()?; - - let inner = Arc::clone(&self.inner); - let completion_task = self.completion_task.get_or_insert_with(|| { - Box::pin(async move { - inner.complete(parts).await?; - Ok(()) - }) - }); - - Pin::new(completion_task).poll(cx) - } -} - -impl std::fmt::Debug for WriteMultiPart { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("WriteMultiPart") - .field("completed_parts", &self.completed_parts) - .field("tasks", &self.tasks) - .field("max_concurrency", &self.max_concurrency) - .field("current_buffer", &self.current_buffer) - .field("part_size", &self.part_size) - .field("current_part_idx", &self.current_part_idx) - .finish() - } -} - /// A low-level interface for interacting with multipart upload APIs /// /// Most use-cases should prefer [`ObjectStore::put_multipart`] as this is supported by more @@ -277,7 +43,7 @@ impl std::fmt::Debug for WriteMultiPart { /// [`ObjectStore::put_multipart`]: crate::ObjectStore::put_multipart /// [`LocalFileSystem`]: crate::local::LocalFileSystem #[async_trait] -pub trait 
MultiPartStore: Send + Sync + 'static { +pub trait MultipartStore: Send + Sync + 'static { /// Creates a new multipart upload, returning the [`MultipartId`] async fn create_multipart(&self, path: &Path) -> Result; @@ -288,10 +54,11 @@ pub trait MultiPartStore: Send + Sync + 'static { /// /// Most stores require that all parts excluding the last are at least 5 MiB, and some /// further require that all parts excluding the last be the same size, e.g. [R2]. - /// [`WriteMultiPart`] performs writes in fixed size blocks of 10 MiB, and clients wanting + /// [`WriteMultipart`] performs writes in fixed size blocks of 5 MiB, and clients wanting /// to maximise compatibility should look to do likewise. /// /// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations + /// [`WriteMultipart`]: crate::upload::WriteMultipart async fn put_part( &self, path: &Path, diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index 38f9b07bbd05..053f71a2d063 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -19,12 +19,11 @@ use bytes::Bytes; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use std::ops::Range; -use tokio::io::AsyncWrite; use crate::path::Path; use crate::{ - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, - Result, + GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutOptions, + PutResult, Result, }; #[doc(hidden)] @@ -91,18 +90,11 @@ impl ObjectStore for PrefixStore { self.inner.put_opts(&full_path, bytes, opts).await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { + async fn put_multipart(&self, location: &Path) -> Result> { let full_path = self.full_path(location); self.inner.put_multipart(&full_path).await } - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> { - let full_path = self.full_path(location); - 
self.inner.abort_multipart(&full_path, multipart_id).await - } async fn get(&self, location: &Path) -> Result { let full_path = self.full_path(location); self.inner.get(&full_path).await diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index 252256a4599e..5ca1eedbf739 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -20,16 +20,15 @@ use parking_lot::Mutex; use std::ops::Range; use std::{convert::TryInto, sync::Arc}; +use crate::GetOptions; use crate::{ - path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutOptions, - PutResult, Result, + path::Path, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + PutOptions, PutResult, Result, }; -use crate::{GetOptions, MultipartId}; use async_trait::async_trait; use bytes::Bytes; use futures::{stream::BoxStream, FutureExt, StreamExt}; use std::time::Duration; -use tokio::io::AsyncWrite; /// Configuration settings for throttled store #[derive(Debug, Default, Clone, Copy)] @@ -158,14 +157,7 @@ impl ObjectStore for ThrottledStore { self.inner.put_opts(location, bytes, opts).await } - async fn put_multipart( - &self, - _location: &Path, - ) -> Result<(MultipartId, Box)> { - Err(super::Error::NotImplemented) - } - - async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> { + async fn put_multipart(&self, _location: &Path) -> Result> { Err(super::Error::NotImplemented) } diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs new file mode 100644 index 000000000000..6f8bfa8a5f73 --- /dev/null +++ b/object_store/src/upload.rs @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{PutResult, Result}; +use async_trait::async_trait; +use bytes::Bytes; +use futures::future::BoxFuture; +use tokio::task::JoinSet; + +/// An upload part request +pub type UploadPart = BoxFuture<'static, Result<()>>; + +/// A trait allowing writing an object in fixed size chunks +/// +/// Consecutive chunks of data can be written by calling [`MultipartUpload::put_part`] and polling +/// the returned futures to completion. Multiple futures returned by [`MultipartUpload::put_part`] +/// may be polled in parallel, allowing for concurrent uploads. +/// +/// Once all part uploads have been polled to completion, the upload can be completed by +/// calling [`MultipartUpload::complete`]. This will make the entire uploaded object visible +/// as an atomic operation. It is implementation defined behaviour if [`MultipartUpload::complete`] +/// is called before all [`UploadPart`] have been polled to completion. +#[async_trait] +pub trait MultipartUpload: Send + std::fmt::Debug { + /// Upload the next part + /// + /// Most stores require that all parts excluding the last are at least 5 MiB, and some + /// further require that all parts excluding the last be the same size, e.g. [R2]. + /// Clients wanting to maximise compatibility should therefore perform writes in + /// fixed size blocks larger than 5 MiB.
+ /// + /// Implementations may invoke this method multiple times and then await on the + /// returned futures in parallel + /// + /// ```no_run + /// # use futures::StreamExt; + /// # use object_store::MultipartUpload; + /// # + /// # async fn test() { + /// # + /// let mut upload: Box<&dyn MultipartUpload> = todo!(); + /// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into()); + /// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into()); + /// futures::future::try_join(p1, p2).await.unwrap(); + /// upload.complete().await.unwrap(); + /// # } + /// ``` + /// + /// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations + fn put_part(&mut self, data: Bytes) -> UploadPart; + + /// Complete the multipart upload + /// + /// It is implementation defined behaviour if this method is called before polling + /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to completion. Additionally, + /// it is implementation defined behaviour to call [`MultipartUpload::complete`] + /// on an already completed or aborted [`MultipartUpload`]. + async fn complete(&mut self) -> Result; + + /// Abort the multipart upload + /// + /// If a [`MultipartUpload`] is dropped without calling [`MultipartUpload::complete`], + /// some object stores will automatically clean up any previously uploaded parts. + /// However, some stores, such as S3 and GCS, cannot perform cleanup on drop. + /// As such [`MultipartUpload::abort`] can be invoked to perform this cleanup. + /// + /// It will not be possible to call `abort` in all failure scenarios, for example + /// non-graceful shutdown of the calling application. It is therefore recommended + /// object stores are configured with lifecycle rules to automatically cleanup + /// unused parts older than some threshold. See [crate::aws] and [crate::gcp] + /// for more information. 
+ /// + /// It is implementation defined behaviour to call [`MultipartUpload::abort`] + /// on an already completed or aborted [`MultipartUpload`] + async fn abort(&mut self) -> Result<()>; +} + +/// A synchronous write API for uploading data in parallel in fixed size chunks +/// +/// Uses multiple tokio tasks in a [`JoinSet`] to multiplex upload tasks in parallel +/// +/// The design also takes inspiration from [`Sink`] with [`WriteMultipart::wait_for_capacity`] +/// allowing back pressure on producers, prior to buffering the next part. However, unlike +/// [`Sink`] this back pressure is optional, allowing integration with synchronous producers +/// +/// [`Sink`]: futures::sink::Sink +#[derive(Debug)] +pub struct WriteMultipart { + upload: Box, + + buffer: Vec, + + tasks: JoinSet>, +} + +impl WriteMultipart { + /// Create a new [`WriteMultipart`] that will upload using 5 MiB chunks + pub fn new(upload: Box) -> Self { + Self::new_with_capacity(upload, 5 * 1024 * 1024) + } + + /// Create a new [`WriteMultipart`] that will upload in fixed `capacity` sized chunks + pub fn new_with_capacity(upload: Box, capacity: usize) -> Self { + Self { + upload, + buffer: Vec::with_capacity(capacity), + tasks: Default::default(), + } + } + + /// Wait until there are `max_concurrency` or fewer requests in-flight + pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> { + while self.tasks.len() > max_concurrency { + self.tasks.join_next().await.unwrap()??; + } + Ok(()) + } + + /// Write data to this [`WriteMultipart`] + /// + /// Note this method is synchronous (not `async`) and will immediately start new uploads + /// as soon as the internal `capacity` is hit, regardless of + /// how many outstanding uploads are already in progress.
+ /// + /// Back pressure can optionally be applied to producers by calling + /// [`Self::wait_for_capacity`] prior to calling this method + pub fn write(&mut self, mut buf: &[u8]) { + while !buf.is_empty() { + let capacity = self.buffer.capacity(); + let remaining = capacity - self.buffer.len(); + let to_read = buf.len().min(remaining); + self.buffer.extend_from_slice(&buf[..to_read]); + if to_read == remaining { + let part = std::mem::replace(&mut self.buffer, Vec::with_capacity(capacity)); + self.put_part(part.into()) + } + buf = &buf[to_read..] + } + } + + fn put_part(&mut self, part: Bytes) { + self.tasks.spawn(self.upload.put_part(part)); + } + + /// Abort this upload, attempting to clean up any successfully uploaded parts + pub async fn abort(mut self) -> Result<()> { + self.tasks.shutdown().await; + self.upload.abort().await + } + + /// Flush final chunk, and await completion of all in-flight requests + pub async fn finish(mut self) -> Result { + if !self.buffer.is_empty() { + let part = std::mem::take(&mut self.buffer); + self.put_part(part.into()) + } + + self.wait_for_capacity(0).await?; + self.upload.complete().await + } +} diff --git a/object_store/tests/get_range_file.rs b/object_store/tests/get_range_file.rs index f73d78578f08..309a86d8fe9d 100644 --- a/object_store/tests/get_range_file.rs +++ b/object_store/tests/get_range_file.rs @@ -25,7 +25,6 @@ use object_store::path::Path; use object_store::*; use std::fmt::Formatter; use tempfile::tempdir; -use tokio::io::AsyncWrite; #[derive(Debug)] struct MyStore(LocalFileSystem); @@ -42,14 +41,7 @@ impl ObjectStore for MyStore { self.0.put_opts(path, data, opts).await } - async fn put_multipart( - &self, - _: &Path, - ) -> Result<(MultipartId, Box)> { - todo!() - } - - async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> Result<()> { + async fn put_multipart(&self, _location: &Path) -> Result> { todo!() }