Skip to content

Commit

Permalink
Add support for getting object checksums in GetObject (#1123)
Browse files Browse the repository at this point in the history
<!--
The title and description of pull requests will be used when creating a
squash commit to the base branch (usually `main`).
Please keep them both up-to-date as the code change evolves, to ensure
that the commit message is useful for future readers.
-->

## Description of change

- Adds new method `get_object_checksum` to `get_object` to retrieve the
whole object checksum.
- Adds new parameter `checksum_mode` to `GetObjectParams` to configure
if we want to request checksums from S3.
- If checksums aren't requested, `get_object_checksum` returns an error.
- Refactor `S3CrtClient` to store a cell of the object headers rather
than metadata.

<!--
    Please describe your contribution here.
    What is the change and why are you making it?
-->

Relevant issues: <!-- Please add issue numbers. -->

## Does this change impact existing behavior?

No

<!-- Please confirm there's no breaking change, or call our any behavior
changes you think are necessary. -->

## Does this change need a changelog entry in any of the crates?

Yes

<!--
    Please confirm yes or no.
    If no, add justification. If unsure, ask a reviewer.

    You can find the changelog for each crate here:
-
https://github.com/awslabs/mountpoint-s3/blob/main/mountpoint-s3/CHANGELOG.md
-
https://github.com/awslabs/mountpoint-s3/blob/main/mountpoint-s3-client/CHANGELOG.md
-
https://github.com/awslabs/mountpoint-s3/blob/main/mountpoint-s3-crt/CHANGELOG.md
-
https://github.com/awslabs/mountpoint-s3/blob/main/mountpoint-s3-crt-sys/CHANGELOG.md
-->

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).

---------

Signed-off-by: Simon Beal <[email protected]>
  • Loading branch information
muddyfish authored Nov 11, 2024
1 parent ef01122 commit e544f8f
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 75 deletions.
3 changes: 3 additions & 0 deletions mountpoint-s3-client/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
* Add parameter to request checksum information as part of a `HeadObject` request.
If specified, the result should contain the checksum for the object if available in the S3 response.
([#1083](https://github.com/awslabs/mountpoint-s3/pull/1083))
* Add parameter to request checksum information as part of a `GetObject` request.
If specified, calling `get_object_checksum` on `GetObjectRequest` will return the checksum information.
([#1123](https://github.com/awslabs/mountpoint-s3/pull/1123))
* Expose checksum algorithm in `ListObjectsResult`'s `ObjectInfo` struct.
([#1086](https://github.com/awslabs/mountpoint-s3/pull/1086),
[#1093](https://github.com/awslabs/mountpoint-s3/pull/1093))
Expand Down
6 changes: 5 additions & 1 deletion mountpoint-s3-client/src/failure_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use pin_project::pin_project;

use crate::object_client::{
CopyObjectError, CopyObjectParams, CopyObjectResult, DeleteObjectError, DeleteObjectResult, GetBodyPart,
Checksum, CopyObjectError, CopyObjectParams, CopyObjectResult, DeleteObjectError, DeleteObjectResult, GetBodyPart,
GetObjectAttributesError, GetObjectAttributesResult, GetObjectError, GetObjectParams, GetObjectRequest,
HeadObjectError, HeadObjectParams, HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute,
ObjectClient, ObjectClientError, ObjectClientResult, ObjectMetadata, PutObjectError, PutObjectParams,
Expand Down Expand Up @@ -223,6 +223,10 @@ impl<Client: ObjectClient + Send + Sync, FailState: Send + Sync> GetObjectReques
self.request.get_object_metadata().await
}

async fn get_object_checksum(&self) -> ObjectClientResult<Checksum, GetObjectError, Self::ClientError> {
self.request.get_object_checksum().await
}

fn increment_read_window(self: Pin<&mut Self>, len: usize) {
let this = self.project();
this.request.increment_read_window(len);
Expand Down
4 changes: 4 additions & 0 deletions mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,10 @@ impl GetObjectRequest for MockGetObjectRequest {
Ok(self.object.object_metadata.clone())
}

async fn get_object_checksum(&self) -> ObjectClientResult<Checksum, GetObjectError, Self::ClientError> {
Ok(self.object.checksum.clone())
}

fn increment_read_window(mut self: Pin<&mut Self>, len: usize) {
self.read_window_end_offset += len as u64;
}
Expand Down
6 changes: 5 additions & 1 deletion mountpoint-s3-client/src/mock_client/throughput_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::mock_client::{
MockClient, MockClientConfig, MockClientError, MockGetObjectRequest, MockObject, MockPutObjectRequest,
};
use crate::object_client::{
CopyObjectError, CopyObjectParams, CopyObjectResult, DeleteObjectError, DeleteObjectResult, GetBodyPart,
Checksum, CopyObjectError, CopyObjectParams, CopyObjectResult, DeleteObjectError, DeleteObjectResult, GetBodyPart,
GetObjectAttributesError, GetObjectAttributesResult, GetObjectError, GetObjectParams, GetObjectRequest,
HeadObjectError, HeadObjectParams, HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute,
ObjectClient, ObjectClientResult, ObjectMetadata, PutObjectError, PutObjectParams, PutObjectResult,
Expand Down Expand Up @@ -74,6 +74,10 @@ impl GetObjectRequest for ThroughputGetObjectRequest {
Ok(self.request.object.object_metadata.clone())
}

async fn get_object_checksum(&self) -> ObjectClientResult<Checksum, GetObjectError, Self::ClientError> {
Ok(self.request.object.checksum.clone())
}

fn increment_read_window(self: Pin<&mut Self>, len: usize) {
let this = self.project();
this.request.increment_read_window(len);
Expand Down
12 changes: 11 additions & 1 deletion mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ pub enum GetObjectError {
pub struct GetObjectParams {
pub range: Option<Range<u64>>,
pub if_match: Option<ETag>,
pub checksum_mode: Option<ChecksumMode>,
}

impl GetObjectParams {
Expand All @@ -208,6 +209,12 @@ impl GetObjectParams {
self.if_match = value;
self
}

/// Set option to retrieve checksum as part of the GetObject request
pub fn checksum_mode(mut self, value: Option<ChecksumMode>) -> Self {
self.checksum_mode = value;
self
}
}

/// Result of a [`list_objects`](ObjectClient::list_objects) request
Expand Down Expand Up @@ -256,7 +263,7 @@ impl HeadObjectParams {

/// Enable [ChecksumMode] to retrieve object checksums
#[non_exhaustive]
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq)]
pub enum ChecksumMode {
/// Retrieve checksums
Enabled,
Expand Down Expand Up @@ -562,6 +569,9 @@ pub trait GetObjectRequest:
/// when they're read.
async fn get_object_metadata(&self) -> ObjectClientResult<ObjectMetadata, GetObjectError, Self::ClientError>;

/// Get the object's checksum, if uploaded with one
async fn get_object_checksum(&self) -> ObjectClientResult<Checksum, GetObjectError, Self::ClientError>;

/// Increment the flow-control window, so that response data continues downloading.
///
/// If the client was created with `enable_read_backpressure` set true,
Expand Down
35 changes: 34 additions & 1 deletion mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use mountpoint_s3_crt::auth::signing_config::SigningConfig;
use mountpoint_s3_crt::common::allocator::Allocator;
use mountpoint_s3_crt::common::string::AwsString;
use mountpoint_s3_crt::common::uri::Uri;
use mountpoint_s3_crt::http::request_response::{Header, Headers, Message};
use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError, Message};
use mountpoint_s3_crt::io::channel_bootstrap::{ClientBootstrap, ClientBootstrapOptions};
use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
use mountpoint_s3_crt::io::host_resolver::{AddressKinds, HostResolver, HostResolverDefaultOptions};
Expand Down Expand Up @@ -1119,6 +1119,21 @@ fn extract_range_header(headers: &Headers) -> Option<Range<u64>> {
Some(start..end + 1)
}

/// Extract the [Checksum] information from headers
fn parse_checksum(headers: &Headers) -> Result<Checksum, HeadersError> {
let checksum_crc32 = headers.get_as_optional_string("x-amz-checksum-crc32")?;
let checksum_crc32c = headers.get_as_optional_string("x-amz-checksum-crc32c")?;
let checksum_sha1 = headers.get_as_optional_string("x-amz-checksum-sha1")?;
let checksum_sha256 = headers.get_as_optional_string("x-amz-checksum-sha256")?;

Ok(Checksum {
checksum_crc32,
checksum_crc32c,
checksum_sha1,
checksum_sha256,
})
}

/// Try to parse a modeled error out of a failing meta request
fn try_parse_generic_error(request_result: &MetaRequestResult) -> Option<S3RequestError> {
/// Look for a redirect header pointing to a different region for the bucket
Expand Down Expand Up @@ -1656,4 +1671,22 @@ mod tests {
};
assert_eq!(error, error_code.into());
}

#[test]
fn test_checksum_sha256() {
let mut headers = Headers::new(&Allocator::default()).unwrap();
let value = "QwzjTQIHJO11oZbfwq1nx3dy0Wk=";
let header = Header::new("x-amz-checksum-sha256", value.to_owned());
headers.add_header(&header).unwrap();

let checksum = parse_checksum(&headers).expect("failed to parse headers");
assert_eq!(checksum.checksum_crc32, None, "other checksums shouldn't be set");
assert_eq!(checksum.checksum_crc32c, None, "other checksums shouldn't be set");
assert_eq!(checksum.checksum_sha1, None, "other checksums shouldn't be set");
assert_eq!(
checksum.checksum_sha256,
Some(value.to_owned()),
"sha256 header should match"
);
}
}
97 changes: 62 additions & 35 deletions mountpoint-s3-client/src/s3_crt_client/get_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,26 @@ use std::task::{Context, Poll};
use futures::channel::mpsc::UnboundedReceiver;
use futures::Stream;
use mountpoint_s3_crt::common::error::Error;
use mountpoint_s3_crt::http::request_response::Header;
use mountpoint_s3_crt::http::request_response::{Header, Headers};
use mountpoint_s3_crt::s3::client::MetaRequestResult;
use pin_project::pin_project;
use thiserror::Error;

use crate::object_client::{
GetBodyPart, GetObjectError, GetObjectParams, ObjectClientError, ObjectClientResult, ObjectMetadata,
Checksum, GetBodyPart, GetObjectError, GetObjectParams, ObjectClientError, ObjectClientResult, ObjectMetadata,
};
use crate::s3_crt_client::{
GetObjectRequest, S3CrtClient, S3CrtClientInner, S3HttpRequest, S3Operation, S3RequestError,
parse_checksum, GetObjectRequest, S3CrtClient, S3CrtClientInner, S3HttpRequest, S3Operation, S3RequestError,
};
use crate::types::ChecksumMode;

/// Failures to return object metadata
#[derive(Clone, Error, Debug)]
pub enum ObjectMetadataError {
#[error("error occurred fetching object metadata")]
ObjectMetadataError,
pub enum ObjectHeadersError {
#[error("unknown error occurred receiving object headers")]
UnknownError,
#[error("requested object checksums, but did not specify it in the request")]
DidNotRequestChecksums,
}

impl S3CrtClient {
Expand All @@ -50,6 +53,14 @@ impl S3CrtClient {
.set_header(&Header::new("accept", "*/*"))
.map_err(S3RequestError::construction_failure)?;

let requested_checksums = params.checksum_mode.as_ref() == Some(&ChecksumMode::Enabled);
if requested_checksums {
// Add checksum header to receive object checksums.
message
.set_header(&Header::new("x-amz-checksum-mode", "enabled"))
.map_err(S3RequestError::construction_failure)?;
}

if let Some(etag) = params.if_match.as_ref() {
// Return the object only if its entity tag (ETag) is matched
message
Expand Down Expand Up @@ -79,43 +90,32 @@ impl S3CrtClient {
let mut options = S3CrtClientInner::new_meta_request_options(message, S3Operation::GetObject);
options.part_size(self.inner.read_part_size as u64);

let object_metadata = AsyncCell::shared();
let object_headers = AsyncCell::shared();

let object_metadata_setter_on_headers = object_metadata.clone();
let object_metadata_setter_on_finish = object_metadata.clone();
let object_headers_setter_on_headers = object_headers.clone();
let object_headers_setter_on_finish = object_headers.clone();

let request = self.inner.make_meta_request_from_options(
options,
span,
|_| (),
move |headers, status| {
// Headers can be returned multiple times, but the object metadata doesn't change.
// Headers can be returned multiple times, but the metadata/checksums don't change.
// Explicitly ignore the case where we've already set object metadata.

// Only set metadata if we have a 2xx status code. If we only get other status
// codes, then on_finish cancels.
// Only set headers if we have a 2xx status code. If we only get other status codes,
// then on_finish sets an error.
if (200..300).contains(&status) {
// This isn't to do with safety, only minor performance gains.
if !object_metadata_setter_on_headers.is_set() {
let object_metadata = headers
.iter()
.filter_map(|(key, value)| {
let metadata_header = key.to_str()?.strip_prefix("x-amz-meta-")?;
let value = value.to_str()?;
Some((metadata_header.to_string(), value.to_string()))
})
.collect();
// Don't overwrite if already set.
object_metadata_setter_on_headers.or_set(Ok(object_metadata));
}
// Don't overwrite if already set - the first headers are fine.
object_headers_setter_on_headers.or_set(Ok(headers.clone()));
}
},
move |offset, data| {
let _ = sender.unbounded_send(Ok((offset, data.into())));
},
move |result| {
// FIXME - Ideally we'd include a reason why we failed here.
object_metadata_setter_on_finish.or_set(Err(ObjectMetadataError::ObjectMetadataError));
object_headers_setter_on_finish.or_set(Err(ObjectHeadersError::UnknownError));
if result.is_err() {
Err(parse_get_object_error(result).map(ObjectClientError::ServiceError))
} else {
Expand All @@ -128,8 +128,9 @@ impl S3CrtClient {
request,
finish_receiver: receiver,
finished: false,
requested_checksums,
enable_backpressure: self.inner.enable_backpressure,
object_metadata,
headers: object_headers,
initial_read_window_empty: self.inner.initial_read_window_size == 0,
next_offset,
read_window_end_offset,
Expand All @@ -150,8 +151,9 @@ pub struct S3GetObjectRequest {
#[pin]
finish_receiver: UnboundedReceiver<Result<GetBodyPart, Error>>,
finished: bool,
requested_checksums: bool,
enable_backpressure: bool,
object_metadata: Arc<AsyncCell<Result<ObjectMetadata, ObjectMetadataError>>>,
headers: Arc<AsyncCell<Result<Headers, ObjectHeadersError>>>,
initial_read_window_empty: bool,
/// Next offset of the data to be polled from [poll_next]
next_offset: u64,
Expand All @@ -160,22 +162,47 @@ pub struct S3GetObjectRequest {
read_window_end_offset: u64,
}

#[cfg_attr(not(docsrs), async_trait)]
impl GetObjectRequest for S3GetObjectRequest {
type ClientError = S3RequestError;

async fn get_object_metadata(&self) -> ObjectClientResult<ObjectMetadata, GetObjectError, Self::ClientError> {
match self.object_metadata.try_get() {
impl S3GetObjectRequest {
async fn get_object_headers(&self) -> ObjectClientResult<Headers, GetObjectError, S3RequestError> {
match self.headers.try_get() {
Some(result) => result,
None => {
if self.enable_backpressure && self.initial_read_window_empty {
return Err(ObjectClientError::ClientError(S3RequestError::EmptyReadWindow));
}
self.object_metadata.get().await
self.headers.get().await
}
}
.map_err(|_| ObjectClientError::ClientError(S3RequestError::RequestCanceled))
}
}

#[cfg_attr(not(docsrs), async_trait)]
impl GetObjectRequest for S3GetObjectRequest {
type ClientError = S3RequestError;

async fn get_object_metadata(&self) -> ObjectClientResult<ObjectMetadata, GetObjectError, Self::ClientError> {
let headers = self.get_object_headers().await?;
Ok(headers
.iter()
.filter_map(|(key, value)| {
let metadata_header = key.to_str()?.strip_prefix("x-amz-meta-")?;
let value = value.to_str()?;
Some((metadata_header.to_string(), value.to_string()))
})
.collect())
}

async fn get_object_checksum(&self) -> ObjectClientResult<Checksum, GetObjectError, Self::ClientError> {
if !self.requested_checksums {
return Err(ObjectClientError::ClientError(S3RequestError::InternalError(Box::new(
ObjectHeadersError::DidNotRequestChecksums,
))));
}

let headers = self.get_object_headers().await?;
parse_checksum(&headers).map_err(|e| ObjectClientError::ClientError(S3RequestError::InternalError(Box::new(e))))
}

fn increment_read_window(mut self: Pin<&mut Self>, len: usize) {
self.read_window_end_offset += len as u64;
Expand Down
38 changes: 3 additions & 35 deletions mountpoint-s3-client/src/s3_crt_client/head_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use time::OffsetDateTime;
use tracing::error;

use crate::object_client::{
Checksum, HeadObjectError, HeadObjectParams, HeadObjectResult, ObjectClientError, ObjectClientResult, RestoreStatus,
HeadObjectError, HeadObjectParams, HeadObjectResult, ObjectClientError, ObjectClientResult, RestoreStatus,
};
use crate::s3_crt_client::{S3CrtClient, S3Operation, S3RequestError};
use crate::s3_crt_client::{parse_checksum, S3CrtClient, S3Operation, S3RequestError};

use super::ChecksumMode;

Expand Down Expand Up @@ -65,20 +65,6 @@ impl HeadObjectResult {
Ok(Some(RestoreStatus::Restored { expiry: expiry.into() }))
}

fn parse_checksum(headers: &Headers) -> Result<Checksum, ParseError> {
let checksum_crc32 = headers.get_as_optional_string("x-amz-checksum-crc32")?;
let checksum_crc32c = headers.get_as_optional_string("x-amz-checksum-crc32c")?;
let checksum_sha1 = headers.get_as_optional_string("x-amz-checksum-sha1")?;
let checksum_sha256 = headers.get_as_optional_string("x-amz-checksum-sha256")?;

Ok(Checksum {
checksum_crc32,
checksum_crc32c,
checksum_sha1,
checksum_sha256,
})
}

/// Parse from HeadObject headers
fn parse_from_hdr(headers: &Headers) -> Result<Self, ParseError> {
let last_modified = OffsetDateTime::parse(&headers.get_as_string("Last-Modified")?, &Rfc2822)
Expand All @@ -88,7 +74,7 @@ impl HeadObjectResult {
let etag = headers.get_as_string("Etag")?;
let storage_class = headers.get_as_optional_string("x-amz-storage-class")?;
let restore_status = Self::parse_restore_status(headers)?;
let checksum = Self::parse_checksum(headers)?;
let checksum = parse_checksum(headers)?;
let result = HeadObjectResult {
size,
last_modified,
Expand Down Expand Up @@ -236,24 +222,6 @@ mod tests {
};
}

#[test]
fn test_checksum_sha256() {
let mut headers = Headers::new(&Allocator::default()).unwrap();
let value = "QwzjTQIHJO11oZbfwq1nx3dy0Wk=";
let header = Header::new("x-amz-checksum-sha256", value.to_owned());
headers.add_header(&header).unwrap();

let checksum = HeadObjectResult::parse_checksum(&headers).expect("failed to parse headers");
assert_eq!(checksum.checksum_crc32, None, "other checksums shouldn't be set");
assert_eq!(checksum.checksum_crc32c, None, "other checksums shouldn't be set");
assert_eq!(checksum.checksum_sha1, None, "other checksums shouldn't be set");
assert_eq!(
checksum.checksum_sha256,
Some(value.to_owned()),
"sha256 header should match"
);
}

#[test]
fn test_parse_restore_empty() {
let headers = Headers::new(&Allocator::default()).unwrap();
Expand Down
Loading

0 comments on commit e544f8f

Please sign in to comment.