Skip to content

Commit

Permalink
Small fixes for S3 on Outposts (#470)
Browse files Browse the repository at this point in the history
This fixes two issues that were preventing Mountpoint from working
against Outposts buckets:
1. Outposts doesn't include the bucket name in ListObjectsV2 responses.
   We weren't actually using that output anyway, so I just removed it.
2. For GetObject requests, we were sending a HTTP header like
   `Accept: application/xml,*/*`. While technically valid HTTP, it's
   weird to accept */* as well as something else, and it was confusing
   Outposts' request signing. So I switched to overwriting the existing
   header, which is what the comment suggested the code was intended to
   do anyway.

I also took this chance to make a little cleanup to parsing
ListObjectsV2 responses: the `parse` functions shouldn't be defined on
the generic `ListObjectsResult` structs, which are shared by all
clients.

Signed-off-by: James Bornholt <[email protected]>
  • Loading branch information
jamesbornholt authored Aug 30, 2023
1 parent 09f556e commit 73a27c1
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 72 deletions.
4 changes: 4 additions & 0 deletions mountpoint-s3-client/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# Unreleased

* Breaking change: `ListObjectsResult` no longer includes the `bucket` field.

# v0.3.0 (June 20, 2023)

Breaking changes:
Expand Down
1 change: 0 additions & 1 deletion mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,6 @@ impl ObjectClient for MockClient {
let common_prefixes = common_prefixes.into_iter().collect::<Vec<_>>();

Ok(ListObjectsResult {
bucket: bucket.to_string(),
objects: object_vec,
common_prefixes,
next_continuation_token,
Expand Down
3 changes: 0 additions & 3 deletions mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,6 @@ pub enum GetObjectError {
#[derive(Debug)]
#[non_exhaustive]
pub struct ListObjectsResult {
/// The name of the bucket.
pub bucket: String,

/// The list of objects.
pub objects: Vec<ObjectInfo>,

Expand Down
7 changes: 4 additions & 3 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,12 +634,13 @@ struct S3Message {
}

impl S3Message {
/// Add a header to this message.
fn add_header(
/// Add a header to this message. The header is added if necessary and any existing values for
/// this header are removed.
fn set_header(
&mut self,
header: &Header<impl AsRef<OsStr>, impl AsRef<OsStr>>,
) -> Result<(), mountpoint_s3_crt::common::error::Error> {
self.inner.add_header(header)
self.inner.set_header(header)
}

/// Set the request path and query for this message. The components should not be URL-encoded;
Expand Down
6 changes: 3 additions & 3 deletions mountpoint-s3-client/src/s3_crt_client/get_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ impl S3CrtClient {

// Overwrite "accept" header since this returns raw object data.
message
.add_header(&Header::new("accept", "*/*"))
.set_header(&Header::new("accept", "*/*"))
.map_err(S3RequestError::construction_failure)?;

if let Some(etag) = if_match {
// Return the object only if its entity tag (ETag) is matched
message
.add_header(&Header::new("If-Match", etag.as_str()))
.set_header(&Header::new("If-Match", etag.as_str()))
.map_err(S3RequestError::construction_failure)?;
}

Expand All @@ -55,7 +55,7 @@ impl S3CrtClient {
// Range HTTP header is bounded below *inclusive*
let range_value = format!("bytes={}-{}", range.start, range.end.saturating_sub(1));
message
.add_header(&Header::new("Range", range_value))
.set_header(&Header::new("Range", range_value))
.map_err(S3RequestError::construction_failure)?;

let length = range.end.saturating_sub(range.start);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,20 +123,20 @@ impl S3CrtClient {
if let Some(max_parts) = max_parts {
let value = format!("{}", max_parts);
message
.add_header(&Header::new("x-amz-max-parts", value))
.set_header(&Header::new("x-amz-max-parts", value))
.map_err(S3RequestError::construction_failure)?;
}

if let Some(part_number_marker) = part_number_marker {
let value = format!("{}", part_number_marker);
message
.add_header(&Header::new("x-amz-part-number-marker", value))
.set_header(&Header::new("x-amz-part-number-marker", value))
.map_err(S3RequestError::construction_failure)?;
}

let object_attributes: Vec<String> = object_attributes.iter().map(|attr| attr.to_string()).collect();
message
.add_header(&Header::new("x-amz-object-attributes", object_attributes.join(",")))
.set_header(&Header::new("x-amz-object-attributes", object_attributes.join(",")))
.map_err(S3RequestError::construction_failure)?;

let span = request_span!(
Expand Down
105 changes: 49 additions & 56 deletions mountpoint-s3-client/src/s3_crt_client/list_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,78 +54,71 @@ fn get_field(element: &xmltree::Element, name: &str) -> Result<String, ParseErro
get_text(get_child(element, name)?)
}

impl ListObjectsResult {
fn parse_from_bytes(bytes: &[u8]) -> Result<Self, ParseError> {
Self::parse_from_xml(&mut xmltree::Element::parse(bytes)?)
}

fn parse_from_xml(element: &mut xmltree::Element) -> Result<Self, ParseError> {
let mut objects = Vec::new();

while let Some(content) = element.take_child("Contents") {
objects.push(ObjectInfo::parse_from_xml(&content)?);
}
fn parse_result_from_bytes(bytes: &[u8]) -> Result<ListObjectsResult, ParseError> {
parse_result_from_xml(&mut xmltree::Element::parse(bytes)?)
}

let mut common_prefixes = Vec::new();
fn parse_result_from_xml(element: &mut xmltree::Element) -> Result<ListObjectsResult, ParseError> {
let mut objects = Vec::new();

while let Some(common_prefix) = element.take_child("CommonPrefixes") {
let prefix = get_field(&common_prefix, "Prefix")?;
common_prefixes.push(prefix);
}
while let Some(content) = element.take_child("Contents") {
objects.push(parse_object_info_from_xml(&content)?);
}

let bucket = get_field(element, "Name")?;
let mut common_prefixes = Vec::new();

let mut next_continuation_token = None;
if let Some(elem) = element.get_child("NextContinuationToken") {
next_continuation_token = Some(get_text(elem)?);
}
while let Some(common_prefix) = element.take_child("CommonPrefixes") {
let prefix = get_field(&common_prefix, "Prefix")?;
common_prefixes.push(prefix);
}

let is_truncated = get_field(element, "IsTruncated")?;
let is_truncated = bool::from_str(&is_truncated).map_err(|e| ParseError::Bool(e, "IsTruncated".to_string()))?;
let mut next_continuation_token = None;
if let Some(elem) = element.get_child("NextContinuationToken") {
next_continuation_token = Some(get_text(elem)?);
}

if is_truncated != next_continuation_token.is_some() {
return Err(ParseError::InvalidResponse(
element.clone(),
"IsTruncated doesn't match NextContinuationToken".to_string(),
));
}
let is_truncated = get_field(element, "IsTruncated")?;
let is_truncated = bool::from_str(&is_truncated).map_err(|e| ParseError::Bool(e, "IsTruncated".to_string()))?;

Ok(Self {
bucket,
objects,
common_prefixes,
next_continuation_token,
})
if is_truncated != next_continuation_token.is_some() {
return Err(ParseError::InvalidResponse(
element.clone(),
"IsTruncated doesn't match NextContinuationToken".to_string(),
));
}

Ok(ListObjectsResult {
objects,
common_prefixes,
next_continuation_token,
})
}

impl ObjectInfo {
fn parse_from_xml(element: &xmltree::Element) -> Result<Self, ParseError> {
let key = get_field(element, "Key")?;
fn parse_object_info_from_xml(element: &xmltree::Element) -> Result<ObjectInfo, ParseError> {
let key = get_field(element, "Key")?;

let size = get_field(element, "Size")?;
let size = get_field(element, "Size")?;

let size = u64::from_str(&size).map_err(|e| ParseError::Int(e, "Size".to_string()))?;
let size = u64::from_str(&size).map_err(|e| ParseError::Int(e, "Size".to_string()))?;

let last_modified = get_field(element, "LastModified")?;
let last_modified = get_field(element, "LastModified")?;

// S3 appears to use RFC 3339 to encode this field, based on the API example here:
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
let last_modified = OffsetDateTime::parse(&last_modified, &Rfc3339)
.map_err(|e| ParseError::OffsetDateTime(e, "LastModified".to_string()))?;
// S3 appears to use RFC 3339 to encode this field, based on the API example here:
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
let last_modified = OffsetDateTime::parse(&last_modified, &Rfc3339)
.map_err(|e| ParseError::OffsetDateTime(e, "LastModified".to_string()))?;

let storage_class = get_field(element, "StorageClass").ok();
let storage_class = get_field(element, "StorageClass").ok();

let etag = get_field(element, "ETag")?;
let etag = get_field(element, "ETag")?;

Ok(Self {
key,
size,
last_modified,
storage_class,
etag,
})
}
Ok(ObjectInfo {
key,
size,
last_modified,
storage_class,
etag,
})
}

impl S3CrtClient {
Expand Down Expand Up @@ -175,7 +168,7 @@ impl S3CrtClient {

let body = body.await?;

ListObjectsResult::parse_from_bytes(&body)
parse_result_from_bytes(&body)
.map_err(|e| ObjectClientError::ClientError(S3RequestError::InternalError(e.into())))
}
}
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3-client/src/s3_crt_client/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl S3CrtClient {

if let Some(storage_class) = params.storage_class.to_owned() {
message
.add_header(&Header::new("x-amz-storage-class", storage_class))
.set_header(&Header::new("x-amz-storage-class", storage_class))
.map_err(S3RequestError::construction_failure)?;
}

Expand Down
1 change: 0 additions & 1 deletion mountpoint-s3-client/tests/list_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ async fn test_list_objects() {
.expect("ListObjects failed");

println!("{result:?}");
assert_eq!(result.bucket, bucket);
assert!(result.next_continuation_token.is_none());
assert_eq!(result.objects.len(), 1);
assert_eq!(result.objects[0].key, format!("{}{}", prefix, "hello"));
Expand Down
21 changes: 20 additions & 1 deletion mountpoint-s3-crt/src/http/request_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,12 +266,31 @@ impl Message {
})
}

/// Add a header to this message.
/// Add a header to this message. If the header already exists in the message, this will add a
/// another header instead of overwriting the existing one. Use [set_header] to overwrite
/// potentially existing headers.
pub fn add_header(&mut self, header: &Header<impl AsRef<OsStr>, impl AsRef<OsStr>>) -> Result<(), Error> {
// SAFETY: `aws_http_message_add_header` makes a copy of the values in `header`.
unsafe { aws_http_message_add_header(self.inner.as_ptr(), header.inner).ok_or_last_error() }
}

/// Set a header in this message. The header is added if necessary and any existing values for
/// this name are removed.
pub fn set_header(&mut self, header: &Header<impl AsRef<OsStr>, impl AsRef<OsStr>>) -> Result<(), Error> {
// SAFETY: `self.inner` is a valid aws_http_message
let headers = unsafe { aws_http_message_get_headers(self.inner.as_ptr()) };
assert!(!headers.is_null(), "headers are always initialized");
// SAFETY: `aws_http_headers_set` makes a copy of the values in `header`
unsafe {
aws_http_headers_set(
headers,
header.name().as_aws_byte_cursor(),
header.value().as_aws_byte_cursor(),
)
.ok_or_last_error()
}
}

/// Set the request path for this message.
pub fn set_request_path(&mut self, path: impl AsRef<OsStr>) -> Result<(), Error> {
// SAFETY: `aws_http_message_set_request_path` makes a copy of `path`.
Expand Down

1 comment on commit 73a27c1

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 73a27c1 Previous: 09f556e Ratio
random_read 1.50224609375 MiB/s 3.3359375 MiB/s 2.22

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.