diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index bdbfc0bec4bb..c073e37da6f4 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -161,6 +161,12 @@ jobs: - name: Run object_store tests run: cargo test --features=aws,azure,gcp,http + - name: Run object_store tests (AWS native conditional put) + run: cargo test --features=aws + env: + AWS_CONDITIONAL_PUT: etag-create-only + AWS_COPY_IF_NOT_EXISTS: multipart + - name: GCS Output if: ${{ !cancelled() }} run: docker logs $GCS_CONTAINER diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 4b4d0b6e3b4e..2e04683e7b30 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -28,8 +28,8 @@ use crate::client::header::{get_put_result, get_version}; use crate::client::list::ListClient; use crate::client::retry::RetryExt; use crate::client::s3::{ - CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult, - ListResponse, + CompleteMultipartUpload, CompleteMultipartUploadResult, CopyPartResult, + InitiateMultipartUploadResult, ListResponse, }; use crate::client::GetOptionsExt; use crate::multipart::PartId; @@ -98,8 +98,11 @@ pub(crate) enum Error { #[snafu(display("Error getting create multipart response body: {}", source))] CreateMultipartResponseBody { source: reqwest::Error }, - #[snafu(display("Error performing complete multipart request: {}", source))] - CompleteMultipartRequest { source: crate::client::retry::Error }, + #[snafu(display("Error performing complete multipart request: {}: {}", path, source))] + CompleteMultipartRequest { + source: crate::client::retry::Error, + path: String, + }, #[snafu(display("Error getting complete multipart response body: {}", source))] CompleteMultipartResponseBody { source: reqwest::Error }, @@ -118,13 +121,32 @@ pub(crate) enum Error { impl From for crate::Error { fn from(err: Error) -> Self { - Self::Generic { - store: STORE, - source: Box::new(err), + match err { + Error::CompleteMultipartRequest { source, path } => source.error(STORE, path), + _ => Self::Generic { + store: STORE, + source: Box::new(err), + }, } } } +pub(crate) enum PutPartPayload<'a> { + Part(PutPayload), + Copy(&'a Path), +} + +impl Default for PutPartPayload<'_> { + fn default() -> Self { + Self::Part(PutPayload::default()) + } +} + +pub(crate) enum CompleteMultipartMode { + Overwrite, + Create, +} + #[derive(Deserialize)] #[serde(rename_all = "PascalCase", rename = "DeleteResult")] struct BatchDeleteResponse { @@ -605,15 +627,24 @@ impl S3Client { path: &Path, upload_id: &MultipartId, part_idx: usize, - data: PutPayload, + data: PutPartPayload<'_>, ) -> Result { + let is_copy = matches!(data, PutPartPayload::Copy(_)); let part = (part_idx + 1).to_string(); let mut request = self .request(Method::PUT, path) - .with_payload(data) .query(&[("partNumber", &part), ("uploadId", upload_id)]) .idempotent(true); + + request = match data { + PutPartPayload::Part(payload) => request.with_payload(payload), + PutPartPayload::Copy(path) => request.header( + "x-amz-copy-source", + &format!("{}/{}", self.config.bucket, path), + ), + }; + if self .config .encryption_headers @@ -625,7 +656,18 @@ impl S3Client { } let response = request.send().await?; - let content_id = get_etag(response.headers()).context(MetadataSnafu)?; + let content_id = match is_copy { + false => get_etag(response.headers()).context(MetadataSnafu)?, + true => { + let response = response + .bytes() + .await + .context(CreateMultipartResponseBodySnafu)?; + let response: CopyPartResult = quick_xml::de::from_reader(response.reader()) + .context(InvalidMultipartResponseSnafu)?; + response.e_tag + } + }; Ok(PartId { content_id }) } @@ -634,12 +676,18 @@ impl S3Client { location: &Path, upload_id: &str, parts: Vec, + mode: CompleteMultipartMode, ) -> Result { let parts = if parts.is_empty() { // If no parts were uploaded, upload an empty part // otherwise the completion request will fail let part = self - .put_part(location, &upload_id.to_string(), 0, PutPayload::default()) + .put_part( + location, + &upload_id.to_string(), + 0, + PutPartPayload::default(), + ) .await?; vec![part] } else { @@ -651,18 +699,27 @@ impl S3Client { let credential = self.config.get_session_credential().await?; let url = self.config.path_url(location); - let response = self + let request = self .client .request(Method::POST, url) .query(&[("uploadId", upload_id)]) .body(body) - .with_aws_sigv4(credential.authorizer(), None) + .with_aws_sigv4(credential.authorizer(), None); + + let request = match mode { + CompleteMultipartMode::Overwrite => request, + CompleteMultipartMode::Create => request.header("If-None-Match", "*"), + }; + + let response = request .retryable(&self.config.retry_config) .idempotent(true) .retry_error_body(true) .send() .await - .context(CompleteMultipartRequestSnafu)?; + .context(CompleteMultipartRequestSnafu { + path: location.as_ref(), + })?; let version = get_version(response.headers(), VERSION_HEADER).context(MetadataSnafu)?; diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index a27ed053317e..acfa05395901 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -36,7 +36,7 @@ use reqwest::{Method, StatusCode}; use std::{sync::Arc, time::Duration}; use url::Url; -use crate::aws::client::{RequestError, S3Client}; +use crate::aws::client::{CompleteMultipartMode, PutPartPayload, RequestError, S3Client}; use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; use crate::client::CredentialProvider; @@ -169,7 +169,10 @@ impl ObjectStore for AmazonS3 { match (opts.mode, &self.client.config.conditional_put) { (PutMode::Overwrite, _) => request.idempotent(true).do_put().await, (PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented), - (PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => { + ( + PutMode::Create, + Some(S3ConditionalPut::ETagMatch | S3ConditionalPut::ETagCreateOnly), + ) => { match request.header(&IF_NONE_MATCH, "*").do_put().await { // Technically If-None-Match should return NotModified but some stores, // such as R2, instead return PreconditionFailed @@ -193,6 +196,7 @@ impl ObjectStore for AmazonS3 { source: "ETag required for conditional put".to_string().into(), })?; match put { + S3ConditionalPut::ETagCreateOnly => Err(Error::NotImplemented), S3ConditionalPut::ETagMatch => { request.header(&IF_MATCH, etag.as_str()).do_put().await } @@ -293,6 +297,34 @@ impl ObjectStore for AmazonS3 { let (k, v, status) = match &self.client.config.copy_if_not_exists { Some(S3CopyIfNotExists::Header(k, v)) => (k, v, StatusCode::PRECONDITION_FAILED), Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v, *status), + Some(S3CopyIfNotExists::Multipart) => { + let upload_id = self + .client + .create_multipart(to, PutMultipartOpts::default()) + .await?; + let part_id = self + .client + .put_part(to, &upload_id, 0, PutPartPayload::Copy(from)) + .await?; + let res = match self + .client + .complete_multipart( + to, + &upload_id, + vec![part_id], + CompleteMultipartMode::Create, + ) + .await + { + Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists { + path: to.to_string(), + source: Box::new(e), + }), + Ok(_) => Ok(()), + Err(e) => Err(e.into()), + }; + return res; + } Some(S3CopyIfNotExists::Dynamo(lock)) => { return lock.copy_if_not_exists(&self.client, from, to).await } @@ -340,7 +372,12 @@ impl MultipartUpload for S3MultiPartUpload { Box::pin(async move { let part = state .client - .put_part(&state.location, &state.upload_id, idx, data) + .put_part( + &state.location, + &state.upload_id, + idx, + PutPartPayload::Part(data), + ) .await?; state.parts.put(idx, part); Ok(()) @@ -352,7 +389,12 @@ impl MultipartUpload for S3MultiPartUpload { self.state .client - .complete_multipart(&self.state.location, &self.state.upload_id, parts) + .complete_multipart( + &self.state.location, + &self.state.upload_id, + parts, + CompleteMultipartMode::Overwrite, + ) .await } @@ -384,7 +426,9 @@ impl MultipartStore for AmazonS3 { part_idx: usize, data: PutPayload, ) -> Result { - self.client.put_part(path, id, part_idx, data).await + self.client + .put_part(path, id, part_idx, PutPartPayload::Part(data)) + .await } async fn complete_multipart( @@ -393,7 +437,9 @@ impl MultipartStore for AmazonS3 { id: &MultipartId, parts: Vec, ) -> Result { - self.client.complete_multipart(path, id, parts).await + self.client + .complete_multipart(path, id, parts, CompleteMultipartMode::Overwrite) + .await } async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> { @@ -427,7 +473,6 @@ mod tests { let integration = config.build().unwrap(); let config = &integration.client.config; let test_not_exists = config.copy_if_not_exists.is_some(); - let test_conditional_put = config.conditional_put.is_some(); put_get_delete_list(&integration).await; get_opts(&integration).await; @@ -458,8 +503,9 @@ mod tests { if test_not_exists { copy_if_not_exists(&integration).await; } - if test_conditional_put { - put_opts(&integration, true).await; + if let Some(conditional_put) = &config.conditional_put { + let supports_update = !matches!(conditional_put, S3ConditionalPut::ETagCreateOnly); + put_opts(&integration, supports_update).await; } // run integration test with unsigned payload enabled diff --git a/object_store/src/aws/precondition.rs b/object_store/src/aws/precondition.rs index ad9e21537939..80f3c1a03615 100644 --- a/object_store/src/aws/precondition.rs +++ b/object_store/src/aws/precondition.rs @@ -46,6 +46,15 @@ pub enum S3CopyIfNotExists { /// /// Encoded as `header-with-status:::` ignoring whitespace HeaderWithStatus(String, String, reqwest::StatusCode), + /// Native Amazon S3 supports copy if not exists through a multipart upload + /// where the upload copies an existing object and is completed only if + /// the new object does not already exist. + /// + /// WARNING: When using this mode, `copy_if_not_exists` does not copy + /// tags or attributes from the source object. + /// + /// Encoded as `multipart` ignoring whitespace. + Multipart, /// The name of a DynamoDB table to use for coordination /// /// Encoded as either `dynamo:` or `dynamo::` @@ -64,6 +73,7 @@ impl std::fmt::Display for S3CopyIfNotExists { Self::HeaderWithStatus(k, v, code) => { write!(f, "header-with-status: {k}: {v}: {}", code.as_u16()) } + Self::Multipart => f.write_str("multipart"), Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()), } } @@ -71,6 +81,11 @@ impl std::fmt::Display for S3CopyIfNotExists { impl S3CopyIfNotExists { fn from_str(s: &str) -> Option { + match s.trim() { + "multipart" => return Some(Self::Multipart), + _ => (), + }; + let (variant, value) = s.split_once(':')?; match variant.trim() { "header" => { @@ -118,6 +133,17 @@ pub enum S3ConditionalPut { /// [HTTP precondition]: https://datatracker.ietf.org/doc/html/rfc9110#name-preconditions ETagMatch, + /// Like `ETagMatch`, but with support for `PutMode::Create` and not + /// `PutMode::Option`. + /// + /// This is the limited form of conditional put supported by Amazon S3 + /// as of August 2024 ([announcement]). + /// + /// Encoded as `etag-create-only` ignoring whitespace. + /// + /// [announcement]: https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/ + ETagCreateOnly, + /// The name of a DynamoDB table to use for coordination /// /// Encoded as either `dynamo:` or `dynamo::` @@ -133,6 +159,7 @@ impl std::fmt::Display for S3ConditionalPut { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::ETagMatch => write!(f, "etag"), + Self::ETagCreateOnly => write!(f, "etag-create-only"), Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()), } } @@ -142,6 +169,7 @@ impl S3ConditionalPut { fn from_str(s: &str) -> Option { match s.trim() { "etag" => Some(Self::ETagMatch), + "etag-create-only" => Some(Self::ETagCreateOnly), trimmed => match trimmed.split_once(':')? { ("dynamo", s) => Some(Self::Dynamo(DynamoCommit::from_str(s)?)), _ => None, diff --git a/object_store/src/client/s3.rs b/object_store/src/client/s3.rs index a9c47260e6f4..76be1116753c 100644 --- a/object_store/src/client/s3.rs +++ b/object_store/src/client/s3.rs @@ -92,6 +92,13 @@ pub(crate) struct InitiateMultipartUploadResult { pub upload_id: String, } +#[derive(Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub(crate) struct CopyPartResult { + #[serde(rename = "ETag")] + pub e_tag: String, +} + #[derive(Debug, Serialize)] #[serde(rename_all = "PascalCase")] pub(crate) struct CompleteMultipartUpload { diff --git a/object_store/src/integration.rs b/object_store/src/integration.rs index 89b21bc61696..30177878306f 100644 --- a/object_store/src/integration.rs +++ b/object_store/src/integration.rs @@ -651,6 +651,12 @@ pub async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) { assert_eq!(b.as_ref(), b"a"); if !supports_update { + let err = storage + .put_opts(&path, "c".into(), PutMode::Update(v1.clone().into()).into()) + .await + .unwrap_err(); + assert!(matches!(err, Error::NotImplemented { .. }), "{err}"); + return; }