Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MultiPartStore (#4961) (#4608) #4971

Merged
merged 7 commits into from
Oct 25, 2023
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 47 additions & 4 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
@@ -27,7 +27,9 @@ use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::{ClientOptions, GetOptions, ListResult, MultipartId, Path, Result, RetryConfig};
use crate::{
ClientOptions, GetOptions, ListResult, MultipartId, Path, PutResult, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
@@ -115,6 +117,9 @@ pub(crate) enum Error {
#[snafu(display("Error performing complete multipart request: {}", source))]
CompleteMultipartRequest { source: crate::client::retry::Error },

#[snafu(display("Error getting complete multipart response body: {}", source))]
CompleteMultipartResponseBody { source: reqwest::Error },

#[snafu(display("Got invalid list response: {}", source))]
InvalidListResponse { source: quick_xml::de::DeError },

@@ -162,6 +167,13 @@ struct MultipartPart {
part_number: usize,
}

/// Body of the response to a complete-multipart-upload request.
///
/// Deserialized from an XML document whose root element is
/// `CompleteMultipartUploadResult`; only the `ETag` element is extracted.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase", rename = "CompleteMultipartUploadResult")]
struct CompleteMultipartResult {
/// Contents of the `ETag` element; the explicit rename is needed because
/// `PascalCase` would otherwise map `e_tag` to `ETag` only by coincidence of casing rules.
// NOTE(review): presumably the ETag of the assembled object — confirm against the S3 API docs.
#[serde(rename = "ETag")]
e_tag: String,
}

#[derive(Deserialize)]
#[serde(rename_all = "PascalCase", rename = "DeleteResult")]
struct BatchDeleteResponse {
@@ -506,12 +518,32 @@ impl S3Client {
Ok(response.upload_id)
}

/// Upload a single part of the multipart upload identified by `upload_id`.
///
/// `part_idx` is zero-based; the `partNumber` sent with the request is
/// `part_idx + 1`. The payload and query are handed to `put_request`, and the
/// string it returns becomes the `content_id` of the resulting [`PartId`]
/// (presumably the part's ETag — confirm against `put_request`).
///
/// # Errors
///
/// Propagates any error returned by `put_request`.
pub async fn put_part(
    &self,
    path: &Path,
    upload_id: &MultipartId,
    part_idx: usize,
    data: Bytes,
) -> Result<PartId> {
    // Convert the zero-based index into the part number placed in the query string.
    let part_number = (part_idx + 1).to_string();
    let query = [("partNumber", &part_number), ("uploadId", upload_id)];

    let content_id = self.put_request(path, data, &query).await?;
    Ok(PartId { content_id })
}

pub async fn complete_multipart(
&self,
location: &Path,
upload_id: &str,
parts: Vec<PartId>,
) -> Result<()> {
) -> Result<PutResult> {
let parts = parts
.into_iter()
.enumerate()
@@ -527,7 +559,8 @@ impl S3Client {
let credential = self.get_credential().await?;
let url = self.config.path_url(location);

self.client
let response = self
.client
.request(Method::POST, url)
.query(&[("uploadId", upload_id)])
.body(body)
@@ -542,7 +575,17 @@ impl S3Client {
.await
.context(CompleteMultipartRequestSnafu)?;

Ok(())
let data = response
.bytes()
.await
.context(CompleteMultipartResponseBodySnafu)?;

let response: CompleteMultipartResult =
quick_xml::de::from_reader(data.reader()).context(InvalidMultipartResponseSnafu)?;

Ok(PutResult {
e_tag: Some(response.e_tag),
})
}
}

47 changes: 34 additions & 13 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
@@ -44,7 +44,7 @@ use crate::aws::client::S3Client;
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
use crate::client::CredentialProvider;
use crate::multipart::{PartId, PutPart, WriteMultiPart};
use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart};
use crate::signer::Signer;
use crate::{
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, PutResult,
@@ -246,18 +246,9 @@ struct S3MultiPartUpload {
#[async_trait]
impl PutPart for S3MultiPartUpload {
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
let part = (part_idx + 1).to_string();

let content_id = self
.client
.put_request(
&self.location,
buf.into(),
&[("partNumber", &part), ("uploadId", &self.upload_id)],
)
.await?;

Ok(PartId { content_id })
self.client
.put_part(&self.location, &self.upload_id, part_idx, buf.into())
.await
}

async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
@@ -268,6 +259,36 @@ impl PutPart for S3MultiPartUpload {
}
}

/// [`MultiPartStore`] for [`AmazonS3`]: each operation is forwarded to the
/// store's underlying client.
#[async_trait]
impl MultiPartStore for AmazonS3 {
    /// Start a multipart upload at `path` and return the id reported by the client.
    async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
        self.client.create_multipart(path).await
    }

    /// Upload `data` as the part at zero-based index `part_idx` of upload `id`.
    async fn put_part(
        &self,
        path: &Path,
        id: &MultipartId,
        part_idx: usize,
        data: Bytes,
    ) -> Result<PartId> {
        self.client.put_part(path, id, part_idx, data).await
    }

    /// Finish upload `id` at `path` from the given `parts`.
    async fn complete_multipart(
        &self,
        path: &Path,
        id: &MultipartId,
        parts: Vec<PartId>,
    ) -> Result<PutResult> {
        self.client.complete_multipart(path, id, parts).await
    }

    /// Abort upload `id` by issuing a delete request for `path` that carries
    /// the `uploadId` query parameter.
    async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
        let query = [("uploadId", id)];
        self.client.delete_request(path, &query).await
    }
}

#[cfg(test)]
mod tests {
use super::*;
49 changes: 47 additions & 2 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
@@ -19,13 +19,16 @@ use super::credential::AzureCredential;
use crate::azure::credential::*;
use crate::azure::{AzureCredentialProvider, STORE};
use crate::client::get::GetClient;
use crate::client::header::HeaderConfig;
use crate::client::header::{get_etag, HeaderConfig};
use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::util::deserialize_rfc1123;
use crate::{ClientOptions, GetOptions, ListResult, ObjectMeta, Path, Result, RetryConfig};
use crate::{
ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutResult, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
@@ -84,6 +87,11 @@ pub(crate) enum Error {
Authorization {
source: crate::azure::credential::Error,
},

#[snafu(display("Unable to extract metadata from headers: {}", source))]
Metadata {
source: crate::client::header::Error,
},
}

impl From<Error> for crate::Error {
@@ -190,6 +198,43 @@ impl AzureClient {
Ok(response)
}

/// PUT a block <https://learn.microsoft.com/en-us/rest/api/storageservices/put-block>
pub async fn put_block(&self, path: &Path, part_idx: usize, data: Bytes) -> Result<PartId> {
let content_id = format!("{part_idx:20}");
let block_id: BlockId = content_id.clone().into();

self.put_request(
path,
Some(data),
true,
&[
("comp", "block"),
("blockid", &BASE64_STANDARD.encode(block_id)),
],
)
.await?;

Ok(PartId { content_id })
}

/// PUT a block list <https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list>
///
/// Converts each part's `content_id` into a [`BlockId`], serializes the
/// resulting [`BlockList`] to XML and sends it with `comp=blocklist`. The
/// ETag read from the response headers is returned in the [`PutResult`].
///
/// # Errors
///
/// Propagates any error from `put_request`, and fails with the `Metadata`
/// error if the ETag cannot be extracted from the response headers.
pub async fn put_block_list(&self, path: &Path, parts: Vec<PartId>) -> Result<PutResult> {
    let block_list = BlockList {
        blocks: parts
            .into_iter()
            .map(|part| BlockId::from(part.content_id))
            .collect(),
    };
    let payload = block_list.to_xml();
    let query = [("comp", "blocklist")];

    let response = self
        .put_request(path, Some(payload.into()), true, &query)
        .await?;

    let e_tag = get_etag(response.headers()).context(MetadataSnafu)?;
    Ok(PutResult { e_tag: Some(e_tag) })
}

/// Make an Azure Delete request <https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob>
pub async fn delete_request<T: Serialize + ?Sized + Sync>(
&self,
67 changes: 33 additions & 34 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
@@ -26,15 +26,12 @@
//! [ObjectStore::abort_multipart] is a no-op, since Azure Blob Store doesn't provide
//! a way to drop old blocks. Instead, unused blocks are automatically cleaned up
//! after 7 days.
use self::client::{BlockId, BlockList};
use crate::{
multipart::{PartId, PutPart, WriteMultiPart},
path::Path,
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutResult, Result,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Bytes;
use futures::stream::BoxStream;
use std::fmt::Debug;
@@ -53,6 +50,7 @@ mod credential;
/// [`CredentialProvider`] for [`MicrosoftAzure`]
pub type AzureCredentialProvider = Arc<dyn CredentialProvider<Credential = AzureCredential>>;
use crate::client::header::get_etag;
use crate::multipart::MultiPartStore;
pub use builder::{AzureConfigKey, MicrosoftAzureBuilder};
pub use credential::AzureCredential;

@@ -151,43 +149,44 @@ struct AzureMultiPartUpload {

#[async_trait]
impl PutPart for AzureMultiPartUpload {
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
let content_id = format!("{part_idx:20}");
let block_id: BlockId = content_id.clone().into();

self.client
.put_request(
&self.location,
Some(buf.into()),
true,
&[
("comp", "block"),
("blockid", &BASE64_STANDARD.encode(block_id)),
],
)
.await?;
async fn put_part(&self, buf: Vec<u8>, idx: usize) -> Result<PartId> {
self.client.put_block(&self.location, idx, buf.into()).await
}

Ok(PartId { content_id })
async fn complete(&self, parts: Vec<PartId>) -> Result<()> {
self.client.put_block_list(&self.location, parts).await?;
Ok(())
}
}

async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
let blocks = completed_parts
.into_iter()
.map(|part| BlockId::from(part.content_id))
.collect();
#[async_trait]
impl MultiPartStore for MicrosoftAzure {
async fn create_multipart(&self, _: &Path) -> Result<MultipartId> {
Ok(String::new())
}

let block_list = BlockList { blocks };
let block_xml = block_list.to_xml();
async fn put_part(
&self,
path: &Path,
_: &MultipartId,
part_idx: usize,
data: Bytes,
) -> Result<PartId> {
self.client.put_block(path, part_idx, data).await
}

self.client
.put_request(
&self.location,
Some(block_xml.into()),
true,
&[("comp", "blocklist")],
)
.await?;
async fn complete_multipart(
&self,
path: &Path,
_: &MultipartId,
parts: Vec<PartId>,
) -> Result<PutResult> {
self.client.put_block_list(path, parts).await
}

async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> Result<()> {
// There is no way to drop blocks that have been uploaded. Instead, they simply
// expire in 7 days.
Ok(())
}
}
Loading