Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support non-contiguous put payloads (apache#5514)
Browse files Browse the repository at this point in the history
tustvold committed Apr 4, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 5a0baf1 commit 6255ef7
Showing 26 changed files with 526 additions and 290 deletions.
15 changes: 0 additions & 15 deletions object_store/src/aws/checksum.rs
Original file line number Diff line number Diff line change
@@ -16,7 +16,6 @@
// under the License.

use crate::config::Parse;
use ring::digest::{self, digest as ring_digest};
use std::str::FromStr;

#[allow(non_camel_case_types)]
@@ -27,20 +26,6 @@ pub enum Checksum {
SHA256,
}

impl Checksum {
pub(super) fn digest(&self, bytes: &[u8]) -> Vec<u8> {
match self {
Self::SHA256 => ring_digest(&digest::SHA256, bytes).as_ref().to_owned(),
}
}

pub(super) fn header_name(&self) -> &'static str {
match self {
Self::SHA256 => "x-amz-checksum-sha256",
}
}
}

impl std::fmt::Display for Checksum {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self {
78 changes: 42 additions & 36 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
@@ -35,7 +35,8 @@ use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::{
ClientOptions, GetOptions, ListResult, MultipartId, Path, PutResult, Result, RetryConfig,
ClientOptions, GetOptions, ListResult, MultipartId, Path, PutPayload, PutResult, Result,
RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
@@ -51,11 +52,14 @@ use reqwest::{
header::{CONTENT_LENGTH, CONTENT_TYPE},
Client as ReqwestClient, Method, RequestBuilder, Response,
};
use ring::digest;
use ring::digest::Context;
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use std::sync::Arc;

const VERSION_HEADER: &str = "x-amz-version-id";
const SHA256_CHECKSUM: &str = "x-amz-checksum-sha256";

/// A specialized `Error` for object store-related errors
#[derive(Debug, Snafu)]
@@ -266,7 +270,8 @@ pub(crate) struct Request<'a> {
path: &'a Path,
config: &'a S3Config,
builder: RequestBuilder,
payload_sha256: Option<Vec<u8>>,
payload_sha256: Option<digest::Digest>,
payload: Option<PutPayload>,
use_session_creds: bool,
}

@@ -295,10 +300,12 @@ impl<'a> Request<'a> {
},
};

let sha = self.payload_sha256.as_ref().map(|x| x.as_ref());

let path = self.path.as_ref();
self.builder
.with_aws_sigv4(credential.authorizer(), self.payload_sha256.as_deref())
.send_retry(&self.config.retry_config)
.with_aws_sigv4(credential.authorizer(), sha)
.send_retry(&self.config.retry_config, self.payload)
.await
.context(RetrySnafu { path })
}
@@ -327,28 +334,33 @@ impl S3Client {
pub fn put_request<'a>(
&'a self,
path: &'a Path,
bytes: Bytes,
payload: PutPayload,
with_encryption_headers: bool,
) -> Request<'a> {
let url = self.config.path_url(path);
let mut builder = self.client.request(Method::PUT, url);
if with_encryption_headers {
builder = builder.headers(self.config.encryption_headers.clone().into());
}
let mut payload_sha256 = None;

if let Some(checksum) = self.config.checksum {
let digest = checksum.digest(&bytes);
builder = builder.header(checksum.header_name(), BASE64_STANDARD.encode(&digest));
if checksum == Checksum::SHA256 {
payload_sha256 = Some(digest);
let mut sha256 = Context::new(&digest::SHA256);
payload.iter().for_each(|x| sha256.update(&x));
let payload_sha256 = sha256.finish();

match self.config.checksum {
Some(Checksum::SHA256) => {
builder = builder.header(
"x-amz-checksum-sha256",
BASE64_STANDARD.encode(&payload_sha256),
)
}
None => {}
}

builder = match bytes.is_empty() {
true => builder.header(CONTENT_LENGTH, 0), // Handle empty uploads (#4514)
false => builder.body(bytes),
};
// Handle empty uploads (#4514)
if payload.iter().all(|x| x.is_empty()) {
builder = builder.header(CONTENT_LENGTH, 0);
}

if let Some(value) = self.config.client_options.get_content_type(path) {
builder = builder.header(CONTENT_TYPE, value);
@@ -357,7 +369,8 @@ impl S3Client {
Request {
path,
builder,
payload_sha256,
payload: Some(payload),
payload_sha256: Some(payload_sha256),
config: &self.config,
use_session_creds: true,
}
@@ -376,7 +389,7 @@ impl S3Client {
.request(Method::DELETE, url)
.query(query)
.with_aws_sigv4(credential.authorizer(), None)
.send_retry(&self.config.retry_config)
.send_retry(&self.config.retry_config, None)
.await
.map_err(|e| e.error(STORE, path.to_string()))?;

@@ -439,16 +452,8 @@ impl S3Client {

let mut builder = self.client.request(Method::POST, url);

// Compute checksum - S3 *requires* this for DeleteObjects requests, so we default to
// their algorithm if the user hasn't specified one.
let checksum = self.config.checksum.unwrap_or(Checksum::SHA256);
let digest = checksum.digest(&body);
builder = builder.header(checksum.header_name(), BASE64_STANDARD.encode(&digest));
let payload_sha256 = if checksum == Checksum::SHA256 {
Some(digest)
} else {
None
};
let digest = digest::digest(&digest::SHA256, &body);
builder = builder.header(SHA256_CHECKSUM, BASE64_STANDARD.encode(&digest));

// S3 *requires* DeleteObjects to include a Content-MD5 header:
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
@@ -461,8 +466,8 @@ impl S3Client {
let response = builder
.header(CONTENT_TYPE, "application/xml")
.body(body)
.with_aws_sigv4(credential.authorizer(), payload_sha256.as_deref())
.send_retry(&self.config.retry_config)
.with_aws_sigv4(credential.authorizer(), Some(digest.as_ref()))
.send_retry(&self.config.retry_config, None)
.await
.context(DeleteObjectsRequestSnafu {})?
.bytes()
@@ -508,6 +513,7 @@ impl S3Client {
builder,
path: from,
config: &self.config,
payload: None,
payload_sha256: None,
use_session_creds: false,
}
@@ -522,7 +528,7 @@ impl S3Client {
.request(Method::POST, url)
.headers(self.config.encryption_headers.clone().into())
.with_aws_sigv4(credential.authorizer(), None)
.send_retry(&self.config.retry_config)
.send_retry(&self.config.retry_config, None)
.await
.context(CreateMultipartRequestSnafu)?
.bytes()
@@ -540,7 +546,7 @@ impl S3Client {
path: &Path,
upload_id: &MultipartId,
part_idx: usize,
data: Bytes,
data: PutPayload,
) -> Result<PartId> {
let part = (part_idx + 1).to_string();

@@ -564,7 +570,7 @@ impl S3Client {
// If no parts were uploaded, upload an empty part
// otherwise the completion request will fail
let part = self
.put_part(location, &upload_id.to_string(), 0, Bytes::new())
.put_part(location, &upload_id.to_string(), 0, PutPayload::default())
.await?;
vec![part]
} else {
@@ -582,7 +588,7 @@ impl S3Client {
.query(&[("uploadId", upload_id)])
.body(body)
.with_aws_sigv4(credential.authorizer(), None)
.send_retry(&self.config.retry_config)
.send_retry(&self.config.retry_config, None)
.await
.context(CompleteMultipartRequestSnafu)?;

@@ -610,7 +616,7 @@ impl S3Client {
.client
.request(Method::GET, url)
.with_aws_sigv4(credential.authorizer(), None)
.send_retry(&self.config.retry_config)
.send_retry(&self.config.retry_config, None)
.await
.map_err(|e| e.error(STORE, path.to_string()))?;
Ok(response)
@@ -645,7 +651,7 @@ impl GetClient for S3Client {
let response = builder
.with_get_options(options)
.with_aws_sigv4(credential.authorizer(), None)
.send_retry(&self.config.retry_config)
.send_retry(&self.config.retry_config, None)
.await
.map_err(|e| e.error(STORE, path.to_string()))?;

@@ -691,7 +697,7 @@ impl ListClient for S3Client {
.request(Method::GET, &url)
.query(&query)
.with_aws_sigv4(credential.authorizer(), None)
.send_retry(&self.config.retry_config)
.send_retry(&self.config.retry_config, None)
.await
.context(ListRequestSnafu)?
.bytes()
12 changes: 6 additions & 6 deletions object_store/src/aws/credential.rs
Original file line number Diff line number Diff line change
@@ -517,7 +517,7 @@ async fn instance_creds(
let token_result = client
.request(Method::PUT, token_url)
.header("X-aws-ec2-metadata-token-ttl-seconds", "600") // 10 minute TTL
.send_retry(retry_config)
.send_retry(retry_config, None)
.await;

let token = match token_result {
@@ -536,15 +536,15 @@ async fn instance_creds(
role_request = role_request.header(AWS_EC2_METADATA_TOKEN_HEADER, token);
}

let role = role_request.send_retry(retry_config).await?.text().await?;
let role = role_request.send_retry(retry_config, None).await?.text().await?;

let creds_url = format!("{endpoint}/{CREDENTIALS_PATH}/{role}");
let mut creds_request = client.request(Method::GET, creds_url);
if let Some(token) = &token {
creds_request = creds_request.header(AWS_EC2_METADATA_TOKEN_HEADER, token);
}

let creds: InstanceCredentials = creds_request.send_retry(retry_config).await?.json().await?;
let creds: InstanceCredentials = creds_request.send_retry(retry_config, None).await?.json().await?;

let now = Utc::now();
let ttl = (creds.expiration - now).to_std().unwrap_or_default();
@@ -607,7 +607,7 @@ async fn web_identity(
("Version", "2011-06-15"),
("WebIdentityToken", &token),
])
.send_retry(retry_config)
.send_retry(retry_config, None)
.await?
.bytes()
.await?;
@@ -657,7 +657,7 @@ async fn task_credential(
retry: &RetryConfig,
url: &str,
) -> Result<TemporaryToken<Arc<AwsCredential>>, StdError> {
let creds: InstanceCredentials = client.get(url).send_retry(retry).await?.json().await?;
let creds: InstanceCredentials = client.get(url).send_retry(retry, None).await?.json().await?;

let now = Utc::now();
let ttl = (creds.expiration - now).to_std().unwrap_or_default();
@@ -692,7 +692,7 @@ impl TokenProvider for SessionProvider {
let bytes = client
.get(format!("{}?session", self.endpoint))
.with_aws_sigv4(Some(authorizer), None)
.send_retry(retry)
.send_retry(retry, None)
.await
.context(CreateSessionRequestSnafu)?
.bytes()
2 changes: 1 addition & 1 deletion object_store/src/aws/dynamo.rs
Original file line number Diff line number Diff line change
@@ -334,7 +334,7 @@ impl DynamoCommit {
.json(&req)
.header("X-Amz-Target", target)
.with_aws_sigv4(authorizer, None)
.send_retry(&s3.config.retry_config)
.send_retry(&s3.config.retry_config, None)
.await
}
}
19 changes: 7 additions & 12 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
@@ -29,7 +29,6 @@
//! [automatic cleanup]: https://aws.amazon.com/blogs/aws/s3-lifecycle-management-update-support-for-multipart-uploads-and-delete-markers/
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
@@ -44,10 +43,7 @@ use crate::client::CredentialProvider;
use crate::multipart::{MultipartStore, PartId};
use crate::signer::Signer;
use crate::util::STRICT_ENCODE_SET;
use crate::{
Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta,
ObjectStore, Path, PutMode, PutOptions, PutResult, Result, UploadPart,
};
use crate::{Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore, Path, PutMode, PutOptions, PutPayload, PutResult, Result, UploadPart};

static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");

@@ -151,8 +147,8 @@ impl Signer for AmazonS3 {

#[async_trait]
impl ObjectStore for AmazonS3 {
async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result<PutResult> {
let mut request = self.client.put_request(location, bytes, true);
async fn put_opts(&self, location: &Path, payload: PutPayload, opts: PutOptions) -> Result<PutResult> {
let mut request = self.client.put_request(location, payload.into(), true);
let tags = opts.tags.encoded();
if !tags.is_empty() && !self.client.config.disable_tagging {
request = request.header(&TAGS_HEADER, tags);
@@ -316,7 +312,7 @@ struct UploadState {

#[async_trait]
impl MultipartUpload for S3MultiPartUpload {
fn put_part(&mut self, data: Bytes) -> UploadPart {
fn put_part(&mut self, data: PutPayload) -> UploadPart {
let idx = self.part_idx;
self.part_idx += 1;
let state = Arc::clone(&self.state);
@@ -358,7 +354,7 @@ impl MultipartStore for AmazonS3 {
path: &Path,
id: &MultipartId,
part_idx: usize,
data: Bytes,
data: PutPayload,
) -> Result<PartId> {
self.client.put_part(path, id, part_idx, data).await
}
@@ -381,7 +377,6 @@ impl MultipartStore for AmazonS3 {
mod tests {
use super::*;
use crate::{client::get::GetClient, tests::*};
use bytes::Bytes;
use hyper::HeaderMap;

const NON_EXISTENT_NAME: &str = "nonexistentname";
@@ -470,7 +465,7 @@ mod tests {
let integration = config.build().unwrap();

let location = Path::from_iter([NON_EXISTENT_NAME]);
let data = Bytes::from("arbitrary data");
let data = PutPayload::from("arbitrary data");

let err = integration.put(&location, data).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
@@ -527,7 +522,7 @@ mod tests {
async fn s3_encryption(store: &AmazonS3) {
crate::test_util::maybe_skip_integration!();

let data = Bytes::from(vec![3u8; 1024]);
let data = PutPayload::from(vec![3u8; 1024]);

let encryption_headers: HeaderMap = store.client.config.encryption_headers.clone().into();
let expected_encryption =
Loading

0 comments on commit 6255ef7

Please sign in to comment.