From db513d5f5cd7ccb5060a11961b4f3c16de963333 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 13 Oct 2023 09:01:26 +0100 Subject: [PATCH] Add ObjectMeta::version and GetOptions::version (#4925) --- object_store/src/aws/client.rs | 13 +++++++++++- object_store/src/azure/client.rs | 14 +++++++++++- object_store/src/client/get.rs | 5 +---- object_store/src/client/header.rs | 13 ++++++++++-- object_store/src/client/list_response.rs | 1 + object_store/src/gcp/mod.rs | 27 +++++++++++++++++------- object_store/src/http/client.rs | 2 ++ object_store/src/lib.rs | 22 +++++++++++++++++++ object_store/src/local.rs | 1 + object_store/src/memory.rs | 4 ++++ object_store/src/prefix.rs | 1 + 11 files changed, 87 insertions(+), 16 deletions(-) diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 1c35586f8bc9..aac4be7abb3b 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -21,6 +21,7 @@ use crate::aws::{ AwsCredentialProvider, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET, }; use crate::client::get::GetClient; +use crate::client::header::HeaderConfig; use crate::client::list::ListClient; use crate::client::list_response::ListResponse; use crate::client::retry::RetryExt; @@ -553,6 +554,12 @@ impl S3Client { impl GetClient for S3Client { const STORE: &'static str = STORE; + const HEADER_CONFIG: HeaderConfig = HeaderConfig { + etag_required: false, + last_modified_required: false, + version_header: Some("x-amz-version-id"), + }; + /// Make an S3 GET request async fn get_request( &self, @@ -567,7 +574,11 @@ impl GetClient for S3Client { false => Method::GET, }; - let builder = self.client.request(method, url); + let mut builder = self.client.request(method, url); + + if let Some(v) = &options.version { + builder = builder.query(&[("versionId", v)]) + } let response = builder .with_get_options(options) diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index cd1a3a10fcc7..ceb95bcd6689 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -19,6 +19,7 @@ use super::credential::AzureCredential; use crate::azure::credential::*; use crate::azure::{AzureCredentialProvider, STORE}; use crate::client::get::GetClient; +use crate::client::header::HeaderConfig; use crate::client::list::ListClient; use crate::client::retry::RetryExt; use crate::client::GetOptionsExt; @@ -261,6 +262,12 @@ impl AzureClient { impl GetClient for AzureClient { const STORE: &'static str = STORE; + const HEADER_CONFIG: HeaderConfig = HeaderConfig { + etag_required: true, + last_modified_required: true, + version_header: Some("x-ms-version-id"), + }; + /// Make an Azure GET request /// /// @@ -277,12 +284,16 @@ impl GetClient for AzureClient { false => Method::GET, }; - let builder = self + let mut builder = self .client .request(method, url) .header(CONTENT_LENGTH, HeaderValue::from_static("0")) .body(Bytes::new()); + if let Some(v) = &options.version { + builder = builder.query(&[("versionid", v)]) + } + let response = builder .with_get_options(options) .with_azure_authorization(&credential, &self.config.account) @@ -442,6 +453,7 @@ impl TryFrom for ObjectMeta { last_modified: value.properties.last_modified, size: value.properties.content_length as usize, e_tag: value.properties.e_tag, + version: None, // For consistency with S3 and GCP which don't include this }) } } diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs index 333f6fe58475..1851224848a4 100644 --- a/object_store/src/client/get.rs +++ b/object_store/src/client/get.rs @@ -29,10 +29,7 @@ pub trait GetClient: Send + Sync + 'static { const STORE: &'static str; /// Configure the [`HeaderConfig`] for this client - const HEADER_CONFIG: HeaderConfig = HeaderConfig { - etag_required: true, - last_modified_required: true, - }; + const HEADER_CONFIG: HeaderConfig; async fn get_request( &self, diff --git a/object_store/src/client/header.rs b/object_store/src/client/header.rs index 6499eff5aebe..0e66ff568747 100644 --- a/object_store/src/client/header.rs +++ b/object_store/src/client/header.rs @@ -35,6 +35,9 @@ pub struct HeaderConfig { /// /// Defaults to `true` pub last_modified_required: bool, + + /// The version header name if any + pub version_header: Option<&'static str>, } #[derive(Debug, Snafu)] @@ -95,14 +98,20 @@ pub fn header_meta( .context(MissingContentLengthSnafu)?; let content_length = content_length.to_str().context(BadHeaderSnafu)?; - let content_length = content_length + let size = content_length .parse() .context(InvalidContentLengthSnafu { content_length })?; + let version = match cfg.version_header.and_then(|h| headers.get(h)) { + Some(v) => Some(v.to_str().context(BadHeaderSnafu)?.to_string()), + None => None, + }; + Ok(ObjectMeta { location: location.clone(), last_modified, - size: content_length, + version, + size, e_tag, }) } diff --git a/object_store/src/client/list_response.rs b/object_store/src/client/list_response.rs index 6a3889e3be5b..7a170c584156 100644 --- a/object_store/src/client/list_response.rs +++ b/object_store/src/client/list_response.rs @@ -80,6 +80,7 @@ impl TryFrom for ObjectMeta { last_modified: value.last_modified, size: value.size, e_tag: value.e_tag, + version: None, }) } } diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 3f5bf629d180..ce459235fd48 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -68,6 +68,7 @@ const STORE: &str = "GCS"; /// [`CredentialProvider`] for [`GoogleCloudStorage`] pub type GcpCredentialProvider = Arc>; +use crate::client::header::HeaderConfig; pub use credential::GcpCredential; #[derive(Debug, Snafu)] @@ -388,6 +389,12 @@ impl GoogleCloudStorageClient { impl GetClient for GoogleCloudStorageClient { const STORE: &'static str = STORE; + const HEADER_CONFIG: HeaderConfig = HeaderConfig { + etag_required: true, + last_modified_required: true, + version_header: Some("x-goog-generation"), + }; + /// Perform a get request async fn get_request( &self, @@ -403,19 +410,23 @@ impl GetClient for GoogleCloudStorageClient { false => Method::GET, }; - let mut request = self.client.request(method, url).with_get_options(options); + let mut request = self.client.request(method, url); + + if let Some(v) = &options.version { + request = request.query(&[("generation", v)]); + } if !credential.bearer.is_empty() { request = request.bearer_auth(&credential.bearer); } - let response = - request - .send_retry(&self.retry_config) - .await - .context(GetRequestSnafu { - path: path.as_ref(), - })?; + let response = request + .with_get_options(options) + .send_retry(&self.retry_config) + .await + .context(GetRequestSnafu { + path: path.as_ref(), + })?; Ok(response) } diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs index 0bd2e5639cb5..21aaaca43c28 100644 --- a/object_store/src/http/client.rs +++ b/object_store/src/http/client.rs @@ -286,6 +286,7 @@ impl GetClient for Client { const HEADER_CONFIG: HeaderConfig = HeaderConfig { etag_required: false, last_modified_required: false, + version_header: None, }; async fn get_request( @@ -389,6 +390,7 @@ impl MultiStatusResponse { last_modified, size: self.size()?, e_tag: self.prop_stat.prop.e_tag.clone(), + version: None, }) } diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 3fd363fd4f06..b7fcf8a72b3c 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -682,6 +682,8 @@ pub struct ObjectMeta { pub size: usize, /// The unique identifier for the object pub e_tag: Option, + /// A version indicator for this object + pub version: Option, } /// Options for a get request, such as range @@ -714,6 +716,8 @@ pub struct GetOptions { /// /// pub range: Option>, + /// Request a particular object version + pub version: Option, } impl GetOptions { @@ -1374,6 +1378,24 @@ mod tests { }; storage.get_opts(&path, options).await.unwrap(); } + + if let Some(version) = meta.version { + storage.put(&path, "bar".into()).await.unwrap(); + + let options = GetOptions { + version: Some(version), + ..GetOptions::default() + }; + + // Can retrieve previous version + let get_opts = storage.get_opts(&path, options).await.unwrap(); + let old = get_opts.bytes().await.unwrap(); + assert_eq!(old, b"foo".as_slice()); + + // Current version contains the updated data + let current = storage.get(&path).await.unwrap().bytes().await.unwrap(); + assert_eq!(¤t, b"bar".as_slice()); + } } /// Returns a chunk of length `chunk_length` diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 69da170b0872..7fdb15ae56f0 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -1012,6 +1012,7 @@ fn convert_metadata(metadata: Metadata, location: Path) -> Result { last_modified, size, e_tag: None, + version: None, }) } diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 0e229885b006..738050f5fba5 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -145,6 +145,7 @@ impl ObjectStore for InMemory { last_modified, size: data.len(), e_tag: None, + version: None, }; let (range, data) = match options.range { @@ -191,6 +192,7 @@ impl ObjectStore for InMemory { last_modified: entry.1, size: entry.0.len(), e_tag: None, + version: None, }) } @@ -222,6 +224,7 @@ impl ObjectStore for InMemory { last_modified: value.1, size: value.0.len(), e_tag: None, + version: None, }) }) .collect(); @@ -266,6 +269,7 @@ impl ObjectStore for InMemory { last_modified: v.1, size: v.0.len(), e_tag: None, + version: None, }; objects.push(object); } diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index 39585f73b692..63fdaa46e607 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -73,6 +73,7 @@ impl PrefixStore { size: meta.size, location: self.strip_prefix(meta.location), e_tag: meta.e_tag, + version: None, } } }