Skip to content

Commit

Permalink
Add a test for an invalid cache block (#1139)
Browse files Browse the repository at this point in the history
## Description of change

Just adds a test that if a block in the shared cache is invalid, it is
not served to the client application.

Relevant issues: N/A

## Does this change impact existing behavior?

No.

## Does this change need a changelog entry in any of the crates?

No.

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).

---------

Signed-off-by: Vlad Volodkin <[email protected]>
Co-authored-by: Vlad Volodkin <[email protected]>
  • Loading branch information
vladem and Vlad Volodkin authored Nov 15, 2024
1 parent 625d7db commit 3738860
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 33 deletions.
2 changes: 1 addition & 1 deletion mountpoint-s3/src/data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use thiserror::Error;
pub use crate::checksums::ChecksummedBytes;
pub use crate::data_cache::cache_directory::ManagedCacheDir;
pub use crate::data_cache::disk_data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig};
pub use crate::data_cache::express_data_cache::{ExpressDataCache, ExpressDataCacheConfig};
pub use crate::data_cache::express_data_cache::{build_prefix, get_s3_key, ExpressDataCache, ExpressDataCacheConfig};
pub use crate::data_cache::in_memory_data_cache::InMemoryDataCache;
pub use crate::data_cache::multilevel_cache::MultilevelDataCache;

Expand Down
4 changes: 2 additions & 2 deletions mountpoint-s3/src/data_cache/express_data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ impl BlockMetadata {
}

/// Get the prefix for objects we'll be creating in S3
fn build_prefix(source_bucket_name: &str, block_size: u64) -> String {
pub fn build_prefix(source_bucket_name: &str, block_size: u64) -> String {
hex::encode(
Sha256::new()
.chain_update(CACHE_VERSION.as_bytes())
Expand All @@ -311,7 +311,7 @@ fn build_prefix(source_bucket_name: &str, block_size: u64) -> String {
}

/// Get the S3 key this block should be written to or read from.
fn get_s3_key(prefix: &str, cache_key: &ObjectId, block_idx: BlockIndex) -> String {
pub fn get_s3_key(prefix: &str, cache_key: &ObjectId, block_idx: BlockIndex) -> String {
let hashed_cache_key = hex::encode(
Sha256::new()
.chain_update(cache_key.key())
Expand Down
29 changes: 21 additions & 8 deletions mountpoint-s3/tests/common/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{

use async_trait::async_trait;
use mountpoint_s3::{
data_cache::{BlockIndex, ChecksummedBytes, DataCache, DataCacheResult},
data_cache::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult},
object::ObjectId,
};

Expand All @@ -21,6 +21,8 @@ struct CacheTestWrapperInner<Cache> {
cache: Cache,
/// Number of times the `get_block` succeeded and returned data
get_block_hit_count: AtomicU64,
/// Number of times the `get_block` failed because of an invalid block
get_block_invalid_count: AtomicU64,
/// Number of times the `put_block` was completed
put_block_count: AtomicU64,
}
Expand All @@ -39,6 +41,7 @@ impl<Cache> CacheTestWrapper<Cache> {
inner: Arc::new(CacheTestWrapperInner {
cache,
get_block_hit_count: AtomicU64::new(0),
get_block_invalid_count: AtomicU64::new(0),
put_block_count: AtomicU64::new(0),
}),
}
Expand All @@ -62,6 +65,11 @@ impl<Cache> CacheTestWrapper<Cache> {
self.inner.get_block_hit_count.load(Ordering::SeqCst)
}

/// Number of times `get_block` failed because the cached block was invalid
pub fn get_block_invalid_count(&self) -> u64 {
    self.inner.get_block_invalid_count.load(Ordering::SeqCst)
}

/// Number of times the `put_block` was completed
pub fn put_block_count(&self) -> u64 {
self.inner.put_block_count.load(Ordering::SeqCst)
Expand All @@ -77,18 +85,23 @@ impl<Cache: DataCache + Send + Sync + 'static> DataCache for CacheTestWrapper<Ca
block_offset: u64,
object_size: usize,
) -> DataCacheResult<Option<ChecksummedBytes>> {
let result: Option<ChecksummedBytes> = self
let result = self
.inner
.cache
.get_block(cache_key, block_idx, block_offset, object_size)
.await?;
.await;

// The cache hit happens only if the `get_block` was successful and returned data
if result.is_some() {
self.inner.get_block_hit_count.fetch_add(1, Ordering::SeqCst);
}
match result.as_ref() {
Ok(Some(_)) => {
self.inner.get_block_hit_count.fetch_add(1, Ordering::SeqCst);
}
Err(DataCacheError::InvalidBlockHeader(_)) => {
self.inner.get_block_invalid_count.fetch_add(1, Ordering::SeqCst);
}
_ => (),
};

Ok(result)
result
}

async fn put_block(
Expand Down
13 changes: 8 additions & 5 deletions mountpoint-s3/tests/common/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@ use rand_chacha::rand_core::OsRng;
use crate::common::tokio_block_on;

pub fn get_test_bucket_and_prefix(test_name: &str) -> (String, String) {
#[cfg(not(feature = "s3express_tests"))]
let bucket = get_standard_bucket();
#[cfg(feature = "s3express_tests")]
let bucket = get_express_bucket();

let bucket = get_test_bucket();
let prefix = get_test_prefix(test_name);

(bucket, prefix)
Expand All @@ -29,6 +25,13 @@ pub fn get_test_prefix(test_name: &str) -> String {
format!("{prefix}{test_name}/{nonce}/")
}

/// Name of the bucket tests should run against: the S3 Express One Zone
/// bucket when the `s3express_tests` feature is enabled, otherwise the
/// standard S3 bucket.
pub fn get_test_bucket() -> String {
    #[cfg(feature = "s3express_tests")]
    {
        get_express_bucket()
    }
    #[cfg(not(feature = "s3express_tests"))]
    {
        get_standard_bucket()
    }
}

#[cfg(feature = "s3express_tests")]
pub fn get_express_bucket() -> String {
std::env::var("S3_EXPRESS_ONE_ZONE_BUCKET_NAME")
Expand Down
138 changes: 122 additions & 16 deletions mountpoint-s3/tests/fuse_tests/cache_test.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,104 @@
use crate::common::cache::CacheTestWrapper;
use crate::common::fuse::create_fuse_session;
use crate::common::fuse::s3_session::create_crt_client;
use crate::common::s3::{get_express_bucket, get_standard_bucket, get_test_bucket_and_prefix};
use mountpoint_s3::data_cache::{DataCache, DiskDataCache, DiskDataCacheConfig, ExpressDataCache};
use crate::common::s3::{get_test_bucket, get_test_prefix};

use mountpoint_s3::data_cache::{DataCache, DiskDataCache, DiskDataCacheConfig};
use mountpoint_s3::prefetch::caching_prefetch;
use mountpoint_s3_client::S3CrtClient;

use fuser::BackgroundSession;
use rand::{Rng, RngCore, SeedableRng};
use rand_chacha::ChaChaRng;
use std::fs;
use std::time::Duration;
use tempfile::TempDir;
use test_case::test_case;

#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
use crate::common::s3::{get_express_bucket, get_standard_bucket};
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
use mountpoint_s3::data_cache::{build_prefix, get_s3_key, BlockIndex, ExpressDataCache};
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
use mountpoint_s3::object::ObjectId;
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
use mountpoint_s3_client::types::{PutObjectSingleParams, UploadChecksum};
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
use mountpoint_s3_client::ObjectClient;
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
use mountpoint_s3_crt::checksums::crc32c;

const CACHE_BLOCK_SIZE: u64 = 1024 * 1024;
const CLIENT_PART_SIZE: usize = 8 * 1024 * 1024;

/// A test that checks that an invalid block is not served from the cache:
/// after corrupting the cached block in the express bucket, reads must still
/// succeed (served from the source bucket) and the invalid-block counter
/// must be incremented.
#[tokio::test]
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
async fn express_invalid_block_read() {
    let bucket = get_standard_bucket();
    let cache_bucket = get_express_bucket();
    let prefix = get_test_prefix("express_invalid_block_read");

    // Mount the source bucket with an ExpressDataCache wrapped in the
    // test wrapper so we can observe hit/invalid/put counters
    let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE);
    let cache = CacheTestWrapper::new(ExpressDataCache::new(
        client.clone(),
        Default::default(),
        &bucket,
        &cache_bucket,
    ));
    let (mount_point, _session) = mount_bucket(client.clone(), cache.clone(), &bucket, &prefix);

    // Put an object to the mounted bucket (directly via the client, not
    // through the file system, so we learn its ETag for the cache key)
    let object_key = generate_unprefixed_key(&prefix, "key", 100);
    let full_object_key = format!("{prefix}{object_key}");
    let object_data = "object_data";
    let result = client
        .put_object_single(&bucket, &full_object_key, &Default::default(), object_data)
        .await
        .expect("put object must succeed");
    let object_etag = result.etag.into_inner();

    // Read data twice, expect cache hits and no errors: the first read
    // populates the cache, the second should be served from it
    let path = mount_point.path().join(&object_key);

    let put_block_count = cache.put_block_count();
    let read = fs::read(&path).expect("read should succeed");
    assert_eq!(read, object_data.as_bytes());
    // Cache writes are asynchronous; wait until the block is actually stored
    cache.wait_for_put(Duration::from_secs(10), put_block_count);

    let read = fs::read(&path).expect("read should succeed");
    assert_eq!(read, object_data.as_bytes());

    assert_eq!(cache.get_block_invalid_count(), 0, "no invalid blocks yet");
    assert!(cache.get_block_hit_count() > 0, "reads should result in a cache hit");

    // Corrupt the cache block by replacing it with an object holding no metadata
    let object_id = get_object_id(&prefix, &object_key, &object_etag);
    let block_key = get_express_cache_block_key(&bucket, &object_id, 0);
    let corrupted_block = "corrupted_block";
    let checksum = crc32c::checksum(corrupted_block.as_bytes());
    let put_object_params = PutObjectSingleParams::default().checksum(Some(UploadChecksum::Crc32c(checksum)));
    client
        .put_object_single(&cache_bucket, &block_key, &put_object_params, corrupted_block)
        .await
        .expect("put object must succeed");

    // Expect a successful read from the source bucket; cache errors should be recorded because of the corrupted block
    let path = mount_point.path().join(&object_key);
    let read = fs::read(&path).expect("read should succeed");
    assert_eq!(read, object_data.as_bytes());
    assert!(
        cache.get_block_invalid_count() > 0,
        "read should result in cache errors"
    );
}

#[test_case("key", 100, 1024; "simple")]
#[test_case("£", 100, 1024; "non-ascii key")]
#[test_case("key", 1024, 1024; "long key")]
#[test_case("key", 100, 1024 * 1024; "big file")]
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
fn express_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) {
let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE);
let bucket_name = get_standard_bucket();
Expand All @@ -26,6 +107,7 @@ fn express_cache_write_read(key_suffix: &str, key_size: usize, object_size: usiz

cache_write_read_base(
client,
&bucket_name,
key_suffix,
key_size,
object_size,
Expand All @@ -38,6 +120,7 @@ fn express_cache_write_read(key_suffix: &str, key_size: usize, object_size: usiz
#[test_case("£", 100, 1024; "non-ascii key")]
#[test_case("key", 1024, 1024; "long key")]
#[test_case("key", 100, 1024 * 1024; "big file")]
#[cfg(feature = "s3_tests")]
fn disk_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) {
let cache_dir = tempfile::tempdir().unwrap();
let cache_config = DiskDataCacheConfig {
Expand All @@ -48,8 +131,10 @@ fn disk_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize)

let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE);

let bucket_name = get_test_bucket();
cache_write_read_base(
client,
&bucket_name,
key_suffix,
key_size,
object_size,
Expand All @@ -60,6 +145,7 @@ fn disk_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize)

fn cache_write_read_base<Cache>(
client: S3CrtClient,
bucket: &str,
key_suffix: &str,
key_size: usize,
object_size: usize,
Expand All @@ -68,24 +154,14 @@ fn cache_write_read_base<Cache>(
) where
Cache: DataCache + Send + Sync + 'static,
{
let (bucket, prefix) = get_test_bucket_and_prefix(test_name);
let prefix = get_test_prefix(test_name);

// Mount a bucket
let mount_point = tempfile::tempdir().unwrap();
let runtime = client.event_loop_group();
let cache = CacheTestWrapper::new(cache);
let prefetcher = caching_prefetch(cache.clone(), runtime, Default::default());
let _session = create_fuse_session(
client,
prefetcher,
&bucket,
&prefix,
mount_point.path(),
Default::default(),
);
let (mount_point, _session) = mount_bucket(client, cache.clone(), bucket, &prefix);

// Write an object, no caching happens yet
let key = get_object_key(&prefix, key_suffix, key_size);
let key = generate_unprefixed_key(&prefix, key_suffix, key_size);
let path = mount_point.path().join(&key);
let written = random_binary_data(object_size);
fs::write(&path, &written).expect("write should succeed");
Expand Down Expand Up @@ -119,7 +195,8 @@ fn random_binary_data(size_in_bytes: usize) -> Vec<u8> {
}

/// Creates a random key which has a size of at least `min_size_in_bytes`
fn get_object_key(key_prefix: &str, key_suffix: &str, min_size_in_bytes: usize) -> String {
/// The `key_prefix` is not included in the return value.
fn generate_unprefixed_key(key_prefix: &str, key_suffix: &str, min_size_in_bytes: usize) -> String {
let random_suffix: u64 = rand::thread_rng().gen();
let last_key_part = format!("{key_suffix}{random_suffix}"); // part of the key after all the "/"
let full_key = format!("{key_prefix}{last_key_part}");
Expand All @@ -128,3 +205,32 @@ fn get_object_key(key_prefix: &str, key_suffix: &str, min_size_in_bytes: usize)
let padding = "0".repeat(padding_size);
format!("{last_key_part}{padding}")
}

/// Mounts `bucket` (under `prefix`) at a fresh temporary directory, using a
/// caching prefetcher backed by the given `cache`.
///
/// Returns the mount point directory together with the running FUSE session;
/// dropping the session unmounts the file system.
fn mount_bucket<Cache>(client: S3CrtClient, cache: Cache, bucket: &str, prefix: &str) -> (TempDir, BackgroundSession)
where
    Cache: DataCache + Send + Sync + 'static,
{
    // Grab the runtime handle before `client` is moved into the session
    let event_loop_group = client.event_loop_group();
    let prefetcher = caching_prefetch(cache, event_loop_group, Default::default());
    let tmp_dir = tempfile::tempdir().unwrap();
    let fuse_session = create_fuse_session(
        client,
        prefetcher,
        bucket,
        prefix,
        tmp_dir.path(),
        Default::default(),
    );
    (tmp_dir, fuse_session)
}

/// Builds the `ObjectId` (full key + ETag) the cache uses for an object.
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
fn get_object_id(prefix: &str, key: &str, etag: &str) -> ObjectId {
    let full_key = format!("{prefix}{key}");
    ObjectId::new(full_key, etag.into())
}

/// Computes the S3 key under which the express cache stores `block_idx` of
/// the given object, for a cache fronting `bucket` with `CACHE_BLOCK_SIZE` blocks.
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
fn get_express_cache_block_key(bucket: &str, cache_key: &ObjectId, block_idx: BlockIndex) -> String {
    get_s3_key(&build_prefix(bucket, CACHE_BLOCK_SIZE), cache_key, block_idx)
}
2 changes: 1 addition & 1 deletion mountpoint-s3/tests/fuse_tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
#[cfg(feature = "s3_tests")]
mod cache_test;
mod consistency_test;
mod fork_test;
Expand Down

0 comments on commit 3738860

Please sign in to comment.