Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ObjectMeta::version and GetOptions::version (#4925) #4935

Merged
merged 2 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::aws::{
AwsCredentialProvider, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET,
};
use crate::client::get::GetClient;
use crate::client::header::HeaderConfig;
use crate::client::list::ListClient;
use crate::client::list_response::ListResponse;
use crate::client::retry::RetryExt;
Expand Down Expand Up @@ -553,6 +554,12 @@ impl S3Client {
impl GetClient for S3Client {
const STORE: &'static str = STORE;

const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: false,
last_modified_required: false,
version_header: Some("x-amz-version-id"),
};

/// Make an S3 GET request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html>
async fn get_request(
&self,
Expand All @@ -567,7 +574,11 @@ impl GetClient for S3Client {
false => Method::GET,
};

let builder = self.client.request(method, url);
let mut builder = self.client.request(method, url);

if let Some(v) = &options.version {
builder = builder.query(&[("versionId", v)])
}

let response = builder
.with_get_options(options)
Expand Down
14 changes: 13 additions & 1 deletion object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use super::credential::AzureCredential;
use crate::azure::credential::*;
use crate::azure::{AzureCredentialProvider, STORE};
use crate::client::get::GetClient;
use crate::client::header::HeaderConfig;
use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
Expand Down Expand Up @@ -261,6 +262,12 @@ impl AzureClient {
impl GetClient for AzureClient {
const STORE: &'static str = STORE;

const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: true,
last_modified_required: true,
version_header: Some("x-ms-version-id"),
};

/// Make an Azure GET request
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob>
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties>
Expand All @@ -277,12 +284,16 @@ impl GetClient for AzureClient {
false => Method::GET,
};

let builder = self
let mut builder = self
.client
.request(method, url)
.header(CONTENT_LENGTH, HeaderValue::from_static("0"))
.body(Bytes::new());

if let Some(v) = &options.version {
builder = builder.query(&[("versionid", v)])
}

let response = builder
.with_get_options(options)
.with_azure_authorization(&credential, &self.config.account)
Expand Down Expand Up @@ -442,6 +453,7 @@ impl TryFrom<Blob> for ObjectMeta {
last_modified: value.properties.last_modified,
size: value.properties.content_length as usize,
e_tag: value.properties.e_tag,
version: None, // For consistency with S3 and GCP which don't include this
})
}
}
Expand Down
5 changes: 1 addition & 4 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ pub trait GetClient: Send + Sync + 'static {
const STORE: &'static str;

/// Configure the [`HeaderConfig`] for this client
const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: true,
last_modified_required: true,
};
const HEADER_CONFIG: HeaderConfig;

async fn get_request(
&self,
Expand Down
13 changes: 11 additions & 2 deletions object_store/src/client/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub struct HeaderConfig {
///
/// Defaults to `true`
pub last_modified_required: bool,

/// The version header name if any
pub version_header: Option<&'static str>,
}

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -95,14 +98,20 @@ pub fn header_meta(
.context(MissingContentLengthSnafu)?;

let content_length = content_length.to_str().context(BadHeaderSnafu)?;
let content_length = content_length
let size = content_length
.parse()
.context(InvalidContentLengthSnafu { content_length })?;

let version = match cfg.version_header.and_then(|h| headers.get(h)) {
Some(v) => Some(v.to_str().context(BadHeaderSnafu)?.to_string()),
None => None,
};

Ok(ObjectMeta {
location: location.clone(),
last_modified,
size: content_length,
version,
size,
e_tag,
})
}
1 change: 1 addition & 0 deletions object_store/src/client/list_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl TryFrom<ListContents> for ObjectMeta {
last_modified: value.last_modified,
size: value.size,
e_tag: value.e_tag,
version: None,
})
}
}
27 changes: 19 additions & 8 deletions object_store/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const STORE: &str = "GCS";

/// [`CredentialProvider`] for [`GoogleCloudStorage`]
pub type GcpCredentialProvider = Arc<dyn CredentialProvider<Credential = GcpCredential>>;
use crate::client::header::HeaderConfig;
pub use credential::GcpCredential;

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -388,6 +389,12 @@ impl GoogleCloudStorageClient {
impl GetClient for GoogleCloudStorageClient {
const STORE: &'static str = STORE;

const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: true,
last_modified_required: true,
version_header: Some("x-goog-generation"),
};

/// Perform a get request <https://cloud.google.com/storage/docs/xml-api/get-object-download>
async fn get_request(
&self,
Expand All @@ -403,19 +410,23 @@ impl GetClient for GoogleCloudStorageClient {
false => Method::GET,
};

let mut request = self.client.request(method, url).with_get_options(options);
let mut request = self.client.request(method, url);

if let Some(v) = &options.version {
request = request.query(&[("generation", v)]);
}

if !credential.bearer.is_empty() {
request = request.bearer_auth(&credential.bearer);
}

let response =
request
.send_retry(&self.retry_config)
.await
.context(GetRequestSnafu {
path: path.as_ref(),
})?;
let response = request
.with_get_options(options)
.send_retry(&self.retry_config)
.await
.context(GetRequestSnafu {
path: path.as_ref(),
})?;

Ok(response)
}
Expand Down
2 changes: 2 additions & 0 deletions object_store/src/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ impl GetClient for Client {
const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: false,
last_modified_required: false,
version_header: None,
};

async fn get_request(
Expand Down Expand Up @@ -389,6 +390,7 @@ impl MultiStatusResponse {
last_modified,
size: self.size()?,
e_tag: self.prop_stat.prop.e_tag.clone(),
version: None,
})
}

Expand Down
22 changes: 22 additions & 0 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,8 @@ pub struct ObjectMeta {
pub size: usize,
/// The unique identifier for the object
pub e_tag: Option<String>,
/// A version indicator for this object
pub version: Option<String>,
}

/// Options for a get request, such as range
Expand Down Expand Up @@ -714,6 +716,8 @@ pub struct GetOptions {
///
/// <https://datatracker.ietf.org/doc/html/rfc9110#name-range>
pub range: Option<Range<usize>>,
/// Request a particular object version
pub version: Option<String>,
}

impl GetOptions {
Expand Down Expand Up @@ -1374,6 +1378,24 @@ mod tests {
};
storage.get_opts(&path, options).await.unwrap();
}

if let Some(version) = meta.version {
storage.put(&path, "bar".into()).await.unwrap();

let options = GetOptions {
version: Some(version),
..GetOptions::default()
};

// Can retrieve previous version
let get_opts = storage.get_opts(&path, options).await.unwrap();
let old = get_opts.bytes().await.unwrap();
assert_eq!(old, b"foo".as_slice());

// Current version contains the updated data
let current = storage.get(&path).await.unwrap().bytes().await.unwrap();
assert_eq!(&current, b"bar".as_slice());
}
}

/// Returns a chunk of length `chunk_length`
Expand Down
1 change: 1 addition & 0 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,7 @@ fn convert_metadata(metadata: Metadata, location: Path) -> Result<ObjectMeta> {
last_modified,
size,
e_tag: None,
version: None,
})
}

Expand Down
4 changes: 4 additions & 0 deletions object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ impl ObjectStore for InMemory {
last_modified,
size: data.len(),
e_tag: None,
version: None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory the mem backend could implement this, not sure if that would be helpful for testing (might also be a follow-up PR).

BTW: what's the expectation regarding how long versions are kept in "the cloud"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed this is waiting on #4922

As for expectations, none, we expose what is in the store, which in turn will depend on what lifecycle policy has been configured if any

};

let (range, data) = match options.range {
Expand Down Expand Up @@ -191,6 +192,7 @@ impl ObjectStore for InMemory {
last_modified: entry.1,
size: entry.0.len(),
e_tag: None,
version: None,
})
}

Expand Down Expand Up @@ -222,6 +224,7 @@ impl ObjectStore for InMemory {
last_modified: value.1,
size: value.0.len(),
e_tag: None,
version: None,
})
})
.collect();
Expand Down Expand Up @@ -266,6 +269,7 @@ impl ObjectStore for InMemory {
last_modified: v.1,
size: v.0.len(),
e_tag: None,
version: None,
};
objects.push(object);
}
Expand Down
1 change: 1 addition & 0 deletions object_store/src/prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl<T: ObjectStore> PrefixStore<T> {
size: meta.size,
location: self.strip_prefix(meta.location),
e_tag: meta.e_tag,
version: None,
}
}
}
Expand Down