diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 712b7a36c56a..423052bfee68 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -19,15 +19,11 @@ //! //! ## Streaming uploads //! -//! [ObjectStore::put_multipart] will upload data in blocks and write a blob from those -//! blocks. Data is buffered internally to make blocks of at least 5MB and blocks -//! are uploaded concurrently. +//! [ObjectStore::upload] will upload data in blocks and write a blob from those blocks. //! -//! [ObjectStore::abort_multipart] is a no-op, since Azure Blob Store doesn't provide -//! a way to drop old blocks. Instead unused blocks are automatically cleaned up -//! after 7 days. +//! Unused blocks will automatically be dropped after 7 days. use crate::{ - multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart}, + multipart::{MultiPartStore, PartId}, path::Path, signer::Signer, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index fdefe599f79e..f86a7cb7ff43 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -18,7 +18,7 @@ //! Utilities for performing tokio-style buffered IO use crate::path::Path; -use crate::{MultipartId, ObjectMeta, ObjectStore}; +use crate::{ChunkedUpload, ObjectMeta, ObjectStore}; use bytes::Bytes; use futures::future::{BoxFuture, FutureExt}; use futures::ready; @@ -213,7 +213,6 @@ impl AsyncBufRead for BufReader { pub struct BufWriter { capacity: usize, state: BufWriterState, - multipart_id: Option, store: Arc, } @@ -221,22 +220,19 @@ impl std::fmt::Debug for BufWriter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("BufWriter") .field("capacity", &self.capacity) - .field("multipart_id", &self.multipart_id) .finish() } } -type MultipartResult = (MultipartId, Box); - enum BufWriterState { /// Buffer up to capacity bytes Buffer(Path, Vec), /// [`ObjectStore::put_multipart`] - Prepare(BoxFuture<'static, std::io::Result>), + Prepare(BoxFuture<'static, std::io::Result>), /// Write to a multipart upload - Write(Box), + Write(Option), /// [`ObjectStore::put`] - Put(BoxFuture<'static, std::io::Result<()>>), + Flush(BoxFuture<'static, std::io::Result<()>>), } impl BufWriter { @@ -251,14 +247,8 @@ impl BufWriter { capacity, store, state: BufWriterState::Buffer(path, Vec::new()), - multipart_id: None, } } - - /// Returns the [`MultipartId`] if multipart upload - pub fn multipart_id(&self) -> Option<&MultipartId> { - self.multipart_id.as_ref() - } } impl AsyncWrite for BufWriter { @@ -270,12 +260,15 @@ impl AsyncWrite for BufWriter { let cap = self.capacity; loop { return match &mut self.state { - BufWriterState::Write(write) => Pin::new(write).poll_write(cx, buf), - BufWriterState::Put(_) => panic!("Already shut down"), + BufWriterState::Write(Some(write)) => { + write.write(buf); + Poll::Ready(Ok(buf.len())) + } + BufWriterState::Write(None) | BufWriterState::Flush(_) => { + panic!("Already shut down") + } BufWriterState::Prepare(f) => { - let (id, w) = ready!(f.poll_unpin(cx)?); - self.state = BufWriterState::Write(w); - self.multipart_id = Some(id); + self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into()); continue; } BufWriterState::Buffer(path, b) => { @@ -284,9 +277,10 @@ impl AsyncWrite for BufWriter { let path = std::mem::take(path); let store = Arc::clone(&self.store); self.state = BufWriterState::Prepare(Box::pin(async move { - let (id, mut writer) = store.put_multipart(&path).await?; - writer.write_all(&buffer).await?; - Ok((id, writer)) + let upload = store.upload(&path).await?; + let mut chunked = ChunkedUpload::new(upload); + chunked.write(&buffer); + Ok(chunked) })); continue; } @@ -300,13 +294,10 @@ impl AsyncWrite for BufWriter { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { return match &mut self.state { - BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())), - BufWriterState::Write(write) => Pin::new(write).poll_flush(cx), - BufWriterState::Put(_) => panic!("Already shut down"), + BufWriterState::Write(_) | BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())), + BufWriterState::Flush(_) => panic!("Already shut down"), BufWriterState::Prepare(f) => { - let (id, w) = ready!(f.poll_unpin(cx)?); - self.state = BufWriterState::Write(w); - self.multipart_id = Some(id); + self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into()); continue; } }; @@ -317,21 +308,28 @@ impl AsyncWrite for BufWriter { loop { match &mut self.state { BufWriterState::Prepare(f) => { - let (id, w) = ready!(f.poll_unpin(cx)?); - self.state = BufWriterState::Write(w); - self.multipart_id = Some(id); + self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into()); } BufWriterState::Buffer(p, b) => { let buf = std::mem::take(b); let path = std::mem::take(p); let store = Arc::clone(&self.store); - self.state = BufWriterState::Put(Box::pin(async move { + self.state = BufWriterState::Flush(Box::pin(async move { store.put(&path, buf.into()).await?; Ok(()) })); } - BufWriterState::Put(f) => return f.poll_unpin(cx), - BufWriterState::Write(w) => return Pin::new(w).poll_shutdown(cx), + BufWriterState::Flush(f) => return f.poll_unpin(cx), + BufWriterState::Write(x) => { + let upload = x.take().unwrap(); + self.state = BufWriterState::Flush( + async move { + upload.finish().await?; + Ok(()) + } + .boxed(), + ) + } } } } @@ -443,9 +441,7 @@ mod tests { writer.write_all(&[0; 20]).await.unwrap(); writer.flush().await.unwrap(); writer.write_all(&[0; 5]).await.unwrap(); - assert!(writer.multipart_id().is_none()); writer.shutdown().await.unwrap(); - assert!(writer.multipart_id().is_none()); assert_eq!(store.head(&path).await.unwrap().size, 25); // Test multipart @@ -453,9 +449,7 @@ mod tests { writer.write_all(&[0; 20]).await.unwrap(); writer.flush().await.unwrap(); writer.write_all(&[0; 20]).await.unwrap(); - assert!(writer.multipart_id().is_some()); writer.shutdown().await.unwrap(); - assert!(writer.multipart_id().is_some()); assert_eq!(store.head(&path).await.unwrap().size, 40); } diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs index d33556f4b12e..8ae3026cb0f2 100644 --- a/object_store/src/chunked.rs +++ b/object_store/src/chunked.rs @@ -25,14 +25,13 @@ use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; use futures::stream::BoxStream; use futures::StreamExt; -use tokio::io::AsyncWrite; use crate::path::Path; +use crate::Result; use crate::{ GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutOptions, - PutResult, + PutResult, Upload, }; -use crate::{MultipartId, Result}; /// Wraps a [`ObjectStore`] and makes its get response return chunks /// in a controllable manner. @@ -67,15 +66,8 @@ impl ObjectStore for ChunkedStore { self.inner.put_opts(location, bytes, opts).await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - self.inner.put_multipart(location).await - } - - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> { - self.inner.abort_multipart(location, multipart_id).await + async fn upload(&self, location: &Path) -> Result> { + self.inner.upload(location).await } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index 252e9fdcadf5..7728f38954f9 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -40,6 +40,9 @@ pub mod header; #[cfg(any(feature = "aws", feature = "gcp"))] pub mod s3; +#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] +pub mod parts; + use async_trait::async_trait; use std::collections::HashMap; use std::str::FromStr; diff --git a/object_store/src/client/parts.rs b/object_store/src/client/parts.rs new file mode 100644 index 000000000000..bea63a1a87a9 --- /dev/null +++ b/object_store/src/client/parts.rs @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use parking_lot::Mutex; +use crate::multipart::PartId; + +/// An interior mutable collection of upload parts and their corresponding part index +#[derive(Debug, Default)] +pub(crate) struct Parts(Mutex>); + +impl Parts { + /// Record the [`PartId`] for a given index + /// + /// Note: calling this method multiple times with the same `part_idx` + /// will result in multiple [`PartId`] in the final output + pub(crate) fn put(&self, part_idx: usize, id: PartId) { + self.0.lock().push((part_idx, id)) + } + + /// Produce the final list of [`PartId`] ordered by `part_idx` + /// + /// `expected` is the number of parts expected in the final result + pub(crate) fn finish(&self, expected: usize) -> crate::Result> { + let mut parts = self.0.lock(); + if parts.len() != expected { + return Err(crate::Error::Generic { + store: "Parts", + source: "Missing part".to_string().into(), + }); + } + parts.sort_unstable_by_key(|(idx, _)| *idx); + Ok(parts.drain(..).map(|(_, v)| v).collect()) + } +} \ No newline at end of file diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 8633abbfb4dc..0feb057d1fec 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -20,15 +20,11 @@ //! ## Multi-part uploads //! //! [Multi-part uploads](https://cloud.google.com/storage/docs/multipart-uploads) -//! can be initiated with the [ObjectStore::put_multipart] method. -//! Data passed to the writer is automatically buffered to meet the minimum size -//! requirements for a part. Multiple parts are uploaded concurrently. -//! -//! If the writer fails for any reason, you may have parts uploaded to GCS but not -//! used that you may be charged for. Use the [ObjectStore::abort_multipart] method -//! to abort the upload and drop those unneeded parts. In addition, you may wish to -//! consider implementing automatic clean up of unused parts that are older than one -//! week. +//! can be initiated with the [ObjectStore::upload] method. If neither [`Upload::complete`] +//! nor [`Upload::abort`] is invoked, you may have parts uploaded to GCS but not used, +//! that you will be charged for. It is recommended you configure a [lifecycle rule] to +//! abort incomplete multipart uploads after a certain period of time to avoid being +//! charged for storing partial uploads //! //! ## Using HTTP/2 //! @@ -36,23 +32,23 @@ //! because it allows much higher throughput in our benchmarks (see //! [#5194](https://github.com/apache/arrow-rs/issues/5194)). HTTP/2 can be //! enabled by setting [crate::ClientConfigKey::Http1Only] to false. +//! +//! [lifecycle rule]: https://cloud.google.com/storage/docs/lifecycle#abort-mpu use std::sync::Arc; use crate::client::CredentialProvider; use crate::{ - multipart::{PartId, PutPart, WriteMultiPart}, - path::Path, - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, - Result, + multipart::PartId, path::Path, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, + ObjectStore, PutOptions, PutResult, Result, Upload, UploadPart, }; use async_trait::async_trait; use bytes::Bytes; use client::GoogleCloudStorageClient; use futures::stream::BoxStream; -use tokio::io::AsyncWrite; use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; +use crate::client::parts::Parts; use crate::multipart::MultiPartStore; pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey}; pub use credential::GcpCredential; @@ -89,27 +85,50 @@ impl GoogleCloudStorage { } } +#[derive(Debug)] struct GCSMultipartUpload { + state: Arc, + part_idx: usize, +} + +#[derive(Debug)] +struct UploadState { client: Arc, path: Path, multipart_id: MultipartId, + parts: Parts, } #[async_trait] -impl PutPart for GCSMultipartUpload { - /// Upload an object part - async fn put_part(&self, buf: Vec, part_idx: usize) -> Result { - self.client - .put_part(&self.path, &self.multipart_id, part_idx, buf.into()) +impl Upload for GCSMultipartUpload { + fn put_part(&mut self, data: Bytes) -> UploadPart { + let idx = self.part_idx; + self.part_idx += 1; + let state = Arc::clone(&self.state); + Box::pin(async move { + let part = state + .client + .put_part(&state.path, &state.multipart_id, idx, data) + .await?; + state.parts.put(idx, part); + Ok(()) + }) + } + + async fn complete(&mut self) -> Result { + let parts = self.state.parts.finish(self.part_idx)?; + + self.state + .client + .multipart_complete(&self.state.path, &self.state.multipart_id, parts) .await } - /// Complete a multipart upload - async fn complete(&self, completed_parts: Vec) -> Result<()> { - self.client - .multipart_complete(&self.path, &self.multipart_id, completed_parts) - .await?; - Ok(()) + async fn abort(&mut self) -> Result<()> { + self.state + .client + .multipart_cleanup(&self.state.path, &self.state.multipart_id) + .await } } @@ -119,27 +138,18 @@ impl ObjectStore for GoogleCloudStorage { self.client.put(location, bytes, opts).await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { + async fn upload(&self, location: &Path) -> Result> { let upload_id = self.client.multipart_initiate(location).await?; - let inner = GCSMultipartUpload { - client: Arc::clone(&self.client), - path: location.clone(), - multipart_id: upload_id.clone(), - }; - - Ok((upload_id, Box::new(WriteMultiPart::new(inner, 8)))) - } - - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> { - self.client - .multipart_cleanup(location, multipart_id) - .await?; - - Ok(()) + Ok(Box::new(GCSMultipartUpload { + part_idx: 0, + state: Arc::new(UploadState { + client: Arc::clone(&self.client), + path: location.clone(), + multipart_id: upload_id.clone(), + parts: Default::default(), + }), + })) } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs index f1d11db4762c..ba549abed783 100644 --- a/object_store/src/http/mod.rs +++ b/object_store/src/http/mod.rs @@ -46,7 +46,7 @@ use crate::http::client::Client; use crate::path::Path; use crate::{ ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, - ObjectStore, PutMode, PutOptions, PutResult, Result, RetryConfig, + ObjectStore, PutMode, PutOptions, PutResult, Result, RetryConfig, Upload, }; mod client; @@ -115,15 +115,8 @@ impl ObjectStore for HttpStore { }) } - async fn put_multipart( - &self, - _location: &Path, - ) -> Result<(MultipartId, Box)> { - Err(super::Error::NotImplemented) - } - - async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> { - Err(super::Error::NotImplemented) + async fn upload(&self, _location: &Path) -> Result> { + Err(crate::Error::NotImplemented) } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 8132002b6e01..d302ce54e77d 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -496,9 +496,11 @@ pub use tags::TagSet; pub mod multipart; mod parse; +mod upload; mod util; pub use parse::{parse_url, parse_url_opts}; +pub use upload::*; pub use util::GetRange; use crate::path::Path; @@ -515,7 +517,6 @@ use std::fmt::{Debug, Formatter}; use std::io::{Read, Seek, SeekFrom}; use std::ops::Range; use std::sync::Arc; -use tokio::io::AsyncWrite; /// An alias for a dynamically dispatched object store implementation. pub type DynObjectStore = dyn ObjectStore; @@ -538,48 +539,11 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// Save the provided bytes to the specified location with the given options async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result; - /// Get a multi-part upload that allows writing data in chunks. + /// Perform a multipart upload /// - /// Most cloud-based uploads will buffer and upload parts in parallel. - /// - /// To complete the upload, [AsyncWrite::poll_shutdown] must be called - /// to completion. This operation is guaranteed to be atomic, it will either - /// make all the written data available at `location`, or fail. No clients - /// should be able to observe a partially written object. - /// - /// For some object stores (S3, GCS, and local in particular), if the - /// writer fails or panics, you must call [ObjectStore::abort_multipart] - /// to clean up partially written data. - /// - ///
- /// It is recommended applications wait for any in-flight requests to complete by calling `flush`, if - /// there may be a significant gap in time (> ~30s) before the next write. - /// These gaps can include times where the function returns control to the - /// caller while keeping the writer open. If `flush` is not called, futures - /// for in-flight requests may be left unpolled long enough for the requests - /// to time out, causing the write to fail. - ///
- /// - /// For applications requiring fine-grained control of multipart uploads - /// see [`MultiPartStore`], although note that this interface cannot be - /// supported by all [`ObjectStore`] backends. - /// - /// For applications looking to implement this interface for a custom - /// multipart API, see [`WriteMultiPart`] which handles the complexities - /// of performing parallel uploads of fixed size parts. - /// - /// [`WriteMultiPart`]: multipart::WriteMultiPart - /// [`MultiPartStore`]: multipart::MultiPartStore - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)>; - - /// Cleanup an aborted upload. - /// - /// See documentation for individual stores for exact behavior, as capabilities - /// vary by object store. - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()>; + /// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads + /// typically require multiple separate requests. See [`Upload`] for more information + async fn upload(&self, location: &Path) -> Result>; /// Return the bytes that are stored at the specified location. async fn get(&self, location: &Path) -> Result { @@ -764,19 +728,8 @@ macro_rules! as_ref_impl { self.as_ref().put_opts(location, bytes, opts).await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - self.as_ref().put_multipart(location).await - } - - async fn abort_multipart( - &self, - location: &Path, - multipart_id: &MultipartId, - ) -> Result<()> { - self.as_ref().abort_multipart(location, multipart_id).await + async fn upload(&self, location: &Path) -> Result> { + self.as_ref().upload(location).await } async fn get(&self, location: &Path) -> Result { @@ -1247,8 +1200,6 @@ mod tests { use futures::stream::FuturesUnordered; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; - use std::future::Future; - use tokio::io::AsyncWriteExt; pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) { put_get_delete_list_opts(storage).await @@ -1923,12 +1874,11 @@ mod tests { let location = Path::from("test_dir/test_upload_file.txt"); // Can write to storage - let data = get_chunks(5_000, 10); + let data = get_chunks(5 * 1024 * 1024, 3); let bytes_expected = data.concat(); - let (_, mut writer) = storage.put_multipart(&location).await.unwrap(); - for chunk in &data { - writer.write_all(chunk).await.unwrap(); - } + let mut upload = storage.upload(&location).await.unwrap(); + let uploads = data.into_iter().map(|x| upload.put_part(x)); + futures::future::try_join_all(uploads).await.unwrap(); // Object should not yet exist in store let meta_res = storage.head(&location).await; @@ -1944,7 +1894,8 @@ mod tests { let result = storage.list_with_delimiter(None).await.unwrap(); assert_eq!(&result.objects, &[]); - writer.shutdown().await.unwrap(); + upload.complete().await.unwrap(); + let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap(); assert_eq!(bytes_expected, bytes_written); @@ -1952,22 +1903,19 @@ mod tests { // Sizes chosen to ensure we write three parts let data = get_chunks(3_200_000, 7); let bytes_expected = data.concat(); - let (_, mut writer) = storage.put_multipart(&location).await.unwrap(); + let upload = storage.upload(&location).await.unwrap(); + let mut writer = ChunkedUpload::new(upload); for chunk in &data { - writer.write_all(chunk).await.unwrap(); + writer.write(chunk) } - writer.shutdown().await.unwrap(); + writer.finish().await.unwrap(); let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap(); assert_eq!(bytes_expected, bytes_written); // We can abort an empty write let location = Path::from("test_dir/test_abort_upload.txt"); - let (upload_id, writer) = storage.put_multipart(&location).await.unwrap(); - drop(writer); - storage - .abort_multipart(&location, &upload_id) - .await - .unwrap(); + let mut upload = storage.upload(&location).await.unwrap(); + upload.abort().await.unwrap(); let get_res = storage.get(&location).await; assert!(get_res.is_err()); assert!(matches!( @@ -1976,17 +1924,13 @@ mod tests { )); // We can abort an in-progress write - let (upload_id, mut writer) = storage.put_multipart(&location).await.unwrap(); - if let Some(chunk) = data.first() { - writer.write_all(chunk).await.unwrap(); - let _ = writer.write(chunk).await.unwrap(); - } - drop(writer); - - storage - .abort_multipart(&location, &upload_id) + let mut upload = storage.upload(&location).await.unwrap(); + upload + .put_part(data.first().unwrap().clone()) .await .unwrap(); + + upload.abort().await.unwrap(); let get_res = storage.get(&location).await; assert!(get_res.is_err()); assert!(matches!( diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs index d1363d9a4d46..b20a95cadd6f 100644 --- a/object_store/src/limit.rs +++ b/object_store/src/limit.rs @@ -18,8 +18,8 @@ //! An object store that limits the maximum concurrency of the wrapped implementation use crate::{ - BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, - ObjectStore, Path, PutOptions, PutResult, Result, StreamExt, + BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, Path, + PutOptions, PutResult, Result, StreamExt, Upload, UploadPart, }; use async_trait::async_trait; use bytes::Bytes; @@ -81,18 +81,12 @@ impl ObjectStore for LimitStore { let _permit = self.semaphore.acquire().await.unwrap(); self.inner.put_opts(location, bytes, opts).await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap(); - let (id, write) = self.inner.put_multipart(location).await?; - Ok((id, Box::new(PermitWrapper::new(write, permit)))) - } - - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> { - let _permit = self.semaphore.acquire().await.unwrap(); - self.inner.abort_multipart(location, multipart_id).await + async fn upload(&self, location: &Path) -> Result> { + let upload = self.inner.upload(location).await?; + Ok(Box::new(LimitUpload { + semaphore: Arc::clone(&self.semaphore), + upload, + })) } async fn get(&self, location: &Path) -> Result { let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap(); @@ -257,6 +251,43 @@ impl AsyncWrite for PermitWrapper { } } +#[derive(Debug)] +pub struct LimitUpload { + upload: Box, + semaphore: Arc, +} + +impl LimitUpload { + pub fn new(upload: Box, max_concurrency: usize) -> Self { + Self { + upload, + semaphore: Arc::new(Semaphore::new(max_concurrency)), + } + } +} + +#[async_trait] +impl Upload for LimitUpload { + fn put_part(&mut self, data: Bytes) -> UploadPart { + let upload = self.upload.put_part(data); + let s = Arc::clone(&self.semaphore); + Box::pin(async move { + let _permit = s.acquire().await.unwrap(); + upload.await + }) + } + + async fn complete(&mut self) -> Result { + let _permit = self.semaphore.acquire().await.unwrap(); + self.upload.complete().await + } + + async fn abort(&mut self) -> Result<()> { + let _permit = self.semaphore.acquire().await.unwrap(); + self.upload.abort().await + } +} + #[cfg(test)] mod tests { use crate::limit::LimitStore; diff --git a/object_store/src/local.rs b/object_store/src/local.rs index d631771778db..76394d72ecae 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -16,34 +16,32 @@ // under the License. //! An object store implementation for a local filesystem -use crate::{ - maybe_spawn_blocking, - path::{absolute_path_to_url, Path}, - util::InvalidGetRange, - GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore, - PutMode, PutOptions, PutResult, Result, -}; -use async_trait::async_trait; -use bytes::Bytes; -use chrono::{DateTime, Utc}; -use futures::future::BoxFuture; -use futures::ready; -use futures::{stream::BoxStream, StreamExt}; -use futures::{FutureExt, TryStreamExt}; -use snafu::{ensure, ResultExt, Snafu}; use std::fs::{metadata, symlink_metadata, File, Metadata, OpenOptions}; use std::io::{ErrorKind, Read, Seek, SeekFrom, Write}; use std::ops::Range; -use std::pin::Pin; use std::sync::Arc; -use std::task::Poll; use std::time::SystemTime; use std::{collections::BTreeSet, convert::TryFrom, io}; use std::{collections::VecDeque, path::PathBuf}; -use tokio::io::AsyncWrite; + +use async_trait::async_trait; +use bytes::Bytes; +use chrono::{DateTime, Utc}; +use futures::{stream::BoxStream, StreamExt}; +use futures::{FutureExt, TryStreamExt}; +use parking_lot::Mutex; +use snafu::{ensure, OptionExt, ResultExt, Snafu}; use url::Url; use walkdir::{DirEntry, WalkDir}; +use crate::{ + maybe_spawn_blocking, + path::{absolute_path_to_url, Path}, + util::InvalidGetRange, + GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutMode, + PutOptions, PutResult, Result, Upload, UploadPart, +}; + /// A specialized `Error` for filesystem object store-related errors #[derive(Debug, Snafu)] #[allow(missing_docs)] @@ -155,6 +153,9 @@ pub(crate) enum Error { InvalidPath { path: String, }, + + #[snafu(display("Upload aborted"))] + Aborted, } impl From for super::Error { @@ -342,8 +343,7 @@ impl ObjectStore for LocalFileSystem { let path = self.path_to_filesystem(location)?; maybe_spawn_blocking(move || { - let (mut file, suffix) = new_staged_upload(&path)?; - let staging_path = staged_upload_path(&path, &suffix); + let (mut file, staging_path) = new_staged_upload(&path)?; let mut e_tag = None; let err = match file.write_all(&bytes) { @@ -395,31 +395,10 @@ impl ObjectStore for LocalFileSystem { .await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - let dest = self.path_to_filesystem(location)?; - - let (file, suffix) = new_staged_upload(&dest)?; - Ok(( - suffix.clone(), - Box::new(LocalUpload::new(dest, suffix, Arc::new(file))), - )) - } - - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> { + async fn upload(&self, location: &Path) -> Result> { let dest = self.path_to_filesystem(location)?; - let path: PathBuf = staged_upload_path(&dest, multipart_id); - - maybe_spawn_blocking(move || match std::fs::remove_file(&path) { - Ok(_) => Ok(()), - Err(source) => match source.kind() { - ErrorKind::NotFound => Ok(()), // Already deleted - _ => Err(Error::UnableToDeleteFile { path, source }.into()), - }, - }) - .await + let (file, src) = new_staged_upload(&dest)?; + Ok(Box::new(LocalUpload::new(src, dest, file))) } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { @@ -677,17 +656,17 @@ fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> { Ok(()) } -/// Generates a unique file path `{base}#{suffix}`, returning the opened `File` and `suffix` +/// Generates a unique file path `{base}#{suffix}`, returning the opened `File` and `path` /// /// Creates any directories if necessary -fn new_staged_upload(base: &std::path::Path) -> Result<(File, String)> { +fn new_staged_upload(base: &std::path::Path) -> Result<(File, PathBuf)> { let mut multipart_id = 1; loop { let suffix = multipart_id.to_string(); let path = staged_upload_path(base, &suffix); let mut options = OpenOptions::new(); match options.read(true).write(true).create_new(true).open(&path) { - Ok(f) => return Ok((f, suffix)), + Ok(f) => return Ok((f, path)), Err(source) => match source.kind() { ErrorKind::AlreadyExists => multipart_id += 1, ErrorKind::NotFound => create_parent_dirs(&path, source)?, @@ -705,194 +684,91 @@ fn staged_upload_path(dest: &std::path::Path, suffix: &str) -> PathBuf { staging_path.into() } -enum LocalUploadState { - /// Upload is ready to send new data - Idle(Arc), - /// In the middle of a write - Writing(Arc, BoxFuture<'static, Result>), - /// In the middle of syncing data and closing file. - /// - /// Future will contain last reference to file, so it will call drop on completion. - ShuttingDown(BoxFuture<'static, Result<(), io::Error>>), - /// File is being moved from it's temporary location to the final location - Committing(BoxFuture<'static, Result<(), io::Error>>), - /// Upload is complete - Complete, +#[derive(Debug)] +struct LocalUpload { + /// The upload state + state: Arc, + /// The location of the temporary file + src: Option, + /// The next offset to write into the file + offset: u64, } -struct LocalUpload { - inner_state: LocalUploadState, +#[derive(Debug)] +struct UploadState { dest: PathBuf, - multipart_id: MultipartId, + file: Mutex>, } impl LocalUpload { - pub fn new(dest: PathBuf, multipart_id: MultipartId, file: Arc) -> Self { + pub fn new(src: PathBuf, dest: PathBuf, file: File) -> Self { Self { - inner_state: LocalUploadState::Idle(file), - dest, - multipart_id, + state: Arc::new(UploadState { + dest, + file: Mutex::new(Some(file)), + }), + src: Some(src), + offset: 0, } } } -impl AsyncWrite for LocalUpload { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll> { - let invalid_state = |condition: &str| -> Poll> { - Poll::Ready(Err(io::Error::new( - ErrorKind::InvalidInput, - format!("Tried to write to file {condition}."), - ))) - }; +#[async_trait] +impl Upload for LocalUpload { + fn put_part(&mut self, data: Bytes) -> UploadPart { + let offset = self.offset; + self.offset += data.len() as u64; - if let Ok(runtime) = tokio::runtime::Handle::try_current() { - let mut data: Vec = buf.to_vec(); - let data_len = data.len(); - - loop { - match &mut self.inner_state { - LocalUploadState::Idle(file) => { - let file = Arc::clone(file); - let file2 = Arc::clone(&file); - let data: Vec = std::mem::take(&mut data); - self.inner_state = LocalUploadState::Writing( - file, - Box::pin( - runtime - .spawn_blocking(move || (&*file2).write_all(&data)) - .map(move |res| match res { - Err(err) => Err(io::Error::new(ErrorKind::Other, err)), - Ok(res) => res.map(move |_| data_len), - }), - ), - ); - } - LocalUploadState::Writing(file, inner_write) => { - let res = ready!(inner_write.poll_unpin(cx)); - self.inner_state = LocalUploadState::Idle(Arc::clone(file)); - return Poll::Ready(res); - } - LocalUploadState::ShuttingDown(_) => { - return invalid_state("when writer is shutting down"); - } - LocalUploadState::Committing(_) => { - return invalid_state("when writer is committing data"); - } - LocalUploadState::Complete => { - return invalid_state("when writer is complete"); - } - } - } - } else if let LocalUploadState::Idle(file) = &self.inner_state { - let file = Arc::clone(file); - (&*file).write_all(buf)?; - Poll::Ready(Ok(buf.len())) - } else { - // If we are running on this thread, then only possible states are Idle and Complete. - invalid_state("when writer is already complete.") - } + let s = Arc::clone(&self.state); + maybe_spawn_blocking(move || { + let mut f = s.file.lock(); + let file = f.as_mut().context(AbortedSnafu)?; + file.seek(SeekFrom::Start(offset)) + .context(SeekSnafu { path: &s.dest })?; + file.write_all(&data).context(UnableToCopyDataToFileSnafu)?; + Ok(()) + }) + .boxed() } - fn poll_flush( - self: Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) + async fn complete(&mut self) -> Result { + let src = self.src.take().context(AbortedSnafu)?; + let s = Arc::clone(&self.state); + maybe_spawn_blocking(move || { + // Ensure no inflight writes + let f = s.file.lock().take().context(AbortedSnafu)?; + std::fs::rename(&src, &s.dest).context(UnableToRenameFileSnafu)?; + let metadata = f.metadata().map_err(|e| Error::Metadata { + source: e.into(), + path: src.to_string_lossy().to_string(), + })?; + + Ok(PutResult { + e_tag: Some(get_etag(&metadata)), + version: None, + }) + }) + .await } - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - if let Ok(runtime) = tokio::runtime::Handle::try_current() { - loop { - match &mut self.inner_state { - LocalUploadState::Idle(file) => { - // We are moving file into the future, and it will be dropped on it's completion, closing the file. - let file = Arc::clone(file); - self.inner_state = LocalUploadState::ShuttingDown(Box::pin( - runtime - .spawn_blocking(move || (*file).sync_all()) - .map(move |res| match res { - Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)), - Ok(res) => res, - }), - )); - } - LocalUploadState::ShuttingDown(fut) => match fut.poll_unpin(cx) { - Poll::Ready(res) => { - res?; - let staging_path = staged_upload_path(&self.dest, &self.multipart_id); - let dest = self.dest.clone(); - self.inner_state = LocalUploadState::Committing(Box::pin( - runtime - .spawn_blocking(move || std::fs::rename(&staging_path, &dest)) - .map(move |res| match res { - Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)), - Ok(res) => res, - }), - )); - } - Poll::Pending => { - return Poll::Pending; - } - }, - LocalUploadState::Writing(_, _) => { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Tried to commit a file where a write is in progress.", - ))); - } - LocalUploadState::Committing(fut) => { - let res = ready!(fut.poll_unpin(cx)); - self.inner_state = LocalUploadState::Complete; - return Poll::Ready(res); - } - LocalUploadState::Complete => { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::Other, - "Already complete", - ))) - } - } - } - } else { - let staging_path = staged_upload_path(&self.dest, &self.multipart_id); - match &mut self.inner_state { - LocalUploadState::Idle(file) => { - let file = Arc::clone(file); - self.inner_state = LocalUploadState::Complete; - file.sync_all()?; - drop(file); - std::fs::rename(staging_path, &self.dest)?; - Poll::Ready(Ok(())) - } - _ => { - // If we are running on this thread, then only possible states are Idle and Complete. - Poll::Ready(Err(io::Error::new(ErrorKind::Other, "Already complete"))) - } - } - } + async fn abort(&mut self) -> Result<()> { + let src = self.src.take().context(AbortedSnafu)?; + maybe_spawn_blocking(move || { + std::fs::remove_file(&src).context(UnableToDeleteFileSnafu { path: &src })?; + Ok(()) + }) + .await } } impl Drop for LocalUpload { fn drop(&mut self) { - match self.inner_state { - LocalUploadState::Complete => (), - _ => { - self.inner_state = LocalUploadState::Complete; - let path = staged_upload_path(&self.dest, &self.multipart_id); - // Try to cleanup intermediate file ignoring any error - match tokio::runtime::Handle::try_current() { - Ok(r) => drop(r.spawn_blocking(move || std::fs::remove_file(path))), - Err(_) => drop(std::fs::remove_file(path)), - }; - } + if let Some(src) = self.src.take() { + // Try to clean up intermediate file ignoring any error + match tokio::runtime::Handle::try_current() { + Ok(r) => drop(r.spawn_blocking(move || std::fs::remove_file(src))), + Err(_) => drop(std::fs::remove_file(src)), + }; } } } @@ -1095,12 +971,13 @@ fn convert_walkdir_result( #[cfg(test)] mod tests { - use super::*; - use crate::test_util::flatten_list_stream; - use crate::tests::*; use futures::TryStreamExt; use tempfile::{NamedTempFile, TempDir}; - use tokio::io::AsyncWriteExt; + + use crate::test_util::flatten_list_stream; + use crate::tests::*; + + use super::*; #[tokio::test] async fn file_test() { @@ -1125,7 +1002,18 @@ mod tests { put_get_delete_list(&integration).await; list_uses_directories_correctly(&integration).await; list_with_delimiter(&integration).await; - stream_get(&integration).await; + + // Can't use stream_get test as ChunkedUpload uses a tokio JoinSet + let p = Path::from("manual_upload"); + let mut upload = integration.upload(&p).await.unwrap(); + upload.put_part(Bytes::from_static(b"123")).await.unwrap(); + upload.put_part(Bytes::from_static(b"45678")).await.unwrap(); + let r = upload.complete().await.unwrap(); + + let get = integration.get(&p).await.unwrap(); + assert_eq!(get.meta.e_tag.as_ref().unwrap(), r.e_tag.as_ref().unwrap()); + let actual = get.bytes().await.unwrap(); + assert_eq!(actual.as_ref(), b"12345678"); }); } @@ -1422,12 +1310,11 @@ mod tests { let location = Path::from("some_file"); let data = Bytes::from("arbitrary data"); - let (multipart_id, mut writer) = integration.put_multipart(&location).await.unwrap(); - writer.write_all(&data).await.unwrap(); + let mut u1 = integration.upload(&location).await.unwrap(); + u1.put_part(data.clone()).await.unwrap(); - let (multipart_id_2, mut writer_2) = integration.put_multipart(&location).await.unwrap(); - assert_ne!(multipart_id, multipart_id_2); - writer_2.write_all(&data).await.unwrap(); + let mut u2 = integration.upload(&location).await.unwrap(); + u2.put_part(data).await.unwrap(); let list = flatten_list_stream(&integration, None).await.unwrap(); assert_eq!(list.len(), 0); @@ -1520,11 +1407,13 @@ mod tests { #[cfg(not(target_arch = "wasm32"))] #[cfg(test)] mod not_wasm_tests { - use crate::local::LocalFileSystem; - use crate::{ObjectStore, Path}; use std::time::Duration; + + use bytes::Bytes; use tempfile::TempDir; - use tokio::io::AsyncWriteExt; + + use crate::local::LocalFileSystem; + use crate::{ObjectStore, Path}; #[tokio::test] async fn test_cleanup_intermediate_files() { @@ -1532,12 +1421,13 @@ mod not_wasm_tests { let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); let location = Path::from("some_file"); - let (_, mut writer) = integration.put_multipart(&location).await.unwrap(); - writer.write_all(b"hello").await.unwrap(); + let data = Bytes::from_static(b"hello"); + let mut upload = integration.upload(&location).await.unwrap(); + upload.put_part(data).await.unwrap(); let file_count = std::fs::read_dir(root.path()).unwrap().count(); assert_eq!(file_count, 1); - drop(writer); + drop(upload); tokio::time::sleep(Duration::from_millis(1)).await; @@ -1549,13 +1439,15 @@ mod not_wasm_tests { #[cfg(target_family = "unix")] #[cfg(test)] mod unix_test { - use crate::local::LocalFileSystem; - use crate::{ObjectStore, Path}; + use std::fs::OpenOptions; + use nix::sys::stat; use nix::unistd; - use std::fs::OpenOptions; use tempfile::TempDir; + use crate::local::LocalFileSystem; + use crate::{ObjectStore, Path}; + #[tokio::test] async fn test_fifo() { let filename = "some_file"; diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 41ee1091a3b2..9e7fc2701b7c 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -16,27 +16,21 @@ // under the License. //! An in-memory object store implementation -use crate::multipart::{MultiPartStore, PartId}; -use crate::util::InvalidGetRange; -use crate::{ - path::Path, GetRange, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, - PutMode, PutOptions, PutResult, Result, UpdateVersion, -}; -use crate::{GetOptions, MultipartId}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; +use std::ops::Range; +use std::sync::Arc; + use async_trait::async_trait; use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::{stream::BoxStream, StreamExt}; use parking_lot::RwLock; use snafu::{OptionExt, ResultExt, Snafu}; -use std::collections::BTreeSet; -use std::collections::{BTreeMap, HashMap}; -use std::io; -use std::ops::Range; -use std::pin::Pin; -use std::sync::Arc; -use std::task::Poll; -use tokio::io::AsyncWrite; + +use crate::{GetRange, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore, path::Path, PutMode, PutOptions, PutResult, Result, UpdateVersion, Upload, UploadPart}; +use crate::GetOptions; +use crate::multipart::{MultiPartStore, PartId}; +use crate::util::InvalidGetRange; /// A specialized `Error` for in-memory object store-related errors #[derive(Debug, Snafu)] @@ -213,23 +207,12 @@ impl ObjectStore for InMemory { }) } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - Ok(( - String::new(), - Box::new(InMemoryUpload { - location: location.clone(), - data: Vec::new(), - storage: Arc::clone(&self.storage), - }), - )) - } - - async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> { - // Nothing to clean up - Ok(()) + async fn upload(&self, location: &Path) -> Result> { + Ok(Box::new(InMemoryUpload { + location: location.clone(), + parts: vec![], + storage: Arc::clone(&self.storage), + })) } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { @@ -482,45 +465,42 @@ impl InMemory { } } +#[derive(Debug)] struct InMemoryUpload { location: Path, - data: Vec, + parts: Vec, storage: Arc>, } -impl AsyncWrite for InMemoryUpload { - fn poll_write( - mut self: Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll> { - self.data.extend_from_slice(buf); - Poll::Ready(Ok(buf.len())) +#[async_trait] +impl Upload for InMemoryUpload { + fn put_part(&mut self, data: Bytes) -> UploadPart { + self.parts.push(data); + Box::pin(futures::future::ready(Ok(()))) } - fn poll_flush( - self: Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) + async fn complete(&mut self) -> Result { + let cap = self.parts.iter().map(|x| x.len()).sum(); + let mut buf = Vec::with_capacity(cap); + self.parts.iter().for_each(|x| buf.extend_from_slice(x)); + let etag = self.storage.write().insert(&self.location, buf.into()); + Ok(PutResult { + e_tag: Some(etag.to_string()), + version: None, + }) } - fn poll_shutdown( - mut self: Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> Poll> { - let data = Bytes::from(std::mem::take(&mut self.data)); - self.storage.write().insert(&self.location, data); - Poll::Ready(Ok(())) + async fn abort(&mut self) -> Result<()> { + Ok(()) } } #[cfg(test)] mod tests { - use super::*; - use crate::tests::*; + use super::*; + #[tokio::test] async fn in_memory_test() { let integration = InMemory::new(); diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs index 1dcd5a6f4960..749fdeb9c022 100644 --- a/object_store/src/multipart.rs +++ b/object_store/src/multipart.rs @@ -17,33 +17,15 @@ //! Cloud Multipart Upload //! -//! This crate provides an asynchronous interface for multipart file uploads to cloud storage services. -//! It's designed to offer efficient, non-blocking operations, +//! This crate provides an asynchronous interface for multipart file uploads to +//! cloud storage services. It's designed to offer efficient, non-blocking operations, //! especially useful when dealing with large files or high-throughput systems. use async_trait::async_trait; use bytes::Bytes; -use futures::{stream::FuturesUnordered, Future, StreamExt}; -use std::{io, pin::Pin, sync::Arc, task::Poll}; -use tokio::io::AsyncWrite; -use crate::path::Path; use crate::{MultipartId, PutResult, Result}; - -type BoxedTryFuture = Pin> + Send>>; - -/// A trait used in combination with [`WriteMultiPart`] to implement -/// [`AsyncWrite`] on top of an API for multipart upload -#[async_trait] -pub trait PutPart: Send + Sync + 'static { - /// Upload a single part - async fn put_part(&self, buf: Vec, part_idx: usize) -> Result; - - /// Complete the upload with the provided parts - /// - /// `completed_parts` is in order of part number - async fn complete(&self, completed_parts: Vec) -> Result<()>; -} +use crate::path::Path; /// Represents a part of a file that has been successfully uploaded in a multipart upload process. #[derive(Debug, Clone)] @@ -52,222 +34,6 @@ pub struct PartId { pub content_id: String, } -/// Wrapper around a [`PutPart`] that implements [`AsyncWrite`] -/// -/// Data will be uploaded in fixed size chunks of 10 MiB in parallel, -/// up to the configured maximum concurrency -pub struct WriteMultiPart { - inner: Arc, - /// A list of completed parts, in sequential order. - completed_parts: Vec>, - /// Part upload tasks currently running - tasks: FuturesUnordered>, - /// Maximum number of upload tasks to run concurrently - max_concurrency: usize, - /// Buffer that will be sent in next upload. - current_buffer: Vec, - /// Size of each part. - /// - /// While S3 and Minio support variable part sizes, R2 requires they all be - /// exactly the same size. - part_size: usize, - /// Index of current part - current_part_idx: usize, - /// The completion task - completion_task: Option>, -} - -impl WriteMultiPart { - /// Create a new multipart upload with the implementation and the given maximum concurrency - pub fn new(inner: T, max_concurrency: usize) -> Self { - Self { - inner: Arc::new(inner), - completed_parts: Vec::new(), - tasks: FuturesUnordered::new(), - max_concurrency, - current_buffer: Vec::new(), - // TODO: Should self vary by provider? - // TODO: Should we automatically increase then when part index gets large? - - // Minimum size of 5 MiB - // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html - // https://cloud.google.com/storage/quotas#requests - part_size: 10 * 1024 * 1024, - current_part_idx: 0, - completion_task: None, - } - } - - // Add data to the current buffer, returning the number of bytes added - fn add_to_buffer(mut self: Pin<&mut Self>, buf: &[u8], offset: usize) -> usize { - let remaining_capacity = self.part_size - self.current_buffer.len(); - let to_copy = std::cmp::min(remaining_capacity, buf.len() - offset); - self.current_buffer - .extend_from_slice(&buf[offset..offset + to_copy]); - to_copy - } - - /// Poll current tasks - fn poll_tasks( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Result<(), io::Error> { - if self.tasks.is_empty() { - return Ok(()); - } - while let Poll::Ready(Some(res)) = self.tasks.poll_next_unpin(cx) { - let (part_idx, part) = res?; - let total_parts = self.completed_parts.len(); - self.completed_parts - .resize(std::cmp::max(part_idx + 1, total_parts), None); - self.completed_parts[part_idx] = Some(part); - } - Ok(()) - } - - // The `poll_flush` function will only flush the in-progress tasks. - // The `final_flush` method called during `poll_shutdown` will flush - // the `current_buffer` along with in-progress tasks. - // Please see https://github.com/apache/arrow-rs/issues/3390 for more details. - fn final_flush( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - // Poll current tasks - self.as_mut().poll_tasks(cx)?; - - // If current_buffer is not empty, see if it can be submitted - if !self.current_buffer.is_empty() && self.tasks.len() < self.max_concurrency { - let out_buffer: Vec = std::mem::take(&mut self.current_buffer); - let inner = Arc::clone(&self.inner); - let part_idx = self.current_part_idx; - self.tasks.push(Box::pin(async move { - let upload_part = inner.put_part(out_buffer, part_idx).await?; - Ok((part_idx, upload_part)) - })); - } - - self.as_mut().poll_tasks(cx)?; - - // If tasks and current_buffer are empty, return Ready - if self.tasks.is_empty() && self.current_buffer.is_empty() { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } - } -} - -impl AsyncWrite for WriteMultiPart { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll> { - // Poll current tasks - self.as_mut().poll_tasks(cx)?; - - let mut offset = 0; - - loop { - // Fill up current buffer - offset += self.as_mut().add_to_buffer(buf, offset); - - // If we don't have a full buffer or we have too many tasks, break - if self.current_buffer.len() < self.part_size - || self.tasks.len() >= self.max_concurrency - { - break; - } - - let new_buffer = Vec::with_capacity(self.part_size); - let out_buffer = std::mem::replace(&mut self.current_buffer, new_buffer); - let inner = Arc::clone(&self.inner); - let part_idx = self.current_part_idx; - self.tasks.push(Box::pin(async move { - let upload_part = inner.put_part(out_buffer, part_idx).await?; - Ok((part_idx, upload_part)) - })); - self.current_part_idx += 1; - - // We need to poll immediately after adding to setup waker - self.as_mut().poll_tasks(cx)?; - } - - // If offset is zero, then we didn't write anything because we didn't - // have capacity for more tasks and our buffer is full. - if offset == 0 && !buf.is_empty() { - Poll::Pending - } else { - Poll::Ready(Ok(offset)) - } - } - - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - // Poll current tasks - self.as_mut().poll_tasks(cx)?; - - // If tasks is empty, return Ready - if self.tasks.is_empty() { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - // First, poll flush - match self.as_mut().final_flush(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(res) => res?, - }; - - // If shutdown task is not set, set it - let parts = std::mem::take(&mut self.completed_parts); - let parts = parts - .into_iter() - .enumerate() - .map(|(idx, part)| { - part.ok_or_else(|| { - io::Error::new( - io::ErrorKind::Other, - format!("Missing information for upload part {idx}"), - ) - }) - }) - .collect::>()?; - - let inner = Arc::clone(&self.inner); - let completion_task = self.completion_task.get_or_insert_with(|| { - Box::pin(async move { - inner.complete(parts).await?; - Ok(()) - }) - }); - - Pin::new(completion_task).poll(cx) - } -} - -impl std::fmt::Debug for WriteMultiPart { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("WriteMultiPart") - .field("completed_parts", &self.completed_parts) - .field("tasks", &self.tasks) - .field("max_concurrency", &self.max_concurrency) - .field("current_buffer", &self.current_buffer) - .field("part_size", &self.part_size) - .field("current_part_idx", &self.current_part_idx) - .finish() - } -} - /// A low-level interface for interacting with multipart upload APIs /// /// Most use-cases should prefer [`ObjectStore::put_multipart`] as this is supported by more diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index 38f9b07bbd05..4b38a6131953 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -19,12 +19,11 @@ use bytes::Bytes; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use std::ops::Range; -use tokio::io::AsyncWrite; use crate::path::Path; use crate::{ - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, - Result, + GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore, PutOptions, PutResult, Result, + Upload, }; #[doc(hidden)] @@ -91,18 +90,11 @@ impl ObjectStore for PrefixStore { self.inner.put_opts(&full_path, bytes, opts).await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - let full_path = self.full_path(location); - self.inner.put_multipart(&full_path).await + async fn upload(&self, location: &Path) -> Result> { + let full_path = self.full_path(&location); + self.inner.upload(&full_path).await } - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> { - let full_path = self.full_path(location); - self.inner.abort_multipart(&full_path, multipart_id).await - } async fn get(&self, location: &Path) -> Result { let full_path = self.full_path(location); self.inner.get(&full_path).await diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index 252256a4599e..cef74d5db0f6 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -20,16 +20,15 @@ use parking_lot::Mutex; use std::ops::Range; use std::{convert::TryInto, sync::Arc}; +use crate::GetOptions; use crate::{ path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutOptions, - PutResult, Result, + PutResult, Result, Upload, }; -use crate::{GetOptions, MultipartId}; use async_trait::async_trait; use bytes::Bytes; use futures::{stream::BoxStream, FutureExt, StreamExt}; use std::time::Duration; -use tokio::io::AsyncWrite; /// Configuration settings for throttled store #[derive(Debug, Default, Clone, Copy)] @@ -158,14 +157,7 @@ impl ObjectStore for ThrottledStore { self.inner.put_opts(location, bytes, opts).await } - async fn put_multipart( - &self, - _location: &Path, - ) -> Result<(MultipartId, Box)> { - Err(super::Error::NotImplemented) - } - - async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> { + async fn upload(&self, _location: &Path) -> Result> { Err(super::Error::NotImplemented) } diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs new file mode 100644 index 000000000000..7d4fc62e7c83 --- /dev/null +++ b/object_store/src/upload.rs @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{PutResult, Result}; +use async_trait::async_trait; +use bytes::Bytes; +use futures::future::BoxFuture; +use tokio::task::JoinSet; + +/// An upload part request +pub type UploadPart = BoxFuture<'static, Result<()>>; + +#[async_trait] +pub trait Upload: Send + std::fmt::Debug { + /// Upload the next part + /// + /// Returns a stream + /// + /// Most stores require that all parts excluding the last are at least 5 MiB, and some + /// further require that all parts excluding the last be the same size, e.g. [R2]. + /// Clients wanting to maximise compatibility should therefore perform writes in + /// fixed size blocks larger than 5 MiB. + /// + /// Implementations may invoke this method multiple times and then await on the + /// returned futures in parallel + /// + /// ```no_run + /// # use futures::StreamExt; + /// # use object_store::Upload; + /// # + /// # async fn test() { + /// # + /// let mut upload: Box<&dyn Upload> = todo!(); + /// let mut p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into()); + /// let mut p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into()); + /// + /// let (u1, u2) = futures::future::join(p1.next(), p2.next()).await; + /// u1.unwrap().unwrap(); + /// u2.unwrap().unwrap(); + /// let result = upload.complete().await.unwrap(); + /// # } + /// ``` + /// + /// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations + fn put_part(&mut self, data: Bytes) -> UploadPart; + + /// Complete the multipart upload + async fn complete(&mut self) -> Result; + + /// Abort the multipart upload + /// + /// It is implementation defined behaviour if called concurrently with [`UploadPart::execute`] + async fn abort(&mut self) -> Result<()>; +} + +/// A synchronous write API for uploading data in parallel +/// +/// Makes use of [`JoinSet`] under the hood to multiplex upload tasks, +/// avoiding issues caused by sharing a single tokio's cooperative task +/// budget across multiple IO operations. +/// +/// The design also takes inspiration from [`Sink`] with [`ChunkedUpload::wait_for_capacity`] +/// allowing back pressure on producers, prior to buffering the next part. However, unlike +/// [`Sink`] this back pressure is optional, allowing integration with synchronous producers +/// +/// [`Sink`]: futures::sink::Sink +#[derive(Debug)] +pub struct ChunkedUpload { + upload: Box, + + buffer: Vec, + + tasks: JoinSet>, +} + +impl ChunkedUpload { + /// Create a new [`ChunkedUpload`] + pub fn new(upload: Box) -> Self { + Self::new_with_capacity(upload, 5 * 1024 * 1024) + } + + /// Create a new [`ChunkedUpload`] that will upload in fixed `capacity` sized chunks + pub fn new_with_capacity(upload: Box, capacity: usize) -> Self { + Self { + upload, + buffer: Vec::with_capacity(capacity), + tasks: Default::default(), + } + } + + /// Wait until there are `max_concurrency` or fewer requests in-flight + pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> { + while self.tasks.len() > max_concurrency { + self.tasks.join_next().await.unwrap()??; + } + Ok(()) + } + + /// Write data to this [`ChunkedUpload`] + /// + /// Back pressure can optionally be applied to producers by calling + /// [`Self::wait_for_capacity`] prior to calling this method + pub fn write(&mut self, mut buf: &[u8]) { + while !buf.is_empty() { + let capacity = self.buffer.capacity(); + let remaining = capacity - self.buffer.len(); + let to_read = buf.len().min(remaining); + self.buffer.extend_from_slice(&buf[..to_read]); + if to_read == remaining { + let part = std::mem::replace(&mut self.buffer, Vec::with_capacity(capacity)); + self.put_part(part.into()) + } + buf = &buf[to_read..] + } + } + + fn put_part(&mut self, part: Bytes) { + self.tasks.spawn(self.upload.put_part(part)); + } + + /// Abort this upload + pub async fn abort(mut self) -> Result<()> { + self.tasks.shutdown().await; + self.upload.abort().await + } + + /// Flush final chunk, and await completion of all in-flight requests + pub async fn finish(mut self) -> Result { + if !self.buffer.is_empty() { + let part = std::mem::take(&mut self.buffer); + self.put_part(part.into()) + } + + self.wait_for_capacity(0).await?; + self.upload.complete().await + } +}