From 6062e6b1286297922bbf658eecef293dda19a643 Mon Sep 17 00:00:00 2001
From: Jackson Newhouse
Date: Fri, 20 Oct 2023 18:38:23 -0700
Subject: [PATCH] use Bytes instead of Vec<u8> for PutPart.

---
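Note for reviewers (kept below the --- marker, so it is not part of the commit
message): the only API change here is the type of the buffer handed to
PutPart::put_part, from an owned Vec<u8> to Bytes. A minimal, illustrative
sketch of why that is cheap for callers, using only the bytes crate; the
buffer size and variable names below are made up and this is not code from
the patch:

    use bytes::Bytes;

    fn main() {
        // Stand-in for the Vec<u8> that WriteMultiPart stages before a part upload.
        let out_buffer: Vec<u8> = vec![0u8; 1024];

        // `out_buffer.into()` is the conversion added at the put_part call sites:
        // Bytes takes ownership of the existing allocation, no data is copied.
        let body: Bytes = out_buffer.into();

        // Bytes is reference-counted, so a part body can be cloned or sliced
        // (e.g. for a retry) without allocating again.
        let retry_body = body.clone();
        assert_eq!(retry_body.len(), body.len());
    }

The likely benefit is that PutPart implementations can pass the part body
straight to their HTTP client, or keep a copy for retries, without an extra
allocation per part.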
 object_store/src/aws/mod.rs   | 10 +++++++---
 object_store/src/azure/mod.rs |  8 ++++++--
 object_store/src/gcp/mod.rs   | 10 +++++++---
 object_store/src/lib.rs       | 15 +++++++++++----
 object_store/src/multipart.rs |  7 ++++---
 5 files changed, 35 insertions(+), 15 deletions(-)

diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index f3ca6afe18b3..29f0382461d4 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -193,7 +193,11 @@ impl ObjectStore for AmazonS3 {
         Ok((id, Box::new(upload)))
     }
 
-    async fn get_put_part(&self, location: &Path, multipart_id: &MultipartId) -> Result<Box<dyn PutPart>> {
+    async fn get_put_part(
+        &self,
+        location: &Path,
+        multipart_id: &MultipartId,
+    ) -> Result<Box<dyn PutPart>> {
         let upload = S3MultiPartUpload {
             location: location.clone(),
             upload_id: multipart_id.clone(),
@@ -270,14 +274,14 @@ struct S3MultiPartUpload {
 
 #[async_trait]
 impl PutPart for S3MultiPartUpload {
-    async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
+    async fn put_part(&self, buf: Bytes, part_idx: usize) -> Result<PartId> {
         let part = (part_idx + 1).to_string();
 
         let content_id = self
             .client
             .put_request(
                 &self.location,
-                buf.into(),
+                buf,
                 &[("partNumber", &part), ("uploadId", &self.upload_id)],
             )
             .await?;
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 348b823702d3..0b48622a9438 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -119,7 +119,11 @@ impl ObjectStore for MicrosoftAzure {
         Ok((String::new(), Box::new(inner)))
     }
 
-    async fn get_put_part(&self, location: &Path, _multipart_id: &MultipartId) -> Result<Box<dyn PutPart>> {
+    async fn get_put_part(
+        &self,
+        location: &Path,
+        _multipart_id: &MultipartId,
+    ) -> Result<Box<dyn PutPart>> {
         let inner = AzureMultiPartUpload {
             client: Arc::clone(&self.client),
             location: location.to_owned(),
@@ -172,7 +176,7 @@ struct AzureMultiPartUpload {
 
 #[async_trait]
 impl PutPart for AzureMultiPartUpload {
-    async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
+    async fn put_part(&self, buf: Bytes, part_idx: usize) -> Result<PartId> {
         let content_id = format!("{part_idx:20}");
         let block_id: BlockId = content_id.clone().into();
 
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index e1f307298d54..c979749b032b 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -89,13 +89,13 @@ struct GCSMultipartUpload {
 #[async_trait]
 impl PutPart for GCSMultipartUpload {
     /// Upload an object part
-    async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
+    async fn put_part(&self, buf: Bytes, part_idx: usize) -> Result<PartId> {
         let upload_id = self.multipart_id.clone();
         let content_id = self
             .client
             .put_request(
                 &self.path,
-                buf.into(),
+                buf,
                 &[
                     ("partNumber", format!("{}", part_idx + 1)),
                     ("uploadId", upload_id),
@@ -151,7 +151,11 @@ impl ObjectStore for GoogleCloudStorage {
         Ok((id, Box::new(upload)))
     }
 
-    async fn get_put_part(&self, location: &Path, multipart_id: &MultipartId) -> Result<Box<dyn PutPart>> {
+    async fn get_put_part(
+        &self,
+        location: &Path,
+        multipart_id: &MultipartId,
+    ) -> Result<Box<dyn PutPart>> {
         let upload = GCSMultipartUpload {
             client: Arc::clone(&self.client),
             path: location.clone(),
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index cd29c61f94b5..d29fa2b28bab 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -278,6 +278,8 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
 use futures::{stream::BoxStream, StreamExt, TryStreamExt};
+#[cfg(feature = "cloud")]
+use multipart::PutPart;
 use snafu::Snafu;
 use std::fmt::{Debug, Formatter};
 #[cfg(not(target_arch = "wasm32"))]
@@ -285,8 +287,6 @@ use std::io::{Read, Seek, SeekFrom};
 use std::ops::Range;
 use std::sync::Arc;
 use tokio::io::AsyncWrite;
-#[cfg(feature = "cloud")]
-use multipart::PutPart;
 
 /// An alias for a dynamically dispatched object store implementation.
 pub type DynObjectStore = dyn ObjectStore;
@@ -323,13 +323,20 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
 
     /// Init multipart upload
     #[cfg(feature = "cloud")]
-    async fn initiate_multipart_upload(&self, _location: &Path) -> Result<(MultipartId, Box<dyn PutPart>)> {
+    async fn initiate_multipart_upload(
+        &self,
+        _location: &Path,
+    ) -> Result<(MultipartId, Box<dyn PutPart>)> {
         Err(Error::NotImplemented)
     }
 
     /// Get a multi-part upload that allows writing data in chunks
     #[cfg(feature = "cloud")]
-    async fn get_put_part(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<Box<dyn PutPart>> {
+    async fn get_put_part(
+        &self,
+        _location: &Path,
+        _multipart_id: &MultipartId,
+    ) -> Result<Box<dyn PutPart>> {
         Err(Error::NotImplemented)
     }
 
diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs
index d4c911fceab4..9820793cde7d 100644
--- a/object_store/src/multipart.rs
+++ b/object_store/src/multipart.rs
@@ -22,6 +22,7 @@
 //! especially useful when dealing with large files or high-throughput systems.
 
 use async_trait::async_trait;
+use bytes::Bytes;
 use futures::{stream::FuturesUnordered, Future, StreamExt};
 use std::{io, pin::Pin, sync::Arc, task::Poll};
 use tokio::io::AsyncWrite;
@@ -36,7 +37,7 @@ type BoxedTryFuture<T> = Pin<Box<dyn Future<Output = Result<T, io::Error>> + Sen
 #[async_trait]
 pub trait PutPart: Send + Sync + 'static {
     /// Upload a single part
-    async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId>;
+    async fn put_part(&self, buf: Bytes, part_idx: usize) -> Result<PartId>;
 
     /// Complete the upload with the provided parts
     ///
@@ -138,7 +139,7 @@ impl<T: PutPart> WriteMultiPart<T> {
             let inner = Arc::clone(&self.inner);
             let part_idx = self.current_part_idx;
             self.tasks.push(Box::pin(async move {
-                let upload_part = inner.put_part(out_buffer, part_idx).await?;
+                let upload_part = inner.put_part(out_buffer.into(), part_idx).await?;
                 Ok((part_idx, upload_part))
             }));
         }
@@ -181,7 +182,7 @@ impl<T: PutPart> AsyncWrite for WriteMultiPart<T> {
             let inner = Arc::clone(&self.inner);
             let part_idx = self.current_part_idx;
             self.tasks.push(Box::pin(async move {
-                let upload_part = inner.put_part(out_buffer, part_idx).await?;
+                let upload_part = inner.put_part(out_buffer.into(), part_idx).await?;
                 Ok((part_idx, upload_part))
             }));
             self.current_part_idx += 1;