Validate that shared cache bucket is usable (#1141)
## Description of change

- Validates that the shared cache bucket is writable (see the sketch after this list)
- Validates that the shared cache bucket supports the `EXPRESS_ONEZONE` storage class
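
In outline, the check writes a small, human-readable marker object to the cache bucket using the `EXPRESS_ONEZONE` storage class, so a single request proves both that the bucket is writable and that it is a directory bucket. A condensed sketch of the new `ExpressDataCache::verify_cache_valid` (the full version is in the `express_data_cache.rs` diff below):

```rust
// Condensed from mountpoint-s3/src/data_cache/express_data_cache.rs in this PR.
pub async fn verify_cache_valid(&self) -> ObjectClientResult<(), PutObjectError, Client::ClientError> {
    let object_key = format!("{}/_mountpoint_cache_metadata", &self.prefix);
    let data = format!("source_bucket={}\nblock_size={}", self.bucket_name, self.config.block_size);

    // A single PutObject with the EXPRESS_ONEZONE storage class both checks write
    // access and confirms the bucket is an S3 Express One Zone directory bucket.
    self.client
        .put_object_single(
            &self.bucket_name,
            &object_key,
            &PutObjectSingleParams::new().storage_class("EXPRESS_ONEZONE".to_string()),
            data,
        )
        .await?;
    Ok(())
}
```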

Relevant issues: N/A

## Does this change impact existing behavior?

Yes. The shared cache bucket is now validated at mount time: Mountpoint writes a small probe object with the `EXPRESS_ONEZONE` storage class, and the mount fails if that initial `PutObject` is rejected.
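
Concretely, both shared-cache configurations in `cli.rs` now run the check before the FUSE session is created, so a misconfigured cache bucket fails the mount with a descriptive error (condensed from the `cli.rs` diff below):

```rust
// Condensed from mountpoint-s3/src/cli.rs in this PR: probe the shared cache
// bucket before wiring it into the prefetcher; a failed probe aborts the mount.
let express_cache = ExpressDataCache::new(client.clone(), config, bucket_name, cache_bucket_name);
block_on(express_cache.verify_cache_valid())
    .with_context(|| format!("initial PutObject failed for shared cache bucket {cache_bucket_name}"))?;
```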

## Does this change need a changelog entry in any of the crates?

No

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).

---------

Signed-off-by: Simon Beal <[email protected]>
muddyfish authored Nov 18, 2024
1 parent 1e30bff commit 378a56c
Showing 4 changed files with 173 additions and 15 deletions.
5 changes: 5 additions & 0 deletions mountpoint-s3/src/cli.rs
@@ -11,6 +11,7 @@ use std::time::Duration;
use anyhow::{anyhow, Context as _};
use clap::{value_parser, ArgGroup, Parser, ValueEnum};
use fuser::{MountOption, Session};
use futures::executor::block_on;
use futures::task::Spawn;
use mountpoint_s3_client::config::{AddressingStyle, EndpointConfig, S3ClientAuthConfig, S3ClientConfig};
use mountpoint_s3_client::error::ObjectClientError;
@@ -904,6 +905,8 @@ where
(None, Some((config, bucket_name, cache_bucket_name))) => {
tracing::trace!("using S3 Express One Zone bucket as a cache for object content");
let express_cache = ExpressDataCache::new(client.clone(), config, bucket_name, cache_bucket_name);
block_on(express_cache.verify_cache_valid())
.with_context(|| format!("initial PutObject failed for shared cache bucket {cache_bucket_name}"))?;

let prefetcher = caching_prefetch(express_cache, runtime, prefetcher_config);
let fuse_session = create_filesystem(
@@ -943,6 +946,8 @@ where
tracing::trace!("using both local disk and S3 Express One Zone bucket as a cache for object content");
let (managed_cache_dir, disk_cache) = create_disk_cache(cache_dir_path, disk_data_cache_config)?;
let express_cache = ExpressDataCache::new(client.clone(), config, bucket_name, cache_bucket_name);
block_on(express_cache.verify_cache_valid())
.with_context(|| format!("initial PutObject failed for shared cache bucket {cache_bucket_name}"))?;
let cache = MultilevelDataCache::new(Arc::new(disk_cache), express_cache, runtime.clone());

let prefetcher = caching_prefetch(cache, runtime, prefetcher_config);
86 changes: 81 additions & 5 deletions mountpoint-s3/src/data_cache/express_data_cache.rs
@@ -7,9 +7,9 @@ use async_trait::async_trait;
use base64ct::{Base64, Encoding};
use bytes::BytesMut;
use futures::{pin_mut, StreamExt};
use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
use mountpoint_s3_client::error::{GetObjectError, ObjectClientError, PutObjectError};
use mountpoint_s3_client::types::{
ChecksumMode, GetObjectParams, GetObjectRequest, PutObjectSingleParams, UploadChecksum,
ChecksumMode, GetObjectParams, GetObjectRequest, ObjectClientResult, PutObjectSingleParams, UploadChecksum,
};
use mountpoint_s3_client::ObjectClient;
use mountpoint_s3_crt::checksums::crc32c::{self, Crc32c};
@@ -72,6 +72,33 @@ where
bucket_name: bucket_name.to_owned(),
}
}

pub async fn verify_cache_valid(&self) -> ObjectClientResult<(), PutObjectError, Client::ClientError> {
let object_key = format!("{}/_mountpoint_cache_metadata", &self.prefix);
// This data is human-readable, and not expected to be read by Mountpoint.
// The file format used here is NOT stable.
// For now, include only the data that is guaranteed to be correct, since it is
// what determines the cache prefix.
let data = format!(
"source_bucket={}\nblock_size={}",
self.bucket_name, self.config.block_size
);

// put_object is sufficient for validating the cache: S3 directory buckets only support
// read-only or read-write access, and write access implies read access.
// Using the `EXPRESS_ONEZONE` storage class also validates that this is a directory bucket.
self.client
.put_object_single(
&self.bucket_name,
&object_key,
&PutObjectSingleParams::new().storage_class("EXPRESS_ONEZONE".to_string()),
data,
)
.in_current_span()
.await?;

Ok(())
}
}

#[async_trait]
@@ -328,10 +355,10 @@ mod tests {
use crate::sync::Arc;
use proptest::{prop_assert, proptest};

use test_case::test_case;

use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig};
use mountpoint_s3_client::failure_client::{countdown_failure_client, CountdownFailureConfig};
use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig, MockClientError};
use mountpoint_s3_client::types::ETag;
use test_case::test_case;

#[test_case(1024, 512 * 1024; "block_size smaller than part_size")]
#[test_case(8 * 1024 * 1024, 512 * 1024; "block_size larger than part_size")]
@@ -552,6 +579,55 @@
assert!(matches!(err, DataCacheError::InvalidBlockHeader(_)));
}

#[tokio::test]
async fn test_verify_cache_valid_success() {
let source_bucket = "source-bucket";
let bucket = "test-bucket";
let config = MockClientConfig {
bucket: bucket.to_string(),
part_size: 8 * 1024 * 1024,
enable_backpressure: true,
initial_read_window_size: 8 * 1024 * 1024,
..Default::default()
};
let client = Arc::new(MockClient::new(config));
let cache = ExpressDataCache::new(client.clone(), Default::default(), source_bucket, bucket);

cache.verify_cache_valid().await.expect("cache should work");
}

#[tokio::test]
async fn test_verify_cache_valid_failure() {
let source_bucket = "source-bucket";
let bucket = "test-bucket";
let config = MockClientConfig {
bucket: bucket.to_string(),
part_size: 8 * 1024 * 1024,
enable_backpressure: true,
initial_read_window_size: 8 * 1024 * 1024,
..Default::default()
};
let client = Arc::new(MockClient::new(config));

let mut put_single_failures = HashMap::new();
put_single_failures.insert(1, MockClientError("error".to_owned().into()).into());

let failure_client = Arc::new(countdown_failure_client(
client.clone(),
CountdownFailureConfig {
put_single_failures,
..Default::default()
},
));

let cache = ExpressDataCache::new(failure_client, Default::default(), source_bucket, bucket);

cache
.verify_cache_valid()
.await
.expect_err("cache should not report valid if cannot write");
}

proptest! {
#[test]
fn proptest_creates_small_s3_keys(key: String, etag: String, block_idx: BlockIndex, source_description: String, block_size: u64) {
22 changes: 15 additions & 7 deletions mountpoint-s3/tests/common/fuse.rs
@@ -310,6 +310,9 @@ pub mod mock_session {
pub mod s3_session {
use super::*;

use crate::common::s3::{
get_test_bucket_and_prefix, get_test_endpoint_config, get_test_region, get_test_sdk_client,
};
use aws_sdk_s3::operation::head_object::HeadObjectError;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{ChecksumAlgorithm, GlacierJobParameters, RestoreRequest, Tier};
@@ -319,10 +322,6 @@ pub mod s3_session {
use mountpoint_s3_client::types::{Checksum, PutObjectTrailingChecksums};
use mountpoint_s3_client::S3CrtClient;

use crate::common::s3::{
get_test_bucket_and_prefix, get_test_endpoint_config, get_test_region, get_test_sdk_client,
};

/// Create a FUSE mount backed by a real S3 client
pub fn new(test_name: &str, test_config: TestSessionConfig) -> TestSession {
let mount_dir = tempfile::tempdir().unwrap();
@@ -363,7 +362,11 @@
let (bucket, prefix) = get_test_bucket_and_prefix(test_name);
let region = get_test_region();

let client = create_crt_client(test_config.part_size, test_config.initial_read_window_size);
let client = create_crt_client(
test_config.part_size,
test_config.initial_read_window_size,
Default::default(),
);
let runtime = client.event_loop_group();
let prefetcher = caching_prefetch(cache, runtime, test_config.prefetcher_config);
let session = create_fuse_session(
@@ -380,12 +383,17 @@
}
}

pub fn create_crt_client(part_size: usize, initial_read_window_size: usize) -> S3CrtClient {
pub fn create_crt_client(
part_size: usize,
initial_read_window_size: usize,
auth_config: S3ClientAuthConfig,
) -> S3CrtClient {
let client_config = S3ClientConfig::default()
.part_size(part_size)
.endpoint_config(get_test_endpoint_config())
.read_backpressure(true)
.initial_read_window(initial_read_window_size);
.initial_read_window(initial_read_window_size)
.auth_config(auth_config);
S3CrtClient::new(client_config).unwrap()
}

75 changes: 72 additions & 3 deletions mountpoint-s3/tests/fuse_tests/cache_test.rs
@@ -40,7 +40,7 @@ async fn express_invalid_block_read() {
let prefix = get_test_prefix("express_invalid_block_read");

// Mount the bucket
let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE);
let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE, Default::default());
let cache = CacheTestWrapper::new(ExpressDataCache::new(
client.clone(),
Default::default(),
@@ -100,7 +100,7 @@
#[test_case("key", 100, 1024 * 1024; "big file")]
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
fn express_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) {
let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE);
let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE, Default::default());
let bucket_name = get_standard_bucket();
let express_bucket_name = get_express_bucket();
let cache = ExpressDataCache::new(client.clone(), Default::default(), &bucket_name, &express_bucket_name);
@@ -129,7 +129,7 @@ fn disk_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize)
};
let cache = DiskDataCache::new(cache_dir.path().to_path_buf(), cache_config);

let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE);
let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE, Default::default());

let bucket_name = get_test_bucket();
cache_write_read_base(
@@ -143,6 +143,75 @@ fn disk_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize)
);
}

#[tokio::test]
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
async fn express_cache_verify_fail_non_express() {
use mountpoint_s3_client::error::ObjectClientError;
use mountpoint_s3_client::S3RequestError::ResponseError;

let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE, Default::default());
let bucket_name = get_standard_bucket();
let cache_bucket_name = get_standard_bucket();
let cache = ExpressDataCache::new(client.clone(), Default::default(), &bucket_name, &cache_bucket_name);
let err = cache
.verify_cache_valid()
.await
.expect_err("cannot use standard bucket as shared cache");

if let ObjectClientError::ClientError(ResponseError(request_result)) = err {
let body = request_result.error_response_body.as_ref().expect("should have body");
let body = body.clone().into_string().unwrap();
assert!(body.contains("<Code>InvalidStorageClass</Code>"));
} else {
panic!("wrong error type");
}
}

#[tokio::test]
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
async fn express_cache_verify_fail_forbidden() {
use crate::common::creds::get_scoped_down_credentials;
use mountpoint_s3_client::config::S3ClientAuthConfig;
use mountpoint_s3_client::error::ObjectClientError;
use mountpoint_s3_client::S3RequestError::CrtError;
use mountpoint_s3_crt::auth::credentials::{CredentialsProvider, CredentialsProviderStaticOptions};
use mountpoint_s3_crt::common::allocator::Allocator;

let bucket_name = get_standard_bucket();
let cache_bucket_name = get_express_bucket();

// No `s3express:CreateSession` in this policy, so we should get a forbidden error.
let policy = r#"{"Statement": [
{"Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:AbortMultipartUpload"], "Resource": "arn:aws:s3:::__BUCKET__/*"},
{"Effect": "Allow", "Action": "s3:ListBucket", "Resource": "arn:aws:s3:::__BUCKET__"}
]}"#;
let policy = policy.replace("__BUCKET__", &cache_bucket_name);
let credentials = get_scoped_down_credentials(policy).await;

// Build a S3CrtClient that uses a static credentials provider with the creds we just got
let config = CredentialsProviderStaticOptions {
access_key_id: credentials.access_key_id(),
secret_access_key: credentials.secret_access_key(),
session_token: credentials.session_token(),
};
let provider = CredentialsProvider::new_static(&Allocator::default(), config).unwrap();

let client = create_crt_client(
CLIENT_PART_SIZE,
CLIENT_PART_SIZE,
S3ClientAuthConfig::Provider(provider),
);

let cache = ExpressDataCache::new(client.clone(), Default::default(), &bucket_name, &cache_bucket_name);
let err = cache.verify_cache_valid().await.expect_err("cache must be write-able");

if let ObjectClientError::ClientError(CrtError(err)) = err {
assert!(err.to_string().contains("AWS_ERROR_S3EXPRESS_CREATE_SESSION_FAILED"))
} else {
panic!("wrong error type");
}
}

fn cache_write_read_base<Cache>(
client: S3CrtClient,
bucket: &str,