From f4a2a88a658878db7d7b478880ebeecc4f27cb0f Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Fri, 20 Oct 2023 14:41:10 +0100 Subject: [PATCH] Add ObjectMeta::version and GetOptions::version (#4925) (#4935) --- object_store/src/aws/client.rs | 13 ++++++++++++- object_store/src/azure/client.rs | 14 +++++++++++++- object_store/src/client/get.rs | 5 +---- object_store/src/client/header.rs | 13 +++++++++++-- object_store/src/client/list_response.rs | 1 + object_store/src/gcp/client.rs | 7 ++++++- object_store/src/http/client.rs | 2 ++ object_store/src/lib.rs | 23 +++++++++++++++++++++++ object_store/src/local.rs | 1 + object_store/src/memory.rs | 4 ++++ object_store/src/prefix.rs | 1 + 11 files changed, 75 insertions(+), 9 deletions(-) diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 6b34b181ab9d..00d6ee446f2f 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -20,6 +20,7 @@ use crate::aws::credential::{AwsCredential, CredentialExt}; use crate::aws::{AwsCredentialProvider, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET}; use crate::client::get::GetClient; use crate::client::header::get_etag; +use crate::client::header::HeaderConfig; use crate::client::list::ListClient; use crate::client::list_response::ListResponse; use crate::client::retry::RetryExt; @@ -549,6 +550,12 @@ impl S3Client { impl GetClient for S3Client { const STORE: &'static str = STORE; + const HEADER_CONFIG: HeaderConfig = HeaderConfig { + etag_required: false, + last_modified_required: false, + version_header: Some("x-amz-version-id"), + }; + /// Make an S3 GET request async fn get_request(&self, path: &Path, options: GetOptions) -> Result { let credential = self.get_credential().await?; @@ -558,7 +565,11 @@ impl GetClient for S3Client { false => Method::GET, }; - let builder = self.client.request(method, url); + let mut builder = self.client.request(method, url); + + if let Some(v) = &options.version { + builder = builder.query(&[("versionId", v)]) + } let response = builder .with_get_options(options) diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index b5ef02191cd7..cd3df8c7b857 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -19,6 +19,7 @@ use super::credential::AzureCredential; use crate::azure::credential::*; use crate::azure::{AzureCredentialProvider, STORE}; use crate::client::get::GetClient; +use crate::client::header::HeaderConfig; use crate::client::list::ListClient; use crate::client::retry::RetryExt; use crate::client::GetOptionsExt; @@ -254,6 +255,12 @@ impl AzureClient { impl GetClient for AzureClient { const STORE: &'static str = STORE; + const HEADER_CONFIG: HeaderConfig = HeaderConfig { + etag_required: true, + last_modified_required: true, + version_header: Some("x-ms-version-id"), + }; + /// Make an Azure GET request /// /// @@ -265,12 +272,16 @@ impl GetClient for AzureClient { false => Method::GET, }; - let builder = self + let mut builder = self .client .request(method, url) .header(CONTENT_LENGTH, HeaderValue::from_static("0")) .body(Bytes::new()); + if let Some(v) = &options.version { + builder = builder.query(&[("versionid", v)]) + } + let response = builder .with_get_options(options) .with_azure_authorization(&credential, &self.config.account) @@ -427,6 +438,7 @@ impl TryFrom for ObjectMeta { last_modified: value.properties.last_modified, size: value.properties.content_length as usize, e_tag: value.properties.e_tag, + version: None, // For consistency with S3 and GCP which don't include this }) } } diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs index ed1762ff8fe9..5f9cac9b424b 100644 --- a/object_store/src/client/get.rs +++ b/object_store/src/client/get.rs @@ -29,10 +29,7 @@ pub trait GetClient: Send + Sync + 'static { const STORE: &'static str; /// Configure the [`HeaderConfig`] for this client - const HEADER_CONFIG: HeaderConfig = HeaderConfig { - etag_required: true, - last_modified_required: true, - }; + const HEADER_CONFIG: HeaderConfig; async fn get_request(&self, path: &Path, options: GetOptions) -> Result; } diff --git a/object_store/src/client/header.rs b/object_store/src/client/header.rs index 17f83a2ba8c8..e67496833b99 100644 --- a/object_store/src/client/header.rs +++ b/object_store/src/client/header.rs @@ -35,6 +35,9 @@ pub struct HeaderConfig { /// /// Defaults to `true` pub last_modified_required: bool, + + /// The version header name if any + pub version_header: Option<&'static str>, } #[derive(Debug, Snafu)] @@ -98,14 +101,20 @@ pub fn header_meta( .context(MissingContentLengthSnafu)?; let content_length = content_length.to_str().context(BadHeaderSnafu)?; - let content_length = content_length + let size = content_length .parse() .context(InvalidContentLengthSnafu { content_length })?; + let version = match cfg.version_header.and_then(|h| headers.get(h)) { + Some(v) => Some(v.to_str().context(BadHeaderSnafu)?.to_string()), + None => None, + }; + Ok(ObjectMeta { location: location.clone(), last_modified, - size: content_length, + version, + size, e_tag, }) } diff --git a/object_store/src/client/list_response.rs b/object_store/src/client/list_response.rs index 6a3889e3be5b..7a170c584156 100644 --- a/object_store/src/client/list_response.rs +++ b/object_store/src/client/list_response.rs @@ -80,6 +80,7 @@ impl TryFrom for ObjectMeta { last_modified: value.last_modified, size: value.size, e_tag: value.e_tag, + version: None, }) } } diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index 4165d784fd7f..558a6f8d2a84 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -16,7 +16,7 @@ // under the License. use crate::client::get::GetClient; -use crate::client::header::get_etag; +use crate::client::header::{get_etag, HeaderConfig}; use crate::client::list::ListClient; use crate::client::list_response::ListResponse; use crate::client::retry::RetryExt; @@ -333,6 +333,11 @@ impl GoogleCloudStorageClient { #[async_trait] impl GetClient for GoogleCloudStorageClient { const STORE: &'static str = STORE; + const HEADER_CONFIG: HeaderConfig = HeaderConfig { + etag_required: true, + last_modified_required: true, + version_header: Some("x-goog-generation"), + }; /// Perform a get request async fn get_request(&self, path: &Path, options: GetOptions) -> Result { diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs index f7593be5a043..a7dbdfcbe844 100644 --- a/object_store/src/http/client.rs +++ b/object_store/src/http/client.rs @@ -277,6 +277,7 @@ impl GetClient for Client { const HEADER_CONFIG: HeaderConfig = HeaderConfig { etag_required: false, last_modified_required: false, + version_header: None, }; async fn get_request(&self, path: &Path, options: GetOptions) -> Result { @@ -375,6 +376,7 @@ impl MultiStatusResponse { last_modified, size: self.size()?, e_tag: self.prop_stat.prop.e_tag.clone(), + version: None, }) } diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 375302e50d8b..656b30390a4d 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -637,6 +637,8 @@ pub struct ObjectMeta { /// /// pub e_tag: Option, + /// A version indicator for this object + pub version: Option, } /// Options for a get request, such as range @@ -685,6 +687,8 @@ pub struct GetOptions { /// /// pub range: Option>, + /// Request a particular object version + pub version: Option, /// Request transfer of no content /// /// @@ -1379,6 +1383,24 @@ mod tests { }; let err = storage.get_opts(&path, options).await.unwrap_err(); assert!(matches!(err, Error::Precondition { .. }), "{err}"); + + if let Some(version) = meta.version { + storage.put(&path, "bar".into()).await.unwrap(); + + let options = GetOptions { + version: Some(version), + ..GetOptions::default() + }; + + // Can retrieve previous version + let get_opts = storage.get_opts(&path, options).await.unwrap(); + let old = get_opts.bytes().await.unwrap(); + assert_eq!(old, b"foo".as_slice()); + + // Current version contains the updated data + let current = storage.get(&path).await.unwrap().bytes().await.unwrap(); + assert_eq!(¤t, b"bar".as_slice()); + } } /// Returns a chunk of length `chunk_length` @@ -1691,6 +1713,7 @@ mod tests { last_modified: Utc.timestamp_nanos(100), size: 100, e_tag: Some("123".to_string()), + version: None, }; let mut options = GetOptions::default(); diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 9be3ee923244..ce9aa4683499 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -969,6 +969,7 @@ fn convert_metadata(metadata: Metadata, location: Path) -> Result { last_modified, size, e_tag: Some(get_etag(&metadata)), + version: None, }) } diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index da7b55d3a83f..8b9522e48de8 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -166,6 +166,7 @@ impl ObjectStore for InMemory { last_modified: entry.last_modified, size: entry.data.len(), e_tag: Some(e_tag), + version: None, }; options.check_preconditions(&meta)?; @@ -212,6 +213,7 @@ impl ObjectStore for InMemory { last_modified: entry.last_modified, size: entry.data.len(), e_tag: Some(entry.e_tag.to_string()), + version: None, }) } @@ -241,6 +243,7 @@ impl ObjectStore for InMemory { last_modified: value.last_modified, size: value.data.len(), e_tag: Some(value.e_tag.to_string()), + version: None, }) }) .collect(); @@ -285,6 +288,7 @@ impl ObjectStore for InMemory { last_modified: v.last_modified, size: v.data.len(), e_tag: Some(v.e_tag.to_string()), + version: None, }; objects.push(object); } diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index c4cb77b66d01..b5bff8b12dd7 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -73,6 +73,7 @@ impl PrefixStore { size: meta.size, location: self.strip_prefix(meta.location), e_tag: meta.e_tag, + version: None, } } }