From e337c8a58cadc0a7bcf8f7de897dc94a2b63687c Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 26 Dec 2023 19:06:30 +0000 Subject: [PATCH 1/5] Parse Dynamo CondititionalPut --- object_store/src/aws/dynamo.rs | 10 ++++++ object_store/src/aws/mod.rs | 12 ++++++-- object_store/src/aws/precondition.rs | 46 ++++++++++++++++++++++------ 3 files changed, 57 insertions(+), 11 deletions(-) diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs index ce1500bf4090..a9169a666384 100644 --- a/object_store/src/aws/dynamo.rs +++ b/object_store/src/aws/dynamo.rs @@ -154,6 +154,16 @@ impl DynamoCommit { self } + /// Parse [`DynamoCommit`] from a string + pub(crate) fn from_str(value: &str) -> Option { + Some(match value.split_once(':') { + Some((table_name, timeout)) => { + DynamoCommit::new(table_name.trim().to_string()).with_timeout(timeout.parse().ok()?) + } + None => DynamoCommit::new(value.trim().to_string()), + }) + } + /// Returns the name of the DynamoDB table. pub(crate) fn table_name(&self) -> &str { &self.table_name diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 75b43d448bac..e242c21a0bf0 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -187,12 +187,20 @@ impl ObjectStore for AmazonS3 { r => r, } } - (PutMode::Update(v), Some(S3ConditionalPut::ETagMatch)) => { + (PutMode::Create, Some(S3ConditionalPut::Dynamo(d))) => todo!(), + (PutMode::Update(v), Some(put)) => { let etag = v.e_tag.ok_or_else(|| Error::Generic { store: STORE, source: "ETag required for conditional put".to_string().into(), })?; - request.header(&IF_MATCH, etag.as_str()).do_put().await + match put { + S3ConditionalPut::ETagMatch => { + request.header(&IF_MATCH, etag.as_str()).do_put().await + } + S3ConditionalPut::Dynamo(d) => { + todo!() + } + } } } } diff --git a/object_store/src/aws/precondition.rs b/object_store/src/aws/precondition.rs index 83d45db82c8e..e885ac5bb09e 100644 --- a/object_store/src/aws/precondition.rs +++ b/object_store/src/aws/precondition.rs @@ -48,7 +48,7 @@ pub enum S3CopyIfNotExists { HeaderWithStatus(String, String, reqwest::StatusCode), /// The name of a DynamoDB table to use for coordination /// - /// Encoded as either `dynamodb:` or `dynamodb::` + /// Encoded as either `dynamo:` or `dynamo::` /// ignoring whitespace. The default timeout is used if not specified /// /// See [`DynamoCommit`] for more information @@ -88,12 +88,7 @@ impl S3CopyIfNotExists { code, )) } - "dynamo" => Some(Self::Dynamo(match value.split_once(':') { - Some((table_name, timeout)) => DynamoCommit::new(table_name.trim().to_string()) - .with_timeout(timeout.parse().ok()?), - None => DynamoCommit::new(value.trim().to_string()), - })), - + "dynamo" => Some(Self::Dynamo(DynamoCommit::from_str(value)?)), _ => None, } } @@ -111,7 +106,7 @@ impl Parse for S3CopyIfNotExists { /// Configure how to provide conditional put support for [`AmazonS3`]. /// /// [`AmazonS3`]: super::AmazonS3 -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq, PartialEq)] #[allow(missing_copy_implementations)] #[non_exhaustive] pub enum S3ConditionalPut { @@ -122,12 +117,23 @@ pub enum S3ConditionalPut { /// /// [HTTP precondition]: https://datatracker.ietf.org/doc/html/rfc9110#name-preconditions ETagMatch, + + /// The name of a DynamoDB table to use for coordination + /// + /// Encoded as either `dynamo:` or `dynamo::` + /// ignoring whitespace. The default timeout is used if not specified + /// + /// See [`DynamoCommit`] for more information + /// + /// This will use the same region, credentials and endpoint as configured for S3 + Dynamo(DynamoCommit), } impl std::fmt::Display for S3ConditionalPut { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::ETagMatch => write!(f, "etag"), + Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()), } } } @@ -136,7 +142,10 @@ impl S3ConditionalPut { fn from_str(s: &str) -> Option { match s.trim() { "etag" => Some(Self::ETagMatch), - _ => None, + trimmed => match trimmed.split_once(":")? { + ("dynamo", s) => Some(Self::Dynamo(DynamoCommit::from_str(s)?)), + _ => None, + }, } } } @@ -153,6 +162,7 @@ impl Parse for S3ConditionalPut { #[cfg(test)] mod tests { use super::S3CopyIfNotExists; + use crate::aws::{DynamoCommit, S3ConditionalPut}; #[test] fn parse_s3_copy_if_not_exists_header() { @@ -177,6 +187,24 @@ mod tests { assert_eq!(expected, S3CopyIfNotExists::from_str(input)); } + #[test] + fn parse_s3_copy_if_not_exists_dynamo() { + let input = "dynamo: table:100"; + let expected = Some(S3CopyIfNotExists::Dynamo( + DynamoCommit::new("table".into()).with_timeout(100), + )); + assert_eq!(expected, S3CopyIfNotExists::from_str(input)); + } + + #[test] + fn parse_s3_condition_put_dynamo() { + let input = "dynamo: table:1300"; + let expected = Some(S3ConditionalPut::Dynamo( + DynamoCommit::new("table".into()).with_timeout(1300), + )); + assert_eq!(expected, S3ConditionalPut::from_str(input)); + } + #[test] fn parse_s3_copy_if_not_exists_header_whitespace_invariant() { let expected = Some(S3CopyIfNotExists::Header( From b069ed7115e87308d1229996edc9e4b82e8e8823 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 26 Dec 2023 19:54:08 +0000 Subject: [PATCH 2/5] Add etag sort key --- .github/workflows/object_store.yml | 2 +- object_store/src/aws/dynamo.rs | 66 ++++++++++++++++++++++-------- 2 files changed, 51 insertions(+), 17 deletions(-) diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index 313d158090ae..89597086fd71 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -137,7 +137,7 @@ jobs: docker run -d -p 4566:4566 localstack/localstack:3.0.1 docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2 aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket - aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=key,KeyType=HASH --attribute-definitions AttributeName=key,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 + aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 - name: Configure Azurite (Azure emulation) # the magical connection string is from diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs index a9169a666384..929c6be4705d 100644 --- a/object_store/src/aws/dynamo.rs +++ b/object_store/src/aws/dynamo.rs @@ -61,16 +61,24 @@ const STORE: &str = "DynamoDB"; /// /// The DynamoDB schema is as follows: /// -/// * A string hash key named `"key"` +/// * A string partition key named `"path"` +/// * A string sort key named `"etag"` /// * A numeric [TTL] attribute named `"ttl"` /// * A numeric attribute named `"generation"` /// * A numeric attribute named `"timeout"` /// -/// To perform a conditional operation on an object with a given `path` and `etag` (if exists), +/// An appropriate DynamoDB table can be created with the CLI as follows: +/// +/// ```bash +/// $ aws dynamodb create-table --table-name --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S +/// $ aws dynamodb update-time-to-live --table-name --time-to-live-specification Enabled=true,AttributeName=ttl +/// ``` +/// +/// To perform a conditional operation on an object with a given `path` and `etag` (`*` if creating), /// the commit protocol is as follows: /// /// 1. Perform HEAD request on `path` and error on precondition mismatch -/// 2. Create record in DynamoDB with key `{path}#{etag}` with the configured timeout +/// 2. Create record in DynamoDB with given `path` and `etag` with the configured timeout /// 1. On Success: Perform operation with the configured timeout /// 2. On Conflict: /// 1. Periodically re-perform HEAD request on `path` and error on precondition mismatch @@ -181,7 +189,7 @@ impl DynamoCommit { loop { let existing = previous_lease.as_ref(); - match self.try_lock(client, to.as_ref(), existing).await? { + match self.try_lock(client, to.as_ref(), "*", existing).await? { TryLockResult::Ok(lease) => { let fut = client.copy_request(from, to).send(); let expiry = lease.acquire + lease.timeout; @@ -215,8 +223,11 @@ impl DynamoCommit { } /// Retrieve a lock, returning an error if it doesn't exist - async fn get_lock(&self, s3: &S3Client, key: &str) -> Result { - let key_attributes = [("key", AttributeValue::String(key))]; + async fn get_lock(&self, s3: &S3Client, path: &str, etag: &str) -> Result { + let key_attributes = [ + ("path", AttributeValue::String(path)), + ("etag", AttributeValue::String(etag)), + ]; let req = GetItem { table_name: &self.table_name, key: Map(&key_attributes), @@ -226,7 +237,7 @@ impl DynamoCommit { let resp = self .request(s3, credential.as_deref(), "DynamoDB_20120810.GetItem", req) .await - .map_err(|e| e.error(STORE, key.to_string()))?; + .map_err(|e| e.error(STORE, path.to_string()))?; let body = resp.bytes().await.map_err(|e| Error::Generic { store: STORE, @@ -240,7 +251,7 @@ impl DynamoCommit { })?; extract_lease(&response.item).ok_or_else(|| Error::NotFound { - path: key.into(), + path: path.into(), source: "DynamoDB GetItem returned no items".to_string().into(), }) } @@ -249,7 +260,8 @@ impl DynamoCommit { async fn try_lock( &self, s3: &S3Client, - key: &str, + path: &str, + etag: &str, existing: Option<&Lease>, ) -> Result { let attributes; @@ -267,12 +279,13 @@ impl DynamoCommit { let ttl = (Utc::now() + self.ttl).timestamp(); let items = [ - ("key", AttributeValue::String(key)), + ("path", AttributeValue::String(path)), + ("etag", AttributeValue::String(etag)), ("generation", AttributeValue::Number(next_gen)), ("timeout", AttributeValue::Number(self.timeout)), ("ttl", AttributeValue::Number(ttl as _)), ]; - let names = [("#pk", "key")]; + let names = [("#pk", "path")]; let req = PutItem { table_name: &self.table_name, @@ -312,7 +325,9 @@ impl DynamoCommit { // // // - None => Ok(TryLockResult::Conflict(self.get_lock(s3, key).await?)), + None => Ok(TryLockResult::Conflict( + self.get_lock(s3, path, etag).await?, + )), }, _ => Err(Error::Generic { store: STORE, @@ -528,10 +543,11 @@ pub(crate) use tests::integration_test; #[cfg(test)] mod tests { - use super::*; use crate::aws::AmazonS3; use crate::ObjectStore; + use rand::distributions::Alphanumeric; + use rand::{thread_rng, Rng}; #[test] fn test_attribute_serde() { @@ -554,24 +570,42 @@ mod tests { let _ = integration.delete(&dst).await; // Delete if present // Create a lock if not already exists - let existing = match d.try_lock(client, dst.as_ref(), None).await.unwrap() { + let existing = match d.try_lock(client, dst.as_ref(), "*", None).await.unwrap() { TryLockResult::Conflict(l) => l, TryLockResult::Ok(l) => l, }; // Should not be able to acquire a lock again - let r = d.try_lock(client, dst.as_ref(), None).await; + let r = d.try_lock(client, dst.as_ref(), "*", None).await; assert!(matches!(r, Ok(TryLockResult::Conflict(_)))); // But should still be able to reclaim lock and perform copy d.copy_if_not_exists(client, &src, &dst).await.unwrap(); - match d.try_lock(client, dst.as_ref(), None).await.unwrap() { + match d.try_lock(client, dst.as_ref(), "*", None).await.unwrap() { TryLockResult::Conflict(new) => { // Should have incremented generation to do so assert_eq!(new.generation, existing.generation + 1); } _ => panic!("Should conflict"), } + + let rng = thread_rng(); + let etag = String::from_utf8(rng.sample_iter(&Alphanumeric).take(32).collect()).unwrap(); + let l = match d.try_lock(client, dst.as_ref(), &etag, None).await.unwrap() { + TryLockResult::Ok(l) => l, + _ => panic!("should not conflict"), + }; + + match d.try_lock(client, dst.as_ref(), &etag, None).await.unwrap() { + TryLockResult::Conflict(c) => assert_eq!(l.generation, c.generation), + _ => panic!("should conflict"), + } + + let r = d.try_lock(client, dst.as_ref(), &etag, Some(&l)).await; + match r.unwrap() { + TryLockResult::Ok(new) => assert_eq!(new.generation, l.generation + 1), + _ => panic!("should not conflict"), + } } } From fd7348f8e62cec645f602001935563b12c8ed2ac Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 26 Dec 2023 21:13:43 +0000 Subject: [PATCH 3/5] Conditional Put --- .github/workflows/object_store.yml | 1 + object_store/src/aws/dynamo.rs | 104 ++++++++++++++++++++--------- object_store/src/aws/mod.rs | 10 ++- 3 files changed, 83 insertions(+), 32 deletions(-) diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index 89597086fd71..0257d86d9879 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -113,6 +113,7 @@ jobs: AWS_ENDPOINT: http://localhost:4566 AWS_ALLOW_HTTP: true AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:2000 + AWS_CONDITIONAL_PUT: dynamo:test-table:2000 HTTP_URL: "http://localhost:8080" GOOGLE_BUCKET: test-bucket GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json" diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs index 929c6be4705d..12cc101333e5 100644 --- a/object_store/src/aws/dynamo.rs +++ b/object_store/src/aws/dynamo.rs @@ -17,7 +17,9 @@ //! A DynamoDB based lock system +use std::borrow::Cow; use std::collections::HashMap; +use std::future::Future; use std::time::{Duration, Instant}; use chrono::Utc; @@ -183,23 +185,40 @@ impl DynamoCommit { from: &Path, to: &Path, ) -> Result<()> { - check_not_exists(client, to).await?; + self.conditional_op(client, to, None, || async { + client.copy_request(from, to).send().await?; + Ok(()) + }) + .await + } + + pub(crate) async fn conditional_op( + &self, + client: &S3Client, + to: &Path, + etag: Option<&str>, + op: F, + ) -> Result + where + F: FnOnce() -> Fut, + Fut: Future>, + { + check_precondition(client, to, etag).await?; let mut previous_lease = None; loop { let existing = previous_lease.as_ref(); - match self.try_lock(client, to.as_ref(), "*", existing).await? { + match self.try_lock(client, to.as_ref(), etag, existing).await? { TryLockResult::Ok(lease) => { - let fut = client.copy_request(from, to).send(); let expiry = lease.acquire + lease.timeout; - return match tokio::time::timeout_at(expiry.into(), fut).await { - Ok(Ok(_)) => Ok(()), + return match tokio::time::timeout_at(expiry.into(), op()).await { + Ok(Ok(v)) => Ok(v), Ok(Err(e)) => Err(e.into()), Err(_) => Err(Error::Generic { store: "DynamoDB", source: format!( - "Failed to perform copy operation in {} milliseconds", + "Failed to perform conditional operation in {} milliseconds", self.timeout ) .into(), @@ -211,7 +230,7 @@ impl DynamoCommit { let expiry = conflict.timeout * self.max_clock_skew_rate; loop { interval.tick().await; - check_not_exists(client, to).await?; + check_precondition(client, to, etag).await?; if conflict.acquire.elapsed() > expiry { previous_lease = Some(conflict); break; @@ -223,10 +242,10 @@ impl DynamoCommit { } /// Retrieve a lock, returning an error if it doesn't exist - async fn get_lock(&self, s3: &S3Client, path: &str, etag: &str) -> Result { + async fn get_lock(&self, s3: &S3Client, path: &str, etag: Option<&str>) -> Result { let key_attributes = [ - ("path", AttributeValue::String(path)), - ("etag", AttributeValue::String(etag)), + ("path", AttributeValue::from(path)), + ("etag", AttributeValue::from(etag.unwrap_or("*"))), ]; let req = GetItem { table_name: &self.table_name, @@ -261,7 +280,7 @@ impl DynamoCommit { &self, s3: &S3Client, path: &str, - etag: &str, + etag: Option<&str>, existing: Option<&Lease>, ) -> Result { let attributes; @@ -279,8 +298,8 @@ impl DynamoCommit { let ttl = (Utc::now() + self.ttl).timestamp(); let items = [ - ("path", AttributeValue::String(path)), - ("etag", AttributeValue::String(etag)), + ("path", AttributeValue::from(path)), + ("etag", AttributeValue::from(etag.unwrap_or("*"))), ("generation", AttributeValue::Number(next_gen)), ("timeout", AttributeValue::Number(self.timeout)), ("ttl", AttributeValue::Number(ttl as _)), @@ -372,19 +391,37 @@ enum TryLockResult { Conflict(Lease), } -/// Returns an [`Error::AlreadyExists`] if `path` exists -async fn check_not_exists(client: &S3Client, path: &Path) -> Result<()> { +/// Validates that `path` has the given `etag` or doesn't exist if `None` +async fn check_precondition(client: &S3Client, path: &Path, etag: Option<&str>) -> Result<()> { let options = GetOptions { head: true, ..Default::default() }; - match client.get_opts(path, options).await { - Ok(_) => Err(Error::AlreadyExists { - path: path.to_string(), - source: "Already Exists".to_string().into(), - }), - Err(Error::NotFound { .. }) => Ok(()), - Err(e) => Err(e), + + match etag { + Some(expected) => match client.get_opts(path, options).await { + Ok(r) => match r.meta.e_tag { + Some(actual) if expected == actual => Ok(()), + actual => Err(Error::Precondition { + path: path.to_string(), + source: format!("{} does not match {expected}", actual.unwrap_or_default()) + .into(), + }), + }, + Err(Error::NotFound { .. }) => Err(Error::Precondition { + path: path.to_string(), + source: format!("Object at location {path} not found").into(), + }), + Err(e) => Err(e), + }, + None => match client.get_opts(path, options).await { + Ok(_) => Err(Error::AlreadyExists { + path: path.to_string(), + source: "Already Exists".to_string().into(), + }), + Err(Error::NotFound { .. }) => Ok(()), + Err(e) => Err(e), + }, } } @@ -518,11 +555,17 @@ impl<'a, K: Serialize, V: Serialize> Serialize for Map<'a, K, V> { #[derive(Debug, Serialize, Deserialize)] enum AttributeValue<'a> { #[serde(rename = "S")] - String(&'a str), + String(Cow<'a, str>), #[serde(rename = "N", with = "number")] Number(u64), } +impl<'a> From<&'a str> for AttributeValue<'a> { + fn from(value: &'a str) -> Self { + Self::String(Cow::Borrowed(value)) + } +} + /// Numbers are serialized as strings mod number { use serde::{Deserialize, Deserializer, Serializer}; @@ -570,19 +613,19 @@ mod tests { let _ = integration.delete(&dst).await; // Delete if present // Create a lock if not already exists - let existing = match d.try_lock(client, dst.as_ref(), "*", None).await.unwrap() { + let existing = match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() { TryLockResult::Conflict(l) => l, TryLockResult::Ok(l) => l, }; // Should not be able to acquire a lock again - let r = d.try_lock(client, dst.as_ref(), "*", None).await; + let r = d.try_lock(client, dst.as_ref(), None, None).await; assert!(matches!(r, Ok(TryLockResult::Conflict(_)))); // But should still be able to reclaim lock and perform copy d.copy_if_not_exists(client, &src, &dst).await.unwrap(); - match d.try_lock(client, dst.as_ref(), "*", None).await.unwrap() { + match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() { TryLockResult::Conflict(new) => { // Should have incremented generation to do so assert_eq!(new.generation, existing.generation + 1); @@ -592,18 +635,19 @@ mod tests { let rng = thread_rng(); let etag = String::from_utf8(rng.sample_iter(&Alphanumeric).take(32).collect()).unwrap(); - let l = match d.try_lock(client, dst.as_ref(), &etag, None).await.unwrap() { + let t = Some(etag.as_str()); + + let l = match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() { TryLockResult::Ok(l) => l, _ => panic!("should not conflict"), }; - match d.try_lock(client, dst.as_ref(), &etag, None).await.unwrap() { + match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() { TryLockResult::Conflict(c) => assert_eq!(l.generation, c.generation), _ => panic!("should conflict"), } - let r = d.try_lock(client, dst.as_ref(), &etag, Some(&l)).await; - match r.unwrap() { + match d.try_lock(client, dst.as_ref(), t, Some(&l)).await.unwrap() { TryLockResult::Ok(new) => assert_eq!(new.generation, l.generation + 1), _ => panic!("should not conflict"), } diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index e242c21a0bf0..77e6be37b665 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -187,7 +187,10 @@ impl ObjectStore for AmazonS3 { r => r, } } - (PutMode::Create, Some(S3ConditionalPut::Dynamo(d))) => todo!(), + (PutMode::Create, Some(S3ConditionalPut::Dynamo(d))) => { + d.conditional_op(&self.client, location, None, move || request.do_put()) + .await + } (PutMode::Update(v), Some(put)) => { let etag = v.e_tag.ok_or_else(|| Error::Generic { store: STORE, @@ -198,7 +201,10 @@ impl ObjectStore for AmazonS3 { request.header(&IF_MATCH, etag.as_str()).do_put().await } S3ConditionalPut::Dynamo(d) => { - todo!() + d.conditional_op(&self.client, location, Some(&etag), move || { + request.do_put() + }) + .await } } } From acea3c4d7fd789761f10736b92da7f391fbbe1e4 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 26 Dec 2023 22:37:50 +0000 Subject: [PATCH 4/5] Speedup repeated test runs --- object_store/src/aws/dynamo.rs | 2 +- object_store/src/lib.rs | 12 ++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs index 12cc101333e5..c1902c7835cd 100644 --- a/object_store/src/aws/dynamo.rs +++ b/object_store/src/aws/dynamo.rs @@ -634,7 +634,7 @@ mod tests { } let rng = thread_rng(); - let etag = String::from_utf8(rng.sample_iter(&Alphanumeric).take(32).collect()).unwrap(); + let etag = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap(); let t = Some(etag.as_str()); let l = match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() { diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 3a841667ff97..106f88fa76a6 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -1233,6 +1233,7 @@ mod tests { use crate::test_util::flatten_list_stream; use chrono::TimeZone; use futures::stream::FuturesUnordered; + use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use std::future::Future; use tokio::io::AsyncWriteExt; @@ -1715,8 +1716,15 @@ mod tests { } pub(crate) async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) { + // When using DynamoCommit repeated runs of this test will produce the same sequence of records in DynamoDB + // As a result each conditional operation will need to wait for the lease to timeout before proceeding + // One solution would be to clear DynamoDB before each test, but this would require non-trivial additional code + // so we instead just generate a random suffix for the filenames + let rng = thread_rng(); + let suffix = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap(); + delete_fixtures(storage).await; - let path = Path::from("put_opts"); + let path = Path::from(format!("put_opts_{suffix}")); let v1 = storage .put_opts(&path, "a".into(), PutMode::Create.into()) .await @@ -1768,7 +1776,7 @@ mod tests { const NUM_WORKERS: usize = 5; const NUM_INCREMENTS: usize = 10; - let path = Path::from("RACE"); + let path = Path::from(format!("RACE-{suffix}")); let mut futures: FuturesUnordered<_> = (0..NUM_WORKERS) .map(|_| async { for _ in 0..NUM_INCREMENTS { From aaa7687e0ce1b5d894cb0cc429a4629fdd140eb2 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 26 Dec 2023 23:04:50 +0000 Subject: [PATCH 5/5] Clippy --- object_store/src/aws/dynamo.rs | 7 ++++--- object_store/src/aws/precondition.rs | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs index c1902c7835cd..f12a42137856 100644 --- a/object_store/src/aws/dynamo.rs +++ b/object_store/src/aws/dynamo.rs @@ -168,9 +168,9 @@ impl DynamoCommit { pub(crate) fn from_str(value: &str) -> Option { Some(match value.split_once(':') { Some((table_name, timeout)) => { - DynamoCommit::new(table_name.trim().to_string()).with_timeout(timeout.parse().ok()?) + Self::new(table_name.trim().to_string()).with_timeout(timeout.parse().ok()?) } - None => DynamoCommit::new(value.trim().to_string()), + None => Self::new(value.trim().to_string()), }) } @@ -192,6 +192,7 @@ impl DynamoCommit { .await } + #[allow(clippy::future_not_send)] // Generics confound this lint pub(crate) async fn conditional_op( &self, client: &S3Client, @@ -214,7 +215,7 @@ impl DynamoCommit { let expiry = lease.acquire + lease.timeout; return match tokio::time::timeout_at(expiry.into(), op()).await { Ok(Ok(v)) => Ok(v), - Ok(Err(e)) => Err(e.into()), + Ok(Err(e)) => Err(e), Err(_) => Err(Error::Generic { store: "DynamoDB", source: format!( diff --git a/object_store/src/aws/precondition.rs b/object_store/src/aws/precondition.rs index e885ac5bb09e..ad9e21537939 100644 --- a/object_store/src/aws/precondition.rs +++ b/object_store/src/aws/precondition.rs @@ -142,7 +142,7 @@ impl S3ConditionalPut { fn from_str(s: &str) -> Option { match s.trim() { "etag" => Some(Self::ETagMatch), - trimmed => match trimmed.split_once(":")? { + trimmed => match trimmed.split_once(':')? { ("dynamo", s) => Some(Self::Dynamo(DynamoCommit::from_str(s)?)), _ => None, },