Skip to content

Commit

Permalink
make some s3 error retryable (#5384)
Browse files Browse the repository at this point in the history
  • Loading branch information
trinity-1686a authored Sep 4, 2024
1 parent 0820c90 commit 80364dd
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 9 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ zstd = "0.13.0"

aws-config = "1.5.4"
aws-credential-types = { version = "1.2", features = ["hardcoded-credentials"] }
aws-runtime = "1.3.1"
aws-sdk-kinesis = "1.37"
aws-sdk-s3 = "1.42"
aws-sdk-sqs = "1.36"
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ license.workspace = true

[dependencies]
aws-config = { workspace = true }
aws-runtime = { workspace = true }
aws-sdk-kinesis = { workspace = true, optional = true }
aws-sdk-s3 = { workspace = true }
aws-sdk-sqs = { workspace = true, optional = true }
Expand Down
27 changes: 18 additions & 9 deletions quickwit/quickwit-aws/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#![allow(clippy::match_like_matches_macro)]

use aws_runtime::retries::classifiers::{THROTTLING_ERRORS, TRANSIENT_ERRORS};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadError;
use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadError;
Expand Down Expand Up @@ -47,57 +48,65 @@ where E: AwsRetryable
}
}

fn is_retryable(meta: &aws_sdk_s3::error::ErrorMetadata) -> bool {
if let Some(code) = meta.code() {
THROTTLING_ERRORS.contains(&code) || TRANSIENT_ERRORS.contains(&code)
} else {
false
}
}

impl AwsRetryable for GetObjectError {
fn is_retryable(&self) -> bool {
false
is_retryable(self.meta())
}
}

impl AwsRetryable for DeleteObjectError {
fn is_retryable(&self) -> bool {
false
is_retryable(self.meta())
}
}

impl AwsRetryable for DeleteObjectsError {
fn is_retryable(&self) -> bool {
false
is_retryable(self.meta())
}
}

impl AwsRetryable for UploadPartError {
fn is_retryable(&self) -> bool {
false
is_retryable(self.meta())
}
}

impl AwsRetryable for CompleteMultipartUploadError {
fn is_retryable(&self) -> bool {
false
is_retryable(self.meta())
}
}

impl AwsRetryable for AbortMultipartUploadError {
fn is_retryable(&self) -> bool {
false
is_retryable(self.meta())
}
}

impl AwsRetryable for CreateMultipartUploadError {
fn is_retryable(&self) -> bool {
false
is_retryable(self.meta())
}
}

impl AwsRetryable for PutObjectError {
fn is_retryable(&self) -> bool {
false
is_retryable(self.meta())
}
}

impl AwsRetryable for HeadObjectError {
fn is_retryable(&self) -> bool {
false
is_retryable(self.meta())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1169,4 +1169,68 @@ mod tests {
let delete_objects_error = bulk_delete_error.error.unwrap();
assert!(delete_objects_error.to_string().contains("MalformedXML"));
}

#[tokio::test]
async fn test_s3_compatible_storage_retry_put() {
let client = StaticReplayClient::new(vec![
ReplayEvent::new(
// This is quite fragile, currently this is *not* validated by the SDK
// but may in future, that being said, there is no way to know what the
// request should look like until it raises an error in reality as this
// is up to how the validation is implemented.
http::Request::builder().body(SdkBody::empty()).unwrap(),
http::Response::builder()
.status(429)
.body(SdkBody::from_body_0_4(Body::from(Bytes::from(
r#"<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>SlowDown</Code>
<Message>message</Message>
<Resource>/my-path</Resource>
<RequestId>4442587FB7D0A2F9</RequestId>
</Error>"#,
))))
.unwrap(),
),
ReplayEvent::new(
// This is quite fragile, currently this is *not* validated by the SDK
// but may in future, that being said, there is no way to know what the
// request should look like until it raises an error in reality as this
// is up to how the validation is implemented.
http::Request::builder()
.body(SdkBody::from_body_0_4(Body::empty()))
.unwrap(),
http::Response::builder()
.status(200)
.body(SdkBody::from_body_0_4(Body::empty()))
.unwrap(),
),
]);
let credentials = Credentials::new("mock_key", "mock_secret", None, None, "mock_provider");
let config = aws_sdk_s3::Config::builder()
.behavior_version(BehaviorVersion::v2024_03_28())
.region(Some(Region::new("Foo")))
.http_client(client)
.credentials_provider(credentials)
.build();
let s3_client = S3Client::from_conf(config);
let uri = Uri::for_test("s3://bucket/indexes");
let bucket = "bucket".to_string();
let prefix = PathBuf::new();

let s3_storage = S3CompatibleObjectStorage {
s3_client,
uri,
bucket,
prefix,
multipart_policy: MultiPartPolicy::default(),
retry_params: RetryParams::for_test(),
disable_multi_object_delete: false,
disable_multipart_upload: false,
};
s3_storage
.put(Path::new("my-path"), Box::new(vec![1, 2, 3]))
.await
.unwrap();
}
}

0 comments on commit 80364dd

Please sign in to comment.