Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: recover file cache index asynchronously #5087

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/datanode/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::time::Duration;
use std::{env, path};

use common_base::readable_size::ReadableSize;
use common_telemetry::{info, warn};
use common_telemetry::{error, info, warn};
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
Expand Down Expand Up @@ -153,8 +153,13 @@ async fn build_cache_layer(
.context(error::InitBackendSnafu)?;

let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
.await
.context(error::InitBackendSnafu)?;
let moved_cache_layer = cache_layer.clone();
tokio::spawn(async move {
if let Err(err) = moved_cache_layer.recover_cache().await {
error!(err; "Failed to recover file cache.")
}
});

info!(
"Enabled local object storage cache, path: {}, capacity: {}.",
Expand Down
13 changes: 9 additions & 4 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use common_telemetry::{debug, error, info};
use futures::AsyncWriteExt;
use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore;
Expand Down Expand Up @@ -67,11 +67,16 @@ impl WriteCache {
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
) -> Result<Self> {
let file_cache = FileCache::new(local_store, cache_capacity, ttl);
file_cache.recover().await?;
let file_cache = Arc::new(FileCache::new(local_store, cache_capacity, ttl));
let moved_file_cache = file_cache.clone();
tokio::spawn(async move {
if let Err(err) = moved_file_cache.recover().await {
error!(err; "Failed to recover file cache.")
}
});

Ok(Self {
file_cache: Arc::new(file_cache),
file_cache,
object_store_manager,
puffin_manager_factory,
intermediate_manager,
Expand Down
22 changes: 15 additions & 7 deletions src/object-store/src/layers/lru_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use opendal::raw::{
};
use opendal::Result;
mod read_cache;
use std::time::Instant;

use common_telemetry::info;
use read_cache::ReadCache;

Expand All @@ -39,17 +41,23 @@ impl<C: Access> Clone for LruCacheLayer<C> {
}

impl<C: Access> LruCacheLayer<C> {
/// Create a `[LruCacheLayer]` with local file cache and capacity in bytes.
pub async fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
/// Create a [`LruCacheLayer`] with local file cache and capacity in bytes.
pub fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
let read_cache = ReadCache::new(file_cache, capacity);
let (entries, bytes) = read_cache.recover_cache().await?;
Ok(Self { read_cache })
}

/// Recovers cache
pub async fn recover_cache(&self) -> Result<()> {
let now = Instant::now();
let (entries, bytes) = self.read_cache.recover_cache().await?;
info!(
"Recovered {} entries and total size {} in bytes for LruCacheLayer",
entries, bytes
"Recovered {} entries and total size {} in bytes for LruCacheLayer, cost: {:?}",
entries,
bytes,
now.elapsed()
);

Ok(Self { read_cache })
Ok(())
}

/// Returns true when the local cache contains the specific file
Expand Down
10 changes: 7 additions & 3 deletions src/object-store/tests/object_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
.atomic_write_dir(&cache_dir.path().to_string_lossy());
let file_cache = Arc::new(builder.build().unwrap());

LruCacheLayer::new(file_cache, 32).await.unwrap()
let cache_layer = LruCacheLayer::new(file_cache, 32).unwrap();
cache_layer.recover_cache().await.unwrap();
cache_layer
};

let store = OperatorBuilder::new(store)
Expand Down Expand Up @@ -308,7 +310,8 @@ async fn test_object_store_cache_policy() -> Result<()> {
let cache_store = file_cache.clone();

// create operator for cache dir to verify cache file
let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).await.unwrap();
let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).unwrap();
cache_layer.recover_cache().await.unwrap();
let store = store.layer(cache_layer.clone());

// create several object handler.
Expand Down Expand Up @@ -436,7 +439,8 @@ async fn test_object_store_cache_policy() -> Result<()> {

drop(cache_layer);
// Test recover
let cache_layer = LruCacheLayer::new(cache_store, 38).await.unwrap();
let cache_layer = LruCacheLayer::new(cache_store, 38).unwrap();
cache_layer.recover_cache().await.unwrap();

// The p2 `NotFound` cache will not be recovered
assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
Expand Down
Loading