Skip to content

Commit

Permalink
Add ObjectMeta::version and GetOptions::version (apache#4925) (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold authored Oct 20, 2023
1 parent 0b9105d commit f4a2a88
Show file tree
Hide file tree
Showing 11 changed files with 75 additions and 9 deletions.
13 changes: 12 additions & 1 deletion object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::aws::credential::{AwsCredential, CredentialExt};
use crate::aws::{AwsCredentialProvider, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET};
use crate::client::get::GetClient;
use crate::client::header::get_etag;
use crate::client::header::HeaderConfig;
use crate::client::list::ListClient;
use crate::client::list_response::ListResponse;
use crate::client::retry::RetryExt;
Expand Down Expand Up @@ -549,6 +550,12 @@ impl S3Client {
impl GetClient for S3Client {
const STORE: &'static str = STORE;

const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: false,
last_modified_required: false,
version_header: Some("x-amz-version-id"),
};

/// Make an S3 GET request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html>
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
let credential = self.get_credential().await?;
Expand All @@ -558,7 +565,11 @@ impl GetClient for S3Client {
false => Method::GET,
};

let builder = self.client.request(method, url);
let mut builder = self.client.request(method, url);

if let Some(v) = &options.version {
builder = builder.query(&[("versionId", v)])
}

let response = builder
.with_get_options(options)
Expand Down
14 changes: 13 additions & 1 deletion object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use super::credential::AzureCredential;
use crate::azure::credential::*;
use crate::azure::{AzureCredentialProvider, STORE};
use crate::client::get::GetClient;
use crate::client::header::HeaderConfig;
use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
Expand Down Expand Up @@ -254,6 +255,12 @@ impl AzureClient {
impl GetClient for AzureClient {
const STORE: &'static str = STORE;

const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: true,
last_modified_required: true,
version_header: Some("x-ms-version-id"),
};

/// Make an Azure GET request
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob>
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties>
Expand All @@ -265,12 +272,16 @@ impl GetClient for AzureClient {
false => Method::GET,
};

let builder = self
let mut builder = self
.client
.request(method, url)
.header(CONTENT_LENGTH, HeaderValue::from_static("0"))
.body(Bytes::new());

if let Some(v) = &options.version {
builder = builder.query(&[("versionid", v)])
}

let response = builder
.with_get_options(options)
.with_azure_authorization(&credential, &self.config.account)
Expand Down Expand Up @@ -427,6 +438,7 @@ impl TryFrom<Blob> for ObjectMeta {
last_modified: value.properties.last_modified,
size: value.properties.content_length as usize,
e_tag: value.properties.e_tag,
version: None, // For consistency with S3 and GCP which don't include this
})
}
}
Expand Down
5 changes: 1 addition & 4 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ pub trait GetClient: Send + Sync + 'static {
const STORE: &'static str;

/// Configure the [`HeaderConfig`] for this client
const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: true,
last_modified_required: true,
};
const HEADER_CONFIG: HeaderConfig;

async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response>;
}
Expand Down
13 changes: 11 additions & 2 deletions object_store/src/client/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub struct HeaderConfig {
///
/// Defaults to `true`
pub last_modified_required: bool,

/// The version header name if any
pub version_header: Option<&'static str>,
}

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -98,14 +101,20 @@ pub fn header_meta(
.context(MissingContentLengthSnafu)?;

let content_length = content_length.to_str().context(BadHeaderSnafu)?;
let content_length = content_length
let size = content_length
.parse()
.context(InvalidContentLengthSnafu { content_length })?;

let version = match cfg.version_header.and_then(|h| headers.get(h)) {
Some(v) => Some(v.to_str().context(BadHeaderSnafu)?.to_string()),
None => None,
};

Ok(ObjectMeta {
location: location.clone(),
last_modified,
size: content_length,
version,
size,
e_tag,
})
}
1 change: 1 addition & 0 deletions object_store/src/client/list_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl TryFrom<ListContents> for ObjectMeta {
last_modified: value.last_modified,
size: value.size,
e_tag: value.e_tag,
version: None,
})
}
}
7 changes: 6 additions & 1 deletion object_store/src/gcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::client::get::GetClient;
use crate::client::header::get_etag;
use crate::client::header::{get_etag, HeaderConfig};
use crate::client::list::ListClient;
use crate::client::list_response::ListResponse;
use crate::client::retry::RetryExt;
Expand Down Expand Up @@ -333,6 +333,11 @@ impl GoogleCloudStorageClient {
#[async_trait]
impl GetClient for GoogleCloudStorageClient {
const STORE: &'static str = STORE;
const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: true,
last_modified_required: true,
version_header: Some("x-goog-generation"),
};

/// Perform a get request <https://cloud.google.com/storage/docs/xml-api/get-object-download>
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
Expand Down
2 changes: 2 additions & 0 deletions object_store/src/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ impl GetClient for Client {
const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: false,
last_modified_required: false,
version_header: None,
};

async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
Expand Down Expand Up @@ -375,6 +376,7 @@ impl MultiStatusResponse {
last_modified,
size: self.size()?,
e_tag: self.prop_stat.prop.e_tag.clone(),
version: None,
})
}

Expand Down
23 changes: 23 additions & 0 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,8 @@ pub struct ObjectMeta {
///
/// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
pub e_tag: Option<String>,
/// A version indicator for this object
pub version: Option<String>,
}

/// Options for a get request, such as range
Expand Down Expand Up @@ -685,6 +687,8 @@ pub struct GetOptions {
///
/// <https://datatracker.ietf.org/doc/html/rfc9110#name-range>
pub range: Option<Range<usize>>,
/// Request a particular object version
pub version: Option<String>,
/// Request transfer of no content
///
/// <https://datatracker.ietf.org/doc/html/rfc9110#name-head>
Expand Down Expand Up @@ -1379,6 +1383,24 @@ mod tests {
};
let err = storage.get_opts(&path, options).await.unwrap_err();
assert!(matches!(err, Error::Precondition { .. }), "{err}");

if let Some(version) = meta.version {
storage.put(&path, "bar".into()).await.unwrap();

let options = GetOptions {
version: Some(version),
..GetOptions::default()
};

// Can retrieve previous version
let get_opts = storage.get_opts(&path, options).await.unwrap();
let old = get_opts.bytes().await.unwrap();
assert_eq!(old, b"foo".as_slice());

// Current version contains the updated data
let current = storage.get(&path).await.unwrap().bytes().await.unwrap();
assert_eq!(&current, b"bar".as_slice());
}
}

/// Returns a chunk of length `chunk_length`
Expand Down Expand Up @@ -1691,6 +1713,7 @@ mod tests {
last_modified: Utc.timestamp_nanos(100),
size: 100,
e_tag: Some("123".to_string()),
version: None,
};

let mut options = GetOptions::default();
Expand Down
1 change: 1 addition & 0 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,7 @@ fn convert_metadata(metadata: Metadata, location: Path) -> Result<ObjectMeta> {
last_modified,
size,
e_tag: Some(get_etag(&metadata)),
version: None,
})
}

Expand Down
4 changes: 4 additions & 0 deletions object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ impl ObjectStore for InMemory {
last_modified: entry.last_modified,
size: entry.data.len(),
e_tag: Some(e_tag),
version: None,
};
options.check_preconditions(&meta)?;

Expand Down Expand Up @@ -212,6 +213,7 @@ impl ObjectStore for InMemory {
last_modified: entry.last_modified,
size: entry.data.len(),
e_tag: Some(entry.e_tag.to_string()),
version: None,
})
}

Expand Down Expand Up @@ -241,6 +243,7 @@ impl ObjectStore for InMemory {
last_modified: value.last_modified,
size: value.data.len(),
e_tag: Some(value.e_tag.to_string()),
version: None,
})
})
.collect();
Expand Down Expand Up @@ -285,6 +288,7 @@ impl ObjectStore for InMemory {
last_modified: v.last_modified,
size: v.data.len(),
e_tag: Some(v.e_tag.to_string()),
version: None,
};
objects.push(object);
}
Expand Down
1 change: 1 addition & 0 deletions object_store/src/prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl<T: ObjectStore> PrefixStore<T> {
size: meta.size,
location: self.strip_prefix(meta.location),
e_tag: meta.e_tag,
version: None,
}
}
}
Expand Down

0 comments on commit f4a2a88

Please sign in to comment.