Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: recover file cache index asynchronously #5087

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/datanode/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,8 @@ async fn build_cache_layer(
.context(error::InitBackendSnafu)?;

let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
.await
.context(error::InitBackendSnafu)?;

cache_layer.recover_cache(false).await;
info!(
"Enabled local object storage cache, path: {}, capacity: {}.",
path, cache_capacity
Expand Down
29 changes: 22 additions & 7 deletions src/mito2/src/cache/file_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::time::{Duration, Instant};

use bytes::Bytes;
use common_base::readable_size::ReadableSize;
use common_telemetry::{info, warn};
use common_telemetry::{error, info, warn};
use futures::{FutureExt, TryStreamExt};
use moka::future::Cache;
use moka::notification::RemovalCause;
Expand Down Expand Up @@ -188,10 +188,8 @@ impl FileCache {
}
}

/// Recovers the index from local store.
pub(crate) async fn recover(&self) -> Result<()> {
async fn recover_inner(&self) -> Result<()> {
let now = Instant::now();

let mut lister = self
.local_store
.lister_with(FILE_DIR)
Expand Down Expand Up @@ -225,10 +223,23 @@ impl FileCache {
total_size,
now.elapsed()
);

Ok(())
}

/// Recovers the index from local store.
pub(crate) async fn recover(self: &Arc<Self>, sync: bool) {
let moved_self = self.clone();
let handle = tokio::spawn(async move {
if let Err(err) = moved_self.recover_inner().await {
error!(err; "Failed to recover file cache.")
}
});

if sync {
let _ = handle.await;
}
}

/// Returns the cache file path for the key.
pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
cache_file_path(FILE_DIR, key)
Expand Down Expand Up @@ -536,13 +547,17 @@ mod tests {
}

// Recover the cache.
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
let cache = Arc::new(FileCache::new(
local_store.clone(),
ReadableSize::mb(10),
None,
));
// No entry before recovery.
assert!(cache
.reader(IndexKey::new(region_id, file_ids[0], file_type))
.await
.is_none());
cache.recover().await.unwrap();
cache.recover(true).await;

// Check size.
cache.memory_index.run_pending_tasks().await;
Expand Down
6 changes: 3 additions & 3 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ impl WriteCache {
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
) -> Result<Self> {
let file_cache = FileCache::new(local_store, cache_capacity, ttl);
file_cache.recover().await?;
let file_cache = Arc::new(FileCache::new(local_store, cache_capacity, ttl));
file_cache.recover(false).await;

Ok(Self {
file_cache: Arc::new(file_cache),
file_cache,
object_store_manager,
puffin_manager_factory,
intermediate_manager,
Expand Down
1 change: 1 addition & 0 deletions src/object-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ opendal = { version = "0.49", features = [
"services-s3",
] }
prometheus.workspace = true
tokio.workspace = true
uuid.workspace = true

[dev-dependencies]
Expand Down
35 changes: 25 additions & 10 deletions src/object-store/src/layers/lru_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use opendal::raw::{
};
use opendal::Result;
mod read_cache;
use common_telemetry::info;
use std::time::Instant;

use common_telemetry::{error, info};
use read_cache::ReadCache;

/// An opendal layer that supports a local LRU file cache.
Expand All @@ -39,19 +41,32 @@ impl<C: Access> Clone for LruCacheLayer<C> {
}

impl<C: Access> LruCacheLayer<C> {
/// Create a `[LruCacheLayer]` with local file cache and capacity in bytes.
pub async fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
/// Create a [`LruCacheLayer`] with local file cache and capacity in bytes.
pub fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
let read_cache = ReadCache::new(file_cache, capacity);
let (entries, bytes) = read_cache.recover_cache().await?;

info!(
"Recovered {} entries and total size {} in bytes for LruCacheLayer",
entries, bytes
);

Ok(Self { read_cache })
}

/// Recovers cache
pub async fn recover_cache(&self, sync: bool) {
let now = Instant::now();
let moved_read_cache = self.read_cache.clone();
let handle = tokio::spawn(async move {
match moved_read_cache.recover_cache().await {
Ok((entries, bytes)) => info!(
"Recovered {} entries and total size {} in bytes for LruCacheLayer, cost: {:?}",
entries,
bytes,
now.elapsed()
),
Err(err) => error!(err; "Failed to recover file cache."),
}
});
if sync {
let _ = handle.await;
}
}

/// Returns true when the local cache contains the specific file
pub async fn contains_file(&self, path: &str) -> bool {
self.read_cache.contains_file(path).await
Expand Down
10 changes: 7 additions & 3 deletions src/object-store/tests/object_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
.atomic_write_dir(&cache_dir.path().to_string_lossy());
let file_cache = Arc::new(builder.build().unwrap());

LruCacheLayer::new(file_cache, 32).await.unwrap()
let cache_layer = LruCacheLayer::new(file_cache, 32).unwrap();
cache_layer.recover_cache(true).await;
cache_layer
};

let store = OperatorBuilder::new(store)
Expand Down Expand Up @@ -308,7 +310,8 @@ async fn test_object_store_cache_policy() -> Result<()> {
let cache_store = file_cache.clone();

// create operator for cache dir to verify cache file
let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).await.unwrap();
let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).unwrap();
cache_layer.recover_cache(true).await;
let store = store.layer(cache_layer.clone());

// create several object handlers.
Expand Down Expand Up @@ -436,7 +439,8 @@ async fn test_object_store_cache_policy() -> Result<()> {

drop(cache_layer);
// Test recover
let cache_layer = LruCacheLayer::new(cache_store, 38).await.unwrap();
let cache_layer = LruCacheLayer::new(cache_store, 38).unwrap();
cache_layer.recover_cache(true).await;

// The p2 `NotFound` cache will not be recovered
assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
Expand Down
Loading