Bypass the shared cache for large objects (#1117)
## Description of change

This change makes `get_block` and `put_block` no-ops in the shared cache for
objects larger than 1 MiB.
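
A minimal, self-contained sketch of the bypass for illustration only: the constant, type, and method shapes below are assumptions, not the actual identifiers in `express_data_cache.rs` (which is not shown in this diff).

```rust
// Sketch: objects above a size threshold skip the shared cache entirely.
// The 1 MiB threshold matches the description above; all names here are hypothetical.
const MAX_OBJECT_SIZE: usize = 1024 * 1024; // 1 MiB

struct ExpressCacheSketch {
    max_object_size: usize,
}

impl ExpressCacheSketch {
    /// Reads report a cache miss (`None`) without touching the cache bucket
    /// when the object is larger than the threshold.
    fn get_block(&self, object_size: usize, cached: Option<Vec<u8>>) -> Option<Vec<u8>> {
        if object_size > self.max_object_size {
            return None;
        }
        cached // stand-in for the actual GET against the S3 Express One Zone bucket
    }

    /// Writes are silently dropped (a no-op) for objects above the threshold.
    fn put_block(&self, object_size: usize, _bytes: &[u8]) {
        if object_size > self.max_object_size {
            return;
        }
        // stand-in for the actual PUT against the S3 Express One Zone bucket
    }
}

fn main() {
    let cache = ExpressCacheSketch { max_object_size: MAX_OBJECT_SIZE };
    cache.put_block(4 * 1024 * 1024, b"large object block"); // dropped: object is too large
    assert_eq!(cache.get_block(4 * 1024 * 1024, Some(vec![1, 2, 3])), None); // always a miss
}
```

This is why the `DataCache` trait methods gain an `object_size` argument in the `data_cache.rs` diff below.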

Relevant issues: N/A

## Does this change impact existing behavior?

No, the new behavior is gated behind the feature flag.

## Does this change need a changelog entry in any of the crates?

Yes; changelog entries will be added in follow-up PRs.

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).

---------

Signed-off-by: Vlad Volodkin <[email protected]>
Co-authored-by: Vlad Volodkin <[email protected]>
vladem and Vlad Volodkin authored Nov 13, 2024
1 parent 1c6f819 commit 9206ed4
Showing 7 changed files with 252 additions and 91 deletions.
39 changes: 20 additions & 19 deletions mountpoint-s3/src/cli.rs
@@ -28,7 +28,8 @@ use regex::Regex;
use sysinfo::{RefreshKind, System};

use crate::data_cache::{
CacheLimit, DiskDataCache, DiskDataCacheConfig, ExpressDataCache, ManagedCacheDir, MultilevelDataCache,
CacheLimit, DiskDataCache, DiskDataCacheConfig, ExpressDataCache, ExpressDataCacheConfig, ManagedCacheDir,
MultilevelDataCache,
};
use crate::fs::{CacheConfig, ServerSideEncryption, TimeToLive};
use crate::fuse::session::FuseSession;
@@ -445,7 +446,17 @@ impl CliArgs {
None
}

fn disk_data_cache_config(&self) -> Option<(&Path, DiskDataCacheConfig)> {
fn express_data_cache_config(&self) -> Option<(ExpressDataCacheConfig, &str, &str)> {
let express_bucket_name = self.cache_express_bucket_name()?;
let config = ExpressDataCacheConfig {
block_size: self.cache_block_size_in_bytes(),
..Default::default()
};

Some((config, &self.bucket_name, express_bucket_name))
}

fn disk_data_cache_config(&self) -> Option<(DiskDataCacheConfig, &Path)> {
match self.cache.as_ref() {
Some(path) => {
let cache_limit = match self.max_cache_size {
@@ -460,7 +471,7 @@
block_size: self.cache_block_size_in_bytes(),
limit: cache_limit,
};
Some((path.as_path(), cache_config))
Some((cache_config, path.as_path()))
}
None => None,
}
@@ -880,15 +891,10 @@
tracing::trace!("using metadata TTL setting {metadata_cache_ttl:?}");
filesystem_config.cache_config = CacheConfig::new(metadata_cache_ttl);

match (args.disk_data_cache_config(), args.cache_express_bucket_name()) {
(None, Some(express_bucket_name)) => {
match (args.disk_data_cache_config(), args.express_data_cache_config()) {
(None, Some((config, bucket_name, cache_bucket_name))) => {
tracing::trace!("using S3 Express One Zone bucket as a cache for object content");
let express_cache = ExpressDataCache::new(
express_bucket_name,
client.clone(),
&args.bucket_name,
args.cache_block_size_in_bytes(),
);
let express_cache = ExpressDataCache::new(client.clone(), config, bucket_name, cache_bucket_name);

let prefetcher = caching_prefetch(express_cache, runtime, prefetcher_config);
let fuse_session = create_filesystem(
@@ -903,7 +909,7 @@

Ok(fuse_session)
}
(Some((cache_dir_path, disk_data_cache_config)), None) => {
(Some((disk_data_cache_config, cache_dir_path)), None) => {
tracing::trace!("using local disk as a cache for object content");
let (managed_cache_dir, disk_cache) = create_disk_cache(cache_dir_path, disk_data_cache_config)?;

@@ -924,15 +930,10 @@

Ok(fuse_session)
}
(Some((cache_dir_path, disk_data_cache_config)), Some(express_bucket_name)) => {
(Some((disk_data_cache_config, cache_dir_path)), Some((config, bucket_name, cache_bucket_name))) => {
tracing::trace!("using both local disk and S3 Express One Zone bucket as a cache for object content");
let (managed_cache_dir, disk_cache) = create_disk_cache(cache_dir_path, disk_data_cache_config)?;
let express_cache = ExpressDataCache::new(
express_bucket_name,
client.clone(),
&args.bucket_name,
args.cache_block_size_in_bytes(),
);
let express_cache = ExpressDataCache::new(client.clone(), config, bucket_name, cache_bucket_name);
let cache = MultilevelDataCache::new(Arc::new(disk_cache), express_cache, runtime.clone());

let prefetcher = caching_prefetch(cache, runtime, prefetcher_config);
4 changes: 3 additions & 1 deletion mountpoint-s3/src/data_cache.rs
@@ -16,7 +16,7 @@ use thiserror::Error;
pub use crate::checksums::ChecksummedBytes;
pub use crate::data_cache::cache_directory::ManagedCacheDir;
pub use crate::data_cache::disk_data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig};
pub use crate::data_cache::express_data_cache::ExpressDataCache;
pub use crate::data_cache::express_data_cache::{ExpressDataCache, ExpressDataCacheConfig};
pub use crate::data_cache::in_memory_data_cache::InMemoryDataCache;
pub use crate::data_cache::multilevel_cache::MultilevelDataCache;

@@ -54,6 +54,7 @@ pub trait DataCache {
cache_key: &ObjectId,
block_idx: BlockIndex,
block_offset: u64,
object_size: usize,
) -> DataCacheResult<Option<ChecksummedBytes>>;

/// Put block of data to the cache for the given [ObjectId] and [BlockIndex].
@@ -63,6 +64,7 @@
block_idx: BlockIndex,
block_offset: u64,
bytes: ChecksummedBytes,
object_size: usize,
) -> DataCacheResult<()>;

/// Returns the block size for the data cache.
38 changes: 25 additions & 13 deletions mountpoint-s3/src/data_cache/disk_data_cache.rs
@@ -369,6 +369,7 @@ impl DataCache for DiskDataCache {
cache_key: &ObjectId,
block_idx: BlockIndex,
block_offset: u64,
_object_size: usize,
) -> DataCacheResult<Option<ChecksummedBytes>> {
if block_offset != block_idx * self.config.block_size {
return Err(DataCacheError::InvalidBlockOffset);
@@ -418,6 +419,7 @@ impl DataCache for DiskDataCache {
block_idx: BlockIndex,
block_offset: u64,
bytes: ChecksummedBytes,
_object_size: usize,
) -> DataCacheResult<()> {
if block_offset != block_idx * self.config.block_size {
return Err(DataCacheError::InvalidBlockOffset);
@@ -645,6 +647,9 @@ mod tests {
let data_2 = ChecksummedBytes::new("Bar".into());
let data_3 = ChecksummedBytes::new("Baz".into());

let object_1_size = data_1.len() + data_3.len();
let object_2_size = data_2.len();

let block_size = 8 * 1024 * 1024;
let cache_directory = tempfile::tempdir().unwrap();
let cache = DiskDataCache::new(
@@ -661,7 +666,7 @@
);

let block = cache
.get_block(&cache_key_1, 0, 0)
.get_block(&cache_key_1, 0, 0, object_1_size)
.await
.expect("cache should be accessible");
assert!(
@@ -672,11 +677,11 @@

// PUT and GET, OK?
cache
.put_block(cache_key_1.clone(), 0, 0, data_1.clone())
.put_block(cache_key_1.clone(), 0, 0, data_1.clone(), object_1_size)
.await
.expect("cache should be accessible");
let entry = cache
.get_block(&cache_key_1, 0, 0)
.get_block(&cache_key_1, 0, 0, object_1_size)
.await
.expect("cache should be accessible")
.expect("cache entry should be returned");
@@ -687,11 +692,11 @@

// PUT AND GET a second file, OK?
cache
.put_block(cache_key_2.clone(), 0, 0, data_2.clone())
.put_block(cache_key_2.clone(), 0, 0, data_2.clone(), object_2_size)
.await
.expect("cache should be accessible");
let entry = cache
.get_block(&cache_key_2, 0, 0)
.get_block(&cache_key_2, 0, 0, object_2_size)
.await
.expect("cache should be accessible")
.expect("cache entry should be returned");
@@ -702,11 +707,11 @@

// PUT AND GET a second block in a cache entry, OK?
cache
.put_block(cache_key_1.clone(), 1, block_size, data_3.clone())
.put_block(cache_key_1.clone(), 1, block_size, data_3.clone(), object_1_size)
.await
.expect("cache should be accessible");
let entry = cache
.get_block(&cache_key_1, 1, block_size)
.get_block(&cache_key_1, 1, block_size, object_1_size)
.await
.expect("cache should be accessible")
.expect("cache entry should be returned");
@@ -717,7 +722,7 @@

// Entry 1's first block still intact
let entry = cache
.get_block(&cache_key_1, 0, 0)
.get_block(&cache_key_1, 0, 0, object_1_size)
.await
.expect("cache should be accessible")
.expect("cache entry should be returned");
@@ -743,11 +748,11 @@
let cache_key = ObjectId::new("a".into(), ETag::for_tests());

cache
.put_block(cache_key.clone(), 0, 0, slice.clone())
.put_block(cache_key.clone(), 0, 0, slice.clone(), slice.len())
.await
.expect("cache should be accessible");
let entry = cache
.get_block(&cache_key, 0, 0)
.get_block(&cache_key, 0, 0, slice.len())
.await
.expect("cache should be accessible")
.expect("cache entry should be returned");
@@ -778,9 +783,10 @@
cache_key: &ObjectId,
block_idx: u64,
expected_bytes: &ChecksummedBytes,
object_size: usize,
) -> bool {
if let Some(retrieved) = cache
.get_block(cache_key, block_idx, block_idx * (BLOCK_SIZE) as u64)
.get_block(cache_key, block_idx, block_idx * (BLOCK_SIZE) as u64, object_size)
.await
.expect("cache should be accessible")
{
@@ -828,6 +834,7 @@
block_idx as u64,
(block_idx * BLOCK_SIZE) as u64,
bytes.clone(),
LARGE_OBJECT_SIZE,
)
.await
.unwrap();
@@ -841,13 +848,16 @@
block_idx as u64,
(block_idx * BLOCK_SIZE) as u64,
bytes.clone(),
SMALL_OBJECT_SIZE,
)
.await
.unwrap();
}

let count_small_object_blocks_in_cache = futures::stream::iter(small_object_blocks.iter().enumerate())
.filter(|&(block_idx, bytes)| is_block_in_cache(&cache, &small_object_key, block_idx as u64, bytes))
.filter(|&(block_idx, bytes)| {
is_block_in_cache(&cache, &small_object_key, block_idx as u64, bytes, SMALL_OBJECT_SIZE)
})
.count()
.await;
assert_eq!(
@@ -857,7 +867,9 @@
);

let count_large_object_blocks_in_cache = futures::stream::iter(large_object_blocks.iter().enumerate())
.filter(|&(block_idx, bytes)| is_block_in_cache(&cache, &large_object_key, block_idx as u64, bytes))
.filter(|&(block_idx, bytes)| {
is_block_in_cache(&cache, &large_object_key, block_idx as u64, bytes, LARGE_OBJECT_SIZE)
})
.count()
.await;
assert!(
