diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index bdbfc0bec4bb..1857b330326a 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -138,7 +138,7 @@ jobs: - name: Setup LocalStack (AWS emulation) run: | - echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:3.3.0)" >> $GITHUB_ENV + echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:3.8.1)" >> $GITHUB_ENV echo "EC2_METADATA_CONTAINER=$(docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2)" >> $GITHUB_ENV aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 @@ -161,6 +161,12 @@ jobs: - name: Run object_store tests run: cargo test --features=aws,azure,gcp,http + - name: Run object_store tests (AWS native conditional put) + run: cargo test --features=aws + env: + AWS_CONDITIONAL_PUT: etag-put-if-not-exists + AWS_COPY_IF_NOT_EXISTS: multipart + - name: GCS Output if: ${{ !cancelled() }} run: docker logs $GCS_CONTAINER diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 4b4d0b6e3b4e..a610e635178d 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -28,8 +28,8 @@ use crate::client::header::{get_put_result, get_version}; use crate::client::list::ListClient; use crate::client::retry::RetryExt; use crate::client::s3::{ - CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult, - ListResponse, + CompleteMultipartUpload, CompleteMultipartUploadResult, CopyPartResult, + InitiateMultipartUploadResult, ListResponse, }; use crate::client::GetOptionsExt; use crate::multipart::PartId; @@ -98,8 +98,11 @@ pub(crate) enum Error { #[snafu(display("Error getting create multipart response body: {}", source))] CreateMultipartResponseBody { source: reqwest::Error }, - #[snafu(display("Error performing complete multipart request: {}", source))] - CompleteMultipartRequest { source: crate::client::retry::Error }, + #[snafu(display("Error performing complete multipart request: {}: {}", path, source))] + CompleteMultipartRequest { + source: crate::client::retry::Error, + path: String, + }, #[snafu(display("Error getting complete multipart response body: {}", source))] CompleteMultipartResponseBody { source: reqwest::Error }, @@ -118,13 +121,32 @@ pub(crate) enum Error { impl From for crate::Error { fn from(err: Error) -> Self { - Self::Generic { - store: STORE, - source: Box::new(err), + match err { + Error::CompleteMultipartRequest { source, path } => source.error(STORE, path), + _ => Self::Generic { + store: STORE, + source: Box::new(err), + }, } } } +pub(crate) enum PutPartPayload<'a> { + Part(PutPayload), + Copy(&'a Path), +} + +impl Default for PutPartPayload<'_> { + fn default() -> Self { + Self::Part(PutPayload::default()) + } +} + +pub(crate) enum CompleteMultipartMode { + Overwrite, + Create, +} + #[derive(Deserialize)] #[serde(rename_all = "PascalCase", rename = "DeleteResult")] struct BatchDeleteResponse { @@ -605,15 +627,24 @@ impl S3Client { path: &Path, upload_id: &MultipartId, part_idx: usize, - data: PutPayload, + data: PutPartPayload<'_>, ) -> Result { + let is_copy = matches!(data, PutPartPayload::Copy(_)); let part = (part_idx + 1).to_string(); let mut request = self .request(Method::PUT, path) - .with_payload(data) .query(&[("partNumber", &part), ("uploadId", upload_id)]) .idempotent(true); + + request = match data { + PutPartPayload::Part(payload) => request.with_payload(payload), + PutPartPayload::Copy(path) => request.header( + "x-amz-copy-source", + &format!("{}/{}", self.config.bucket, encode_path(path)), + ), + }; + if self .config .encryption_headers @@ -625,21 +656,48 @@ impl S3Client { } let response = request.send().await?; - let content_id = get_etag(response.headers()).context(MetadataSnafu)?; + let content_id = match is_copy { + false => get_etag(response.headers()).context(MetadataSnafu)?, + true => { + let response = response + .bytes() + .await + .context(CreateMultipartResponseBodySnafu)?; + let response: CopyPartResult = quick_xml::de::from_reader(response.reader()) + .context(InvalidMultipartResponseSnafu)?; + response.e_tag + } + }; Ok(PartId { content_id }) } + pub(crate) async fn abort_multipart(&self, location: &Path, upload_id: &str) -> Result<()> { + self.request(Method::DELETE, location) + .query(&[("uploadId", upload_id)]) + .with_encryption_headers() + .send() + .await?; + + Ok(()) + } + pub(crate) async fn complete_multipart( &self, location: &Path, upload_id: &str, parts: Vec, + mode: CompleteMultipartMode, ) -> Result { let parts = if parts.is_empty() { // If no parts were uploaded, upload an empty part // otherwise the completion request will fail let part = self - .put_part(location, &upload_id.to_string(), 0, PutPayload::default()) + .put_part( + location, + &upload_id.to_string(), + 0, + PutPartPayload::default(), + ) .await?; vec![part] } else { @@ -651,18 +709,27 @@ impl S3Client { let credential = self.config.get_session_credential().await?; let url = self.config.path_url(location); - let response = self + let request = self .client .request(Method::POST, url) .query(&[("uploadId", upload_id)]) .body(body) - .with_aws_sigv4(credential.authorizer(), None) + .with_aws_sigv4(credential.authorizer(), None); + + let request = match mode { + CompleteMultipartMode::Overwrite => request, + CompleteMultipartMode::Create => request.header("If-None-Match", "*"), + }; + + let response = request .retryable(&self.config.retry_config) .idempotent(true) .retry_error_body(true) .send() .await - .context(CompleteMultipartRequestSnafu)?; + .context(CompleteMultipartRequestSnafu { + path: location.as_ref(), + })?; let version = get_version(response.headers(), VERSION_HEADER).context(MetadataSnafu)?; diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index a27ed053317e..b238d90eb6d7 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -36,7 +36,7 @@ use reqwest::{Method, StatusCode}; use std::{sync::Arc, time::Duration}; use url::Url; -use crate::aws::client::{RequestError, S3Client}; +use crate::aws::client::{CompleteMultipartMode, PutPartPayload, RequestError, S3Client}; use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; use crate::client::CredentialProvider; @@ -169,7 +169,10 @@ impl ObjectStore for AmazonS3 { match (opts.mode, &self.client.config.conditional_put) { (PutMode::Overwrite, _) => request.idempotent(true).do_put().await, (PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented), - (PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => { + ( + PutMode::Create, + Some(S3ConditionalPut::ETagMatch | S3ConditionalPut::ETagPutIfNotExists), + ) => { match request.header(&IF_NONE_MATCH, "*").do_put().await { // Technically If-None-Match should return NotModified but some stores, // such as R2, instead return PreconditionFailed @@ -193,6 +196,7 @@ impl ObjectStore for AmazonS3 { source: "ETag required for conditional put".to_string().into(), })?; match put { + S3ConditionalPut::ETagPutIfNotExists => Err(Error::NotImplemented), S3ConditionalPut::ETagMatch => { request.header(&IF_MATCH, etag.as_str()).do_put().await } @@ -293,6 +297,47 @@ impl ObjectStore for AmazonS3 { let (k, v, status) = match &self.client.config.copy_if_not_exists { Some(S3CopyIfNotExists::Header(k, v)) => (k, v, StatusCode::PRECONDITION_FAILED), Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v, *status), + Some(S3CopyIfNotExists::Multipart) => { + let upload_id = self + .client + .create_multipart(to, PutMultipartOpts::default()) + .await?; + + let res = async { + let part_id = self + .client + .put_part(to, &upload_id, 0, PutPartPayload::Copy(from)) + .await?; + match self + .client + .complete_multipart( + to, + &upload_id, + vec![part_id], + CompleteMultipartMode::Create, + ) + .await + { + Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists { + path: to.to_string(), + source: Box::new(e), + }), + Ok(_) => Ok(()), + Err(e) => Err(e), + } + } + .await; + + // If the multipart upload failed, make a best effort attempt to + // clean it up. It's the caller's responsibility to add a + // lifecycle rule if guaranteed cleanup is required, as we + // cannot protect against an ill-timed process crash. + if res.is_err() { + let _ = self.client.abort_multipart(to, &upload_id).await; + } + + return res; + } Some(S3CopyIfNotExists::Dynamo(lock)) => { return lock.copy_if_not_exists(&self.client, from, to).await } @@ -340,7 +385,12 @@ impl MultipartUpload for S3MultiPartUpload { Box::pin(async move { let part = state .client - .put_part(&state.location, &state.upload_id, idx, data) + .put_part( + &state.location, + &state.upload_id, + idx, + PutPartPayload::Part(data), + ) .await?; state.parts.put(idx, part); Ok(()) @@ -352,7 +402,12 @@ impl MultipartUpload for S3MultiPartUpload { self.state .client - .complete_multipart(&self.state.location, &self.state.upload_id, parts) + .complete_multipart( + &self.state.location, + &self.state.upload_id, + parts, + CompleteMultipartMode::Overwrite, + ) .await } @@ -384,7 +439,9 @@ impl MultipartStore for AmazonS3 { part_idx: usize, data: PutPayload, ) -> Result { - self.client.put_part(path, id, part_idx, data).await + self.client + .put_part(path, id, part_idx, PutPartPayload::Part(data)) + .await } async fn complete_multipart( @@ -393,7 +450,9 @@ impl MultipartStore for AmazonS3 { id: &MultipartId, parts: Vec, ) -> Result { - self.client.complete_multipart(path, id, parts).await + self.client + .complete_multipart(path, id, parts, CompleteMultipartMode::Overwrite) + .await } async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> { @@ -427,7 +486,6 @@ mod tests { let integration = config.build().unwrap(); let config = &integration.client.config; let test_not_exists = config.copy_if_not_exists.is_some(); - let test_conditional_put = config.conditional_put.is_some(); put_get_delete_list(&integration).await; get_opts(&integration).await; @@ -458,8 +516,9 @@ mod tests { if test_not_exists { copy_if_not_exists(&integration).await; } - if test_conditional_put { - put_opts(&integration, true).await; + if let Some(conditional_put) = &config.conditional_put { + let supports_update = !matches!(conditional_put, S3ConditionalPut::ETagPutIfNotExists); + put_opts(&integration, supports_update).await; } // run integration test with unsigned payload enabled diff --git a/object_store/src/aws/precondition.rs b/object_store/src/aws/precondition.rs index ad9e21537939..e5058052790d 100644 --- a/object_store/src/aws/precondition.rs +++ b/object_store/src/aws/precondition.rs @@ -46,6 +46,21 @@ pub enum S3CopyIfNotExists { /// /// Encoded as `header-with-status:::` ignoring whitespace HeaderWithStatus(String, String, reqwest::StatusCode), + /// Native Amazon S3 supports copy if not exists through a multipart upload + /// where the upload copies an existing object and is completed only if the + /// new object does not already exist. + /// + /// WARNING: When using this mode, `copy_if_not_exists` does not copy tags + /// or attributes from the source object. + /// + /// WARNING: When using this mode, `copy_if_not_exists` makes only a best + /// effort attempt to clean up the multipart upload if the copy operation + /// fails. Consider using a lifecycle rule to automatically clean up + /// abandoned multipart uploads. See [the module + /// docs](super#multipart-uploads) for details. + /// + /// Encoded as `multipart` ignoring whitespace. + Multipart, /// The name of a DynamoDB table to use for coordination /// /// Encoded as either `dynamo:` or `dynamo::` @@ -64,6 +79,7 @@ impl std::fmt::Display for S3CopyIfNotExists { Self::HeaderWithStatus(k, v, code) => { write!(f, "header-with-status: {k}: {v}: {}", code.as_u16()) } + Self::Multipart => f.write_str("multipart"), Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()), } } @@ -71,6 +87,10 @@ impl std::fmt::Display for S3CopyIfNotExists { impl S3CopyIfNotExists { fn from_str(s: &str) -> Option { + if s.trim() == "multipart" { + return Some(Self::Multipart); + }; + let (variant, value) = s.split_once(':')?; match variant.trim() { "header" => { @@ -118,6 +138,17 @@ pub enum S3ConditionalPut { /// [HTTP precondition]: https://datatracker.ietf.org/doc/html/rfc9110#name-preconditions ETagMatch, + /// Like `ETagMatch`, but with support for `PutMode::Create` and not + /// `PutMode::Option`. + /// + /// This is the limited form of conditional put supported by Amazon S3 + /// as of August 2024 ([announcement]). + /// + /// Encoded as `etag-put-if-not-exists` ignoring whitespace. + /// + /// [announcement]: https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/ + ETagPutIfNotExists, + /// The name of a DynamoDB table to use for coordination /// /// Encoded as either `dynamo:` or `dynamo::` @@ -133,6 +164,7 @@ impl std::fmt::Display for S3ConditionalPut { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::ETagMatch => write!(f, "etag"), + Self::ETagPutIfNotExists => write!(f, "etag-put-if-not-exists"), Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()), } } @@ -142,6 +174,7 @@ impl S3ConditionalPut { fn from_str(s: &str) -> Option { match s.trim() { "etag" => Some(Self::ETagMatch), + "etag-put-if-not-exists" => Some(Self::ETagPutIfNotExists), trimmed => match trimmed.split_once(':')? { ("dynamo", s) => Some(Self::Dynamo(DynamoCommit::from_str(s)?)), _ => None, diff --git a/object_store/src/client/s3.rs b/object_store/src/client/s3.rs index a9c47260e6f4..dba752cb1251 100644 --- a/object_store/src/client/s3.rs +++ b/object_store/src/client/s3.rs @@ -92,6 +92,14 @@ pub(crate) struct InitiateMultipartUploadResult { pub upload_id: String, } +#[cfg(feature = "aws")] +#[derive(Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub(crate) struct CopyPartResult { + #[serde(rename = "ETag")] + pub e_tag: String, +} + #[derive(Debug, Serialize)] #[serde(rename_all = "PascalCase")] pub(crate) struct CompleteMultipartUpload { diff --git a/object_store/src/integration.rs b/object_store/src/integration.rs index 89b21bc61696..30177878306f 100644 --- a/object_store/src/integration.rs +++ b/object_store/src/integration.rs @@ -651,6 +651,12 @@ pub async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) { assert_eq!(b.as_ref(), b"a"); if !supports_update { + let err = storage + .put_opts(&path, "c".into(), PutMode::Update(v1.clone().into()).into()) + .await + .unwrap_err(); + assert!(matches!(err, Error::NotImplemented { .. }), "{err}"); + return; }