Skip to content

Commit

Permalink
use Bytes instead of Vec<u8> for PutPart.
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksonrnewhouse committed Oct 21, 2023
1 parent ffb7b26 commit 6062e6b
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 15 deletions.
10 changes: 7 additions & 3 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ impl ObjectStore for AmazonS3 {
Ok((id, Box::new(upload)))
}

async fn get_put_part(&self, location: &Path, multipart_id: &MultipartId) -> Result<Box<dyn PutPart>> {
async fn get_put_part(
&self,
location: &Path,
multipart_id: &MultipartId,
) -> Result<Box<dyn PutPart>> {
let upload = S3MultiPartUpload {
location: location.clone(),
upload_id: multipart_id.clone(),
Expand Down Expand Up @@ -270,14 +274,14 @@ struct S3MultiPartUpload {

#[async_trait]
impl PutPart for S3MultiPartUpload {
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
async fn put_part(&self, buf: Bytes, part_idx: usize) -> Result<PartId> {
let part = (part_idx + 1).to_string();

let content_id = self
.client
.put_request(
&self.location,
buf.into(),
buf,
&[("partNumber", &part), ("uploadId", &self.upload_id)],
)
.await?;
Expand Down
8 changes: 6 additions & 2 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ impl ObjectStore for MicrosoftAzure {
Ok((String::new(), Box::new(inner)))
}

async fn get_put_part(&self, location: &Path, _multipart_id: &MultipartId) -> Result<Box<dyn PutPart>> {
async fn get_put_part(
&self,
location: &Path,
_multipart_id: &MultipartId,
) -> Result<Box<dyn PutPart>> {
let inner = AzureMultiPartUpload {
client: Arc::clone(&self.client),
location: location.to_owned(),
Expand Down Expand Up @@ -172,7 +176,7 @@ struct AzureMultiPartUpload {

#[async_trait]
impl PutPart for AzureMultiPartUpload {
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
async fn put_part(&self, buf: Bytes, part_idx: usize) -> Result<PartId> {
let content_id = format!("{part_idx:20}");
let block_id: BlockId = content_id.clone().into();

Expand Down
10 changes: 7 additions & 3 deletions object_store/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ struct GCSMultipartUpload {
#[async_trait]
impl PutPart for GCSMultipartUpload {
/// Upload an object part <https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
async fn put_part(&self, buf: Bytes, part_idx: usize) -> Result<PartId> {
let upload_id = self.multipart_id.clone();
let content_id = self
.client
.put_request(
&self.path,
buf.into(),
buf,
&[
("partNumber", format!("{}", part_idx + 1)),
("uploadId", upload_id),
Expand Down Expand Up @@ -151,7 +151,11 @@ impl ObjectStore for GoogleCloudStorage {
Ok((id, Box::new(upload)))
}

async fn get_put_part(&self, location: &Path, multipart_id: &MultipartId) -> Result<Box<dyn PutPart>> {
async fn get_put_part(
&self,
location: &Path,
multipart_id: &MultipartId,
) -> Result<Box<dyn PutPart>> {
let upload = GCSMultipartUpload {
client: Arc::clone(&self.client),
path: location.clone(),
Expand Down
15 changes: 11 additions & 4 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,15 +278,15 @@ use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
#[cfg(feature = "cloud")]
use multipart::PutPart;
use snafu::Snafu;
use std::fmt::{Debug, Formatter};
#[cfg(not(target_arch = "wasm32"))]
use std::io::{Read, Seek, SeekFrom};
use std::ops::Range;
use std::sync::Arc;
use tokio::io::AsyncWrite;
#[cfg(feature = "cloud")]
use multipart::PutPart;

/// An alias for a dynamically dispatched object store implementation.
pub type DynObjectStore = dyn ObjectStore;
Expand Down Expand Up @@ -323,13 +323,20 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {

/// Init multipart upload
#[cfg(feature = "cloud")]
async fn initiate_multipart_upload(&self, _location: &Path) -> Result<(MultipartId, Box<dyn PutPart>)> {
async fn initiate_multipart_upload(
&self,
_location: &Path,
) -> Result<(MultipartId, Box<dyn PutPart>)> {
Err(Error::NotImplemented)
}

/// Get a multi-part upload that allows writing data in chunks
#[cfg(feature = "cloud")]
async fn get_put_part(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<Box<dyn PutPart>> {
async fn get_put_part(
&self,
_location: &Path,
_multipart_id: &MultipartId,
) -> Result<Box<dyn PutPart>> {
Err(Error::NotImplemented)
}

Expand Down
7 changes: 4 additions & 3 deletions object_store/src/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
//! especially useful when dealing with large files or high-throughput systems.
use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream::FuturesUnordered, Future, StreamExt};
use std::{io, pin::Pin, sync::Arc, task::Poll};
use tokio::io::AsyncWrite;
Expand All @@ -36,7 +37,7 @@ type BoxedTryFuture<T> = Pin<Box<dyn Future<Output = Result<T, io::Error>> + Sen
#[async_trait]
pub trait PutPart: Send + Sync + 'static {
/// Upload a single part
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId>;
async fn put_part(&self, buf: Bytes, part_idx: usize) -> Result<PartId>;

/// Complete the upload with the provided parts
///
Expand Down Expand Up @@ -138,7 +139,7 @@ impl<T: PutPart> WriteMultiPart<T> {
let inner = Arc::clone(&self.inner);
let part_idx = self.current_part_idx;
self.tasks.push(Box::pin(async move {
let upload_part = inner.put_part(out_buffer, part_idx).await?;
let upload_part = inner.put_part(out_buffer.into(), part_idx).await?;
Ok((part_idx, upload_part))
}));
}
Expand Down Expand Up @@ -181,7 +182,7 @@ impl<T: PutPart> AsyncWrite for WriteMultiPart<T> {
let inner = Arc::clone(&self.inner);
let part_idx = self.current_part_idx;
self.tasks.push(Box::pin(async move {
let upload_part = inner.put_part(out_buffer, part_idx).await?;
let upload_part = inner.put_part(out_buffer.into(), part_idx).await?;
Ok((part_idx, upload_part))
}));
self.current_part_idx += 1;
Expand Down

0 comments on commit 6062e6b

Please sign in to comment.