Skip to content

Commit

Permalink
Add direct access to PutPart in ObjectStore.
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksonrnewhouse committed Oct 21, 2023
1 parent 03d0505 commit ffb7b26
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 0 deletions.
25 changes: 25 additions & 0 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,31 @@ impl ObjectStore for AmazonS3 {
Ok((id, Box::new(WriteMultiPart::new(upload, 8))))
}

async fn initiate_multipart_upload(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn PutPart>)> {
let id = self.client.create_multipart(location).await?;

let upload = S3MultiPartUpload {
location: location.clone(),
upload_id: id.clone(),
client: Arc::clone(&self.client),
};

Ok((id, Box::new(upload)))
}

async fn get_put_part(&self, location: &Path, multipart_id: &MultipartId) -> Result<Box<dyn PutPart>> {
let upload = S3MultiPartUpload {
location: location.clone(),
upload_id: multipart_id.clone(),
client: Arc::clone(&self.client),
};

Ok(Box::new(upload))
}

async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
self.client
.delete_request(location, &[("uploadId", multipart_id)])
Expand Down
21 changes: 21 additions & 0 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,27 @@ impl ObjectStore for MicrosoftAzure {
Ok((String::new(), Box::new(WriteMultiPart::new(inner, 8))))
}

async fn initiate_multipart_upload(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn PutPart>)> {
let inner = AzureMultiPartUpload {
client: Arc::clone(&self.client),
location: location.to_owned(),
};

Ok((String::new(), Box::new(inner)))
}

async fn get_put_part(&self, location: &Path, _multipart_id: &MultipartId) -> Result<Box<dyn PutPart>> {
let inner = AzureMultiPartUpload {
client: Arc::clone(&self.client),
location: location.to_owned(),
};

Ok(Box::new(inner))
}

async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
// There is no way to drop blocks that have been uploaded. Instead, they simply
// expire in 7 days.
Expand Down
25 changes: 25 additions & 0 deletions object_store/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,31 @@ impl ObjectStore for GoogleCloudStorage {
Ok((upload_id, Box::new(WriteMultiPart::new(inner, 8))))
}

async fn initiate_multipart_upload(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn PutPart>)> {
let id = self.client.multipart_initiate(location).await?;

let upload = GCSMultipartUpload {
client: Arc::clone(&self.client),
path: location.clone(),
multipart_id: id.clone(),
};

Ok((id, Box::new(upload)))
}

async fn get_put_part(&self, location: &Path, multipart_id: &MultipartId) -> Result<Box<dyn PutPart>> {
let upload = GCSMultipartUpload {
client: Arc::clone(&self.client),
path: location.clone(),
multipart_id: multipart_id.clone(),
};

Ok(Box::new(upload))
}

async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
self.client
.multipart_cleanup(location, multipart_id)
Expand Down
14 changes: 14 additions & 0 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ use std::io::{Read, Seek, SeekFrom};
use std::ops::Range;
use std::sync::Arc;
use tokio::io::AsyncWrite;
#[cfg(feature = "cloud")]
use multipart::PutPart;

/// An alias for a dynamically dispatched object store implementation.
pub type DynObjectStore = dyn ObjectStore;
Expand Down Expand Up @@ -319,6 +321,18 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)>;

/// Init multipart upload
#[cfg(feature = "cloud")]
async fn initiate_multipart_upload(&self, _location: &Path) -> Result<(MultipartId, Box<dyn PutPart>)> {
Err(Error::NotImplemented)
}

/// Get a multi-part upload that allows writing data in chunks
#[cfg(feature = "cloud")]
async fn get_put_part(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<Box<dyn PutPart>> {
Err(Error::NotImplemented)
}

/// Cleanup an aborted upload.
///
/// See documentation for individual stores for exact behavior, as capabilities
Expand Down

0 comments on commit ffb7b26

Please sign in to comment.