Skip to content

Commit

Permalink
Add an integration test for the shared cache (#1071)
Browse files Browse the repository at this point in the history
## Description of change

Add an integration test for the shared cache. It uses
`S3_EXPRESS_ONE_ZONE_BUCKET_NAME` as a cache bucket and `S3_BUCKET_NAME`
as a regular bucket.

Relevant issues: None.

## Does this change impact existing behavior?

No.

## Does this change need a changelog entry in any of the crates?

No.

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).

---------

Signed-off-by: Vlad Volodkin <[email protected]>
Co-authored-by: Vlad Volodkin <[email protected]>
  • Loading branch information
vladem and Vlad Volodkin authored Nov 14, 2024
1 parent f14667f commit 4af1944
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 15 deletions.
114 changes: 114 additions & 0 deletions mountpoint-s3/tests/common/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};

use async_trait::async_trait;
use mountpoint_s3::{
data_cache::{BlockIndex, ChecksummedBytes, DataCache, DataCacheResult},
object::ObjectId,
};

/// A wrapper around any type implementing [DataCache], which counts operations
pub struct CacheTestWrapper<Cache> {
    // Shared via `Arc` so that clones of the wrapper observe the same counters.
    inner: Arc<CacheTestWrapperInner<Cache>>,
}

/// Shared state behind [CacheTestWrapper]: the wrapped cache plus operation counters.
struct CacheTestWrapperInner<Cache> {
    // The wrapped cache implementation that performs the actual operations
    cache: Cache,
    /// Number of times the `get_block` succeeded and returned data
    get_block_hit_count: AtomicU64,
    /// Number of times the `put_block` was completed
    put_block_count: AtomicU64,
}

impl<Cache> Clone for CacheTestWrapper<Cache> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}

impl<Cache> CacheTestWrapper<Cache> {
    /// Wrap `cache`, starting all operation counters at zero.
    pub fn new(cache: Cache) -> Self {
        let inner = CacheTestWrapperInner {
            cache,
            get_block_hit_count: AtomicU64::new(0),
            put_block_count: AtomicU64::new(0),
        };
        CacheTestWrapper {
            inner: Arc::new(inner),
        }
    }

    /// Block until `put_block_count` exceeds `previous_value`, polling every 100ms.
    ///
    /// Panics if the counter does not advance within `max_wait_duration`.
    pub fn wait_for_put(&self, max_wait_duration: Duration, previous_value: u64) {
        let start = std::time::Instant::now();
        loop {
            assert!(
                start.elapsed() <= max_wait_duration,
                "timeout on waiting for a write to the cache to happen"
            );
            if self.put_block_count() > previous_value {
                return;
            }
            std::thread::sleep(Duration::from_millis(100));
        }
    }

    /// Number of times the `get_block` succeeded and returned data
    pub fn get_block_hit_count(&self) -> u64 {
        self.inner.get_block_hit_count.load(Ordering::SeqCst)
    }

    /// Number of times the `put_block` was completed
    pub fn put_block_count(&self) -> u64 {
        self.inner.put_block_count.load(Ordering::SeqCst)
    }
}

#[async_trait]
impl<Cache: DataCache + Send + Sync + 'static> DataCache for CacheTestWrapper<Cache> {
    /// Delegate to the wrapped cache, counting a hit when data is returned.
    async fn get_block(
        &self,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        object_size: usize,
    ) -> DataCacheResult<Option<ChecksummedBytes>> {
        let block = self
            .inner
            .cache
            .get_block(cache_key, block_idx, block_offset, object_size)
            .await?;

        // A cache hit is recorded only when `get_block` succeeded AND had the data.
        match &block {
            Some(_) => {
                self.inner.get_block_hit_count.fetch_add(1, Ordering::SeqCst);
            }
            None => {}
        }

        Ok(block)
    }

    /// Delegate to the wrapped cache, counting every completed attempt.
    async fn put_block(
        &self,
        cache_key: ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        bytes: ChecksummedBytes,
        object_size: usize,
    ) -> DataCacheResult<()> {
        let outcome = self
            .inner
            .cache
            .put_block(cache_key, block_idx, block_offset, bytes, object_size)
            .await;
        // Incremented regardless of success: this tracks completed attempts,
        // not successful writes.
        self.inner.put_block_count.fetch_add(1, Ordering::SeqCst);
        outcome
    }

    fn block_size(&self) -> u64 {
        self.inner.cache.block_size()
    }
}
18 changes: 11 additions & 7 deletions mountpoint-s3/tests/common/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub trait TestSessionCreator: FnOnce(&str, TestSessionConfig) -> TestSession {}
// `FnOnce(...)` in place of `impl TestSessionCreator`.
impl<T> TestSessionCreator for T where T: FnOnce(&str, TestSessionConfig) -> TestSession {}

fn create_fuse_session<Client, Prefetcher>(
pub fn create_fuse_session<Client, Prefetcher>(
client: Client,
prefetcher: Prefetcher,
bucket: &str,
Expand Down Expand Up @@ -363,12 +363,7 @@ pub mod s3_session {
let (bucket, prefix) = get_test_bucket_and_prefix(test_name);
let region = get_test_region();

let client_config = S3ClientConfig::default()
.part_size(test_config.part_size)
.endpoint_config(get_test_endpoint_config())
.read_backpressure(true)
.initial_read_window(test_config.initial_read_window_size);
let client = S3CrtClient::new(client_config).unwrap();
let client = create_crt_client(test_config.part_size, test_config.initial_read_window_size);
let runtime = client.event_loop_group();
let prefetcher = caching_prefetch(cache, runtime, test_config.prefetcher_config);
let session = create_fuse_session(
Expand All @@ -385,6 +380,15 @@ pub mod s3_session {
}
}

/// Build an `S3CrtClient` against the test endpoint with backpressure reads
/// enabled and the given part size / initial read window.
pub fn create_crt_client(part_size: usize, initial_read_window_size: usize) -> S3CrtClient {
    let config = S3ClientConfig::default()
        .endpoint_config(get_test_endpoint_config())
        .part_size(part_size)
        .read_backpressure(true)
        .initial_read_window(initial_read_window_size);
    S3CrtClient::new(config).unwrap()
}

fn create_test_client(region: &str, bucket: &str, prefix: &str) -> impl TestClient {
let sdk_client = tokio_block_on(async { get_test_sdk_client(region).await });
SDKTestClient {
Expand Down
2 changes: 2 additions & 0 deletions mountpoint-s3/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
//! Allow for unused items since this is included independently in each module.
#![allow(dead_code)]

pub mod cache;

pub mod creds;

#[cfg(feature = "fuse_tests")]
Expand Down
28 changes: 20 additions & 8 deletions mountpoint-s3/tests/common/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,35 @@ use rand_chacha::rand_core::OsRng;
use crate::common::tokio_block_on;

/// Resolve the bucket and a unique prefix for the given test.
///
/// The bucket is selected at compile time: the S3 Express One Zone (directory)
/// bucket when the `s3express_tests` feature is on, a standard bucket otherwise.
pub fn get_test_bucket_and_prefix(test_name: &str) -> (String, String) {
    #[cfg(feature = "s3express_tests")]
    let bucket = get_express_bucket();
    #[cfg(not(feature = "s3express_tests"))]
    let bucket = get_standard_bucket();

    (bucket, get_test_prefix(test_name))
}

/// Build a unique S3 key prefix for the given test name.
pub fn get_test_prefix(test_name: &str) -> String {
    // A random nonce makes every generated prefix truly unique across runs.
    let nonce = OsRng.next_u64();

    // Prefix always has a trailing "/" to keep meaning in sync with the S3 API.
    let base = match std::env::var("S3_BUCKET_TEST_PREFIX") {
        Ok(value) => value,
        Err(_) => String::from("mountpoint-test/"),
    };
    assert!(base.ends_with('/'), "S3_BUCKET_TEST_PREFIX should end in '/'");

    format!("{base}{test_name}/{nonce}/")
}

(bucket, prefix)
/// Name of the S3 Express One Zone (directory) bucket used by these tests,
/// read from the environment.
#[cfg(feature = "s3express_tests")]
pub fn get_express_bucket() -> String {
    const VAR: &str = "S3_EXPRESS_ONE_ZONE_BUCKET_NAME";
    std::env::var(VAR).expect("Set S3_EXPRESS_ONE_ZONE_BUCKET_NAME to run integration tests")
}

/// Name of the standard (general purpose) test bucket, read from the environment.
pub fn get_standard_bucket() -> String {
    const VAR: &str = "S3_BUCKET_NAME";
    std::env::var(VAR).expect("Set S3_BUCKET_NAME to run integration tests")
}

pub fn get_test_bucket_forbidden() -> String {
Expand Down
130 changes: 130 additions & 0 deletions mountpoint-s3/tests/fuse_tests/cache_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use crate::common::cache::CacheTestWrapper;
use crate::common::fuse::create_fuse_session;
use crate::common::fuse::s3_session::create_crt_client;
use crate::common::s3::{get_express_bucket, get_standard_bucket, get_test_bucket_and_prefix};
use mountpoint_s3::data_cache::{DataCache, DiskDataCache, DiskDataCacheConfig, ExpressDataCache};
use mountpoint_s3::prefetch::caching_prefetch;
use mountpoint_s3_client::S3CrtClient;
use rand::{Rng, RngCore, SeedableRng};
use rand_chacha::ChaChaRng;
use std::fs;
use std::time::Duration;
use test_case::test_case;

/// Block size used for the disk cache under test (1 MiB).
const CACHE_BLOCK_SIZE: u64 = 1024 * 1024;
/// Part size — also used as the initial read window — for the CRT client (8 MiB).
const CLIENT_PART_SIZE: usize = 8 * 1024 * 1024;

#[test_case("key", 100, 1024; "simple")]
#[test_case("£", 100, 1024; "non-ascii key")]
#[test_case("key", 1024, 1024; "long key")]
#[test_case("key", 100, 1024 * 1024; "big file")]
fn express_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) {
    let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE);

    // The standard bucket holds the objects under test; the express (directory)
    // bucket stores the cached blocks.
    let cache = ExpressDataCache::new(
        client.clone(),
        Default::default(),
        &get_standard_bucket(),
        &get_express_bucket(),
    );

    cache_write_read_base(
        client,
        key_suffix,
        key_size,
        object_size,
        cache,
        "express_cache_write_read",
    )
}

#[test_case("key", 100, 1024; "simple")]
#[test_case("£", 100, 1024; "non-ascii key")]
#[test_case("key", 1024, 1024; "long key")]
#[test_case("key", 100, 1024 * 1024; "big file")]
fn disk_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) {
    // A disk cache rooted at a fresh temporary directory.
    let cache_dir = tempfile::tempdir().unwrap();
    let cache = DiskDataCache::new(
        cache_dir.path().to_path_buf(),
        DiskDataCacheConfig {
            block_size: CACHE_BLOCK_SIZE,
            limit: Default::default(),
        },
    );

    let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE);

    cache_write_read_base(
        client,
        key_suffix,
        key_size,
        object_size,
        cache,
        "disk_cache_write_read",
    );
}

/// Shared body for the cache tests: mount a bucket with a caching prefetcher,
/// write an object, then read it twice and verify the second read hits the cache.
fn cache_write_read_base<Cache>(
    client: S3CrtClient,
    key_suffix: &str,
    key_size: usize,
    object_size: usize,
    cache: Cache,
    test_name: &str,
) where
    Cache: DataCache + Send + Sync + 'static,
{
    let (bucket, prefix) = get_test_bucket_and_prefix(test_name);

    // Mount the bucket, wrapping the cache so its operations can be counted.
    let mount_point = tempfile::tempdir().unwrap();
    let runtime = client.event_loop_group();
    let cache = CacheTestWrapper::new(cache);
    let prefetcher = caching_prefetch(cache.clone(), runtime, Default::default());
    let _session = create_fuse_session(
        client,
        prefetcher,
        &bucket,
        &prefix,
        mount_point.path(),
        Default::default(),
    );

    // Writing an object must not touch the cache yet.
    let key = get_object_key(&prefix, key_suffix, key_size);
    let file_path = mount_point.path().join(&key);
    let expected = random_binary_data(object_size);
    fs::write(&file_path, &expected).expect("write should succeed");
    let puts_after_write = cache.put_block_count();
    assert_eq!(puts_after_write, 0, "no cache writes should happen yet");

    // The first read is served from the source bucket and populates the cache.
    let actual = fs::read(&file_path).expect("read should succeed");
    assert_eq!(actual, expected);

    // Cache population is async, wait for it to happen
    cache.wait_for_put(Duration::from_secs(10), puts_after_write);

    // The second read must be served from the cache.
    let hits_before_read = cache.get_block_hit_count();
    let actual = fs::read(&file_path).expect("read from the cache should succeed");
    assert_eq!(actual, expected);
    assert!(
        cache.get_block_hit_count() > hits_before_read,
        "read should result in a cache hit"
    );
}

/// Generates random data of the specified size
fn random_binary_data(size_in_bytes: usize) -> Vec<u8> {
let seed = rand::thread_rng().gen();
let mut rng = ChaChaRng::seed_from_u64(seed);
let mut data = vec![0; size_in_bytes];
rng.fill_bytes(&mut data);
data
}

/// Creates a random key which has a size of at least `min_size_in_bytes`
fn get_object_key(key_prefix: &str, key_suffix: &str, min_size_in_bytes: usize) -> String {
let random_suffix: u64 = rand::thread_rng().gen();
let last_key_part = format!("{key_suffix}{random_suffix}"); // part of the key after all the "/"
let full_key = format!("{key_prefix}{last_key_part}");
let full_key_size = full_key.as_bytes().len();
let padding_size = min_size_in_bytes.saturating_sub(full_key_size);
let padding = "0".repeat(padding_size);
format!("{last_key_part}{padding}")
}
2 changes: 2 additions & 0 deletions mountpoint-s3/tests/fuse_tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
mod cache_test;
mod consistency_test;
mod fork_test;
mod lookup_test;
Expand Down

0 comments on commit 4af1944

Please sign in to comment.