diff --git a/src/bin/nydusd/blob_cache.rs b/src/bin/nydusd/blob_cache.rs index e66c3231abd..efa229dd735 100644 --- a/src/bin/nydusd/blob_cache.rs +++ b/src/bin/nydusd/blob_cache.rs @@ -205,6 +205,22 @@ impl BlobCacheState { fn get(&self, key: &str) -> Option { self.id_to_config_map.get(key).cloned() } + + /// get DataBlob number for a domain_id + fn get_blobs_num(&self, domain_id: &str) -> usize { + let scoped_blob_prefix = format!("{}{}", domain_id, ID_SPLITTER); + return self + .id_to_config_map + .values() + .filter(|v| { + if let BlobCacheObjectConfig::DataBlob(o) = v { + o.scoped_blob_id.starts_with(&scoped_blob_prefix) + } else { + false + } + }) + .count(); + } } /// Manager for cached file objects. @@ -266,6 +282,10 @@ impl BlobCacheMgr { self.get_state().get(key) } + pub fn get_blobs_num(&self, domain_id: &str) -> usize { + self.get_state().get_blobs_num(domain_id) + } + #[inline] fn get_state(&self) -> MutexGuard { self.state.lock().unwrap() diff --git a/src/bin/nydusd/fs_cache.rs b/src/bin/nydusd/fs_cache.rs index 9d97036ba53..d5daf2e71a4 100644 --- a/src/bin/nydusd/fs_cache.rs +++ b/src/bin/nydusd/fs_cache.rs @@ -16,12 +16,13 @@ use std::ptr::read_unaligned; use std::string::String; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Barrier, Mutex, MutexGuard}; +use std::{thread, time}; use mio::unix::SourceFd; use mio::{Events, Interest, Poll, Token, Waker}; use storage::cache::BlobCache; use storage::device::BlobPrefetchRequest; -use storage::factory::BLOB_FACTORY; +use storage::factory::{ASYNC_RUNTIME, BLOB_FACTORY}; use crate::blob_cache::{ generate_blob_key, BlobCacheConfigBootstrap, BlobCacheConfigDataBlob, BlobCacheMgr, @@ -407,7 +408,7 @@ impl FsCacheHandler { } Some(cfg) => match cfg { BlobCacheObjectConfig::DataBlob(config) => { - self.handle_open_data_blob(hdr, msg, config) + self.handle_open_data_blob(hdr, msg, config, domain_id) } BlobCacheObjectConfig::Bootstrap(config) => { self.handle_open_bootstrap(hdr, msg, config) @@ -422,10 +423,12 @@ impl FsCacheHandler { hdr: &FsCacheMsgHeader, msg: &FsCacheMsgOpen, config: Arc, + domain_id: &str, ) -> String { let mut state = self.state.lock().unwrap(); + let blobs_need = state.blob_cache_mgr.get_blobs_num(domain_id); if let Vacant(e) = state.id_to_object_map.entry(hdr.object_id) { - match self.create_data_blob_object(&config, msg.fd) { + match self.create_data_blob_object(&config, msg.fd, blobs_need) { Err(s) => format!("copen {},{}", hdr.msg_id, s), Ok((blob, blob_size)) => { e.insert((FsCacheObject::DataBlob(blob.clone()), msg.fd)); @@ -491,6 +494,7 @@ impl FsCacheHandler { &self, config: &BlobCacheConfigDataBlob, fd: u32, + blobs_need: usize, ) -> std::result::Result<(Arc, u64), i32> { let mut blob_info = config.blob_info().deref().clone(); // `BlobInfo` from the configuration cache should not have fscache file associated with it. @@ -500,8 +504,7 @@ impl FsCacheHandler { let file = unsafe { File::from_raw_fd(fd as RawFd) }; blob_info.set_fscache_file(Some(Arc::new(file))); let blob_ref = Arc::new(blob_info); - - match BLOB_FACTORY.new_blob_cache(config.factory_config(), &blob_ref) { + match BLOB_FACTORY.new_blob_cache(config.factory_config(), &blob_ref, blobs_need) { Err(_e) => Err(-libc::ENOENT), Ok(blob) => match blob.blob_uncompressed_size() { Err(_e) => Err(-libc::EIO), @@ -510,6 +513,38 @@ impl FsCacheHandler { } } + fn fill_bootstrap_cache(bootstrap_fd: RawFd, cachefile_fd: RawFd, size: usize) -> Result<()> { + let base = unsafe { + libc::mmap( + std::ptr::null_mut(), + size, + libc::PROT_READ, + libc::MAP_SHARED, + bootstrap_fd, + 0_i64, + ) + }; + if base == libc::MAP_FAILED { + warn!( + "fscache: failed to mmap bootstrap file, {}", + std::io::Error::last_os_error() + ); + return Err(eio!("fscache: fill bootstrap cachefile error")); + } + + let ret = unsafe { libc::pwrite(cachefile_fd, base, size, 0) }; + let _ = unsafe { libc::munmap(base, size) }; + + if ret != size as isize { + warn!( + "fscache: failed to write bootstrap blob data to cached file, {}", + std::io::Error::last_os_error() + ); + return Err(eio!("fscache: fill bootstrap cachefile error")); + } + Ok(()) + } + fn handle_open_bootstrap( &self, hdr: &FsCacheMsgHeader, @@ -538,11 +573,26 @@ impl FsCacheHandler { } Ok(md) => { let cache_file = unsafe { File::from_raw_fd(msg.fd as RawFd) }; + let bootstrap_fd = f.as_raw_fd(); + let cachefile_fd = cache_file.as_raw_fd(); let object = FsCacheObject::Bootstrap(Arc::new(FsCacheBootStrap { bootstrap_file: f, cache_file, })); e.insert((object, msg.fd)); + let len = md.len() as usize; + ASYNC_RUNTIME.spawn_blocking(move || { + //add slight delay to let copen reply first + thread::sleep(time::Duration::from_millis(10)); + for _i in 0..3 { + if Self::fill_bootstrap_cache(bootstrap_fd, cachefile_fd, len) + .is_ok() + { + break; + } + thread::sleep(time::Duration::from_secs(2)); + } + }); md.len() as i64 } }, diff --git a/storage/src/cache/dummycache.rs b/storage/src/cache/dummycache.rs index 3412a11e1c0..293b68a0938 100644 --- a/storage/src/cache/dummycache.rs +++ b/storage/src/cache/dummycache.rs @@ -222,6 +222,8 @@ impl BlobCacheMgr for DummyCacheMgr { validate: self.validate, })) } + + fn check_stat(&self) {} } impl Drop for DummyCacheMgr { diff --git a/storage/src/cache/filecache/mod.rs b/storage/src/cache/filecache/mod.rs index 7ac3f0acfef..c6c0c3065c4 100644 --- a/storage/src/cache/filecache/mod.rs +++ b/storage/src/cache/filecache/mod.rs @@ -27,6 +27,8 @@ use crate::meta::BlobMetaInfo; #[derive(Clone)] pub struct FileCacheMgr { blobs: Arc>>>, + #[allow(unused)] + blobs_need: usize, backend: Arc, metrics: Arc, prefetch_config: Arc, @@ -46,6 +48,7 @@ impl FileCacheMgr { backend: Arc, runtime: Arc, id: &str, + blobs_need: usize, ) -> Result { let blob_config: FileCacheConfig = serde_json::from_value(config.cache_config).map_err(|e| einval!(e))?; @@ -56,6 +59,7 @@ impl FileCacheMgr { Ok(FileCacheMgr { blobs: Arc::new(RwLock::new(HashMap::new())), + blobs_need, backend, metrics, prefetch_config, @@ -153,6 +157,8 @@ impl BlobCacheMgr for FileCacheMgr { self.get_or_create_cache_entry(blob_info) .map(|v| v as Arc) } + + fn check_stat(&self) {} } impl Drop for FileCacheMgr { diff --git a/storage/src/cache/fscache/mod.rs b/storage/src/cache/fscache/mod.rs index 379a4daa123..deb0ffcb619 100644 --- a/storage/src/cache/fscache/mod.rs +++ b/storage/src/cache/fscache/mod.rs @@ -17,7 +17,8 @@ use crate::cache::cachedfile::FileCacheEntry; use crate::cache::state::{BlobStateMap, IndexedChunkMap}; use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr}; use crate::cache::{BlobCache, BlobCacheMgr}; -use crate::device::{BlobFeatures, BlobInfo}; +use crate::device::{BlobFeatures, BlobInfo, BlobObject}; +use crate::factory::BLOB_FACTORY; use crate::meta::BlobMetaInfo; /// An implementation of [BlobCacheMgr](../trait.BlobCacheMgr.html) to improve performance by @@ -25,6 +26,7 @@ use crate::meta::BlobMetaInfo; #[derive(Clone)] pub struct FsCacheMgr { blobs: Arc>>>, + blobs_need: usize, backend: Arc, metrics: Arc, prefetch_config: Arc, @@ -42,6 +44,7 @@ impl FsCacheMgr { backend: Arc, runtime: Arc, id: &str, + blobs_need: usize, ) -> Result { let blob_config: FsCacheConfig = serde_json::from_value(config.cache_config).map_err(|e| einval!(e))?; @@ -50,8 +53,10 @@ impl FsCacheMgr { let prefetch_config: Arc = Arc::new(config.prefetch_config.into()); let worker_mgr = AsyncWorkerMgr::new(metrics.clone(), prefetch_config.clone())?; + BLOB_FACTORY.start_mgr_checker(); Ok(FsCacheMgr { blobs: Arc::new(RwLock::new(HashMap::new())), + blobs_need, backend, metrics, prefetch_config, @@ -146,6 +151,31 @@ impl BlobCacheMgr for FsCacheMgr { self.get_or_create_cache_entry(blob_info) .map(|v| v as Arc) } + + fn check_stat(&self) { + let guard = self.blobs.read().unwrap(); + if guard.len() != self.blobs_need { + info!( + "blob mgr not ready to check stat, need blobs {} have blobs {}", + self.blobs_need, + guard.len() + ); + return; + } + + let mut all_ready = true; + for (_id, entry) in guard.iter() { + if !entry.is_all_data_ready() { + all_ready = false; + break; + } + } + + if all_ready { + self.worker_mgr.stop(); + self.metrics.data_all_ready.store(true, Ordering::Release); + } + } } impl Drop for FsCacheMgr { diff --git a/storage/src/cache/mod.rs b/storage/src/cache/mod.rs index c75deeaa968..09f223ff421 100644 --- a/storage/src/cache/mod.rs +++ b/storage/src/cache/mod.rs @@ -350,6 +350,9 @@ pub(crate) trait BlobCacheMgr: Send + Sync { /// Get the blob cache to provide access to the `blob` object. fn get_blob_cache(&self, blob_info: &Arc) -> Result>; + + /// Check the blob cache data status, if data all ready stop prefetch workers. + fn check_stat(&self); } #[cfg(test)] diff --git a/storage/src/cache/worker.rs b/storage/src/cache/worker.rs index 45c073aa6cb..bb1bab9a1a7 100644 --- a/storage/src/cache/worker.rs +++ b/storage/src/cache/worker.rs @@ -5,10 +5,11 @@ use std::io::Result; use std::num::NonZeroU32; -use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; use std::sync::Arc; use std::thread; use std::time::Duration; +use tokio::time::interval; use fuse_backend_rs::transport::FileVolatileSlice; use governor::clock::QuantaClock; @@ -22,6 +23,7 @@ use nydus_utils::async_helper::with_runtime; use nydus_utils::mpmc::Channel; use crate::cache::{BlobCache, BlobIoRange}; +use crate::factory::ASYNC_RUNTIME; use crate::RAFS_MAX_CHUNK_SIZE; /// Configuration information for asynchronous workers. @@ -85,6 +87,7 @@ pub(crate) struct AsyncWorkerMgr { metrics: Arc, ping_requests: AtomicU32, workers: AtomicU32, + active: AtomicBool, prefetch_channel: Arc>, prefetch_config: Arc, @@ -119,6 +122,7 @@ impl AsyncWorkerMgr { metrics, ping_requests: AtomicU32::new(0), workers: AtomicU32::new(0), + active: AtomicBool::new(false), prefetch_channel: Arc::new(Channel::new()), prefetch_config, @@ -139,6 +143,13 @@ impl AsyncWorkerMgr { /// Stop all working threads. pub fn stop(&self) { + if self + .active + .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed) + .is_err() + { + return; + } self.prefetch_channel.close(); while self.workers.load(Ordering::Relaxed) > 0 { @@ -219,7 +230,7 @@ impl AsyncWorkerMgr { return Err(e); } } - + mgr.active.store(true, Ordering::Release); Ok(()) } @@ -235,6 +246,7 @@ impl AsyncWorkerMgr { blob_cache, offset, size, + state.clone(), )); } } @@ -296,10 +308,11 @@ impl AsyncWorkerMgr { } async fn handle_blob_prefetch_request( - _mgr: Arc, + mgr: Arc, cache: Arc, offset: u64, size: u64, + req_state: Arc, ) -> Result<()> { trace!( "storage: prefetch blob {} offset {} size {}", @@ -314,12 +327,24 @@ impl AsyncWorkerMgr { if let Some(obj) = cache.get_blob_object() { if let Err(e) = obj.fetch_range_compressed(offset, size) { warn!( - "storage: failed to prefetch data from blob {}, offset {}, size {}, {}", + "storage: failed to prefetch data from blob {}, offset {}, size {}, {}, will try resend", cache.blob_id(), offset, size, e ); + + ASYNC_RUNTIME.spawn(async move { + let mut interval = interval(Duration::from_secs(1)); + interval.tick().await; + let msg = AsyncPrefetchMessage::new_blob_prefetch( + req_state, + cache.clone(), + offset, + size, + ); + let _ = mgr.send_prefetch_message(msg); + }); } } else { // This is only supported by localfs backend to prefetch data into page cache, diff --git a/storage/src/device.rs b/storage/src/device.rs index ec1a5d450e5..bee1a4d66bf 100644 --- a/storage/src/device.rs +++ b/storage/src/device.rs @@ -833,7 +833,7 @@ impl BlobDevice { ) -> io::Result { let mut blobs = Vec::with_capacity(blob_infos.len()); for blob_info in blob_infos.iter() { - let blob = BLOB_FACTORY.new_blob_cache(config, blob_info)?; + let blob = BLOB_FACTORY.new_blob_cache(config, blob_info, blob_infos.len())?; blobs.push(blob); } @@ -858,7 +858,7 @@ impl BlobDevice { } let mut blobs = Vec::with_capacity(blob_infos.len()); for blob_info in blob_infos.iter() { - let blob = BLOB_FACTORY.new_blob_cache(config, blob_info)?; + let blob = BLOB_FACTORY.new_blob_cache(config, blob_info, blob_infos.len())?; blobs.push(blob); } diff --git a/storage/src/factory.rs b/storage/src/factory.rs index 00083d054d5..19015877767 100644 --- a/storage/src/factory.rs +++ b/storage/src/factory.rs @@ -13,13 +13,16 @@ use std::collections::HashMap; use std::hash::{Hash, Hasher}; use std::io::Result as IOResult; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; use lazy_static::lazy_static; -use tokio::runtime::{Builder, Runtime}; - use nydus_api::http::{BackendConfig, FactoryConfig}; +use tokio::{ + runtime::{Builder, Runtime}, + time, +}; #[cfg(feature = "backend-localfs")] use crate::backend::localfs; @@ -32,12 +35,13 @@ use crate::cache::{BlobCache, BlobCacheMgr, DummyCacheMgr, FileCacheMgr, FsCache use crate::device::BlobInfo; lazy_static! { - static ref ASYNC_RUNTIME: Arc = { + pub static ref ASYNC_RUNTIME: Arc = { let runtime = Builder::new_multi_thread() .worker_threads(1) // Limit the number of worker thread to 1 since this runtime is generally used to do blocking IO. .thread_keep_alive(Duration::from_secs(10)) .max_blocking_threads(8) .thread_name("cache-flusher") + .enable_all() .build(); match runtime { Ok(v) => Arc::new(v), @@ -69,6 +73,7 @@ lazy_static::lazy_static! { /// Factory to create blob cache for blob objects. pub struct BlobFactory { mgrs: Mutex>>, + mgr_checker_active: AtomicBool, } impl BlobFactory { @@ -76,14 +81,33 @@ impl BlobFactory { pub fn new() -> Self { BlobFactory { mgrs: Mutex::new(HashMap::new()), + mgr_checker_active: AtomicBool::new(false), } } + pub fn start_mgr_checker(&self) { + if self + .mgr_checker_active + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) + .is_err() + { + return; + } + ASYNC_RUNTIME.spawn(async { + let mut interval = time::interval(Duration::from_secs(5)); + loop { + interval.tick().await; + BLOB_FACTORY.check_cache_stat(); + } + }); + } + /// Create a blob cache object for a blob with specified configuration. pub fn new_blob_cache( &self, config: &Arc, blob_info: &Arc, + blobs_need: usize, ) -> IOResult> { let key = BlobCacheMgrKey { config: config.clone(), @@ -92,7 +116,6 @@ impl BlobFactory { if let Some(mgr) = self.mgrs.lock().unwrap().get(&key) { return mgr.get_blob_cache(blob_info); } - let backend = Self::new_backend(key.config.backend.clone(), blob_info.blob_id())?; let mgr = match key.config.cache.cache_type.as_str() { "blobcache" => { @@ -101,6 +124,7 @@ impl BlobFactory { backend, ASYNC_RUNTIME.clone(), &config.id, + blobs_need, )?; mgr.init()?; Arc::new(mgr) as Arc @@ -111,6 +135,7 @@ impl BlobFactory { backend, ASYNC_RUNTIME.clone(), &config.id, + blobs_need, )?; mgr.init()?; Arc::new(mgr) as Arc @@ -191,6 +216,13 @@ impl BlobFactory { ))), } } + + fn check_cache_stat(&self) { + let mgrs = self.mgrs.lock().unwrap(); + for (_key, mgr) in mgrs.iter() { + mgr.check_stat(); + } + } } impl Default for BlobFactory { diff --git a/utils/src/metrics.rs b/utils/src/metrics.rs index c9cae5a8aad..40b32556062 100644 --- a/utils/src/metrics.rs +++ b/utils/src/metrics.rs @@ -737,6 +737,7 @@ pub struct BlobcacheMetrics { pub prefetch_workers: AtomicUsize, pub prefetch_unmerged_chunks: BasicMetric, pub buffered_backend_size: BasicMetric, + pub data_all_ready: AtomicBool, } impl BlobcacheMetrics {