prepare for nydusd daemonless #540

Merged: 5 commits, Jul 14, 2022
20 changes: 20 additions & 0 deletions src/bin/nydusd/blob_cache.rs
@@ -205,6 +205,22 @@ impl BlobCacheState {
fn get(&self, key: &str) -> Option<BlobCacheObjectConfig> {
self.id_to_config_map.get(key).cloned()
}

/// Get the number of DataBlob objects registered under `domain_id`.
fn get_blobs_num(&self, domain_id: &str) -> usize {
let scoped_blob_prefix = format!("{}{}", domain_id, ID_SPLITTER);
return self
.id_to_config_map
.values()
.filter(|v| {
if let BlobCacheObjectConfig::DataBlob(o) = v {
o.scoped_blob_id.starts_with(&scoped_blob_prefix)
} else {
false
}
})
.count();
}
}

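For intuition: get_blobs_num() counts the DataBlob entries whose scoped_blob_id starts with the "<domain_id><ID_SPLITTER>" prefix. A minimal sketch of that counting scheme (the splitter value and map shape below are assumptions for illustration, not the crate's actual definitions):

```rust
use std::collections::HashMap;

// Hypothetical splitter; the real ID_SPLITTER is defined in blob_cache.rs.
const ID_SPLITTER: &str = "/";

// Count the entries registered under `domain_id`, mirroring get_blobs_num().
fn blobs_in_domain(ids: &HashMap<String, ()>, domain_id: &str) -> usize {
    let prefix = format!("{}{}", domain_id, ID_SPLITTER);
    ids.keys().filter(|k| k.starts_with(&prefix)).count()
}

fn main() {
    let mut ids = HashMap::new();
    ids.insert("domain-1/blob-a".to_string(), ());
    ids.insert("domain-1/blob-b".to_string(), ());
    ids.insert("domain-2/blob-c".to_string(), ());
    assert_eq!(blobs_in_domain(&ids, "domain-1"), 2);
}
```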
/// Manager for cached file objects.
@@ -266,6 +282,10 @@ impl BlobCacheMgr {
self.get_state().get(key)
}

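/// Get the number of DataBlob objects registered under `domain_id`.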
pub fn get_blobs_num(&self, domain_id: &str) -> usize {
self.get_state().get_blobs_num(domain_id)
}

#[inline]
fn get_state(&self) -> MutexGuard<BlobCacheState> {
self.state.lock().unwrap()
60 changes: 55 additions & 5 deletions src/bin/nydusd/fs_cache.rs
@@ -16,12 +16,13 @@ use std::ptr::read_unaligned;
use std::string::String;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Barrier, Mutex, MutexGuard};
use std::{thread, time};

use mio::unix::SourceFd;
use mio::{Events, Interest, Poll, Token, Waker};
use storage::cache::BlobCache;
use storage::device::BlobPrefetchRequest;
use storage::factory::BLOB_FACTORY;
use storage::factory::{ASYNC_RUNTIME, BLOB_FACTORY};

use crate::blob_cache::{
generate_blob_key, BlobCacheConfigBootstrap, BlobCacheConfigDataBlob, BlobCacheMgr,
@@ -407,7 +408,7 @@ impl FsCacheHandler {
}
Some(cfg) => match cfg {
BlobCacheObjectConfig::DataBlob(config) => {
self.handle_open_data_blob(hdr, msg, config)
self.handle_open_data_blob(hdr, msg, config, domain_id)
}
BlobCacheObjectConfig::Bootstrap(config) => {
self.handle_open_bootstrap(hdr, msg, config)
@@ -422,10 +423,12 @@
hdr: &FsCacheMsgHeader,
msg: &FsCacheMsgOpen,
config: Arc<BlobCacheConfigDataBlob>,
domain_id: &str,
) -> String {
let mut state = self.state.lock().unwrap();
let blobs_need = state.blob_cache_mgr.get_blobs_num(domain_id);
if let Vacant(e) = state.id_to_object_map.entry(hdr.object_id) {
match self.create_data_blob_object(&config, msg.fd) {
match self.create_data_blob_object(&config, msg.fd, blobs_need) {
Err(s) => format!("copen {},{}", hdr.msg_id, s),
Ok((blob, blob_size)) => {
e.insert((FsCacheObject::DataBlob(blob.clone()), msg.fd));
@@ -491,6 +494,7 @@ impl FsCacheHandler {
&self,
config: &BlobCacheConfigDataBlob,
fd: u32,
blobs_need: usize,
) -> std::result::Result<(Arc<dyn BlobCache>, u64), i32> {
let mut blob_info = config.blob_info().deref().clone();
// `BlobInfo` from the configuration cache should not have fscache file associated with it.
@@ -500,8 +504,7 @@
let file = unsafe { File::from_raw_fd(fd as RawFd) };
blob_info.set_fscache_file(Some(Arc::new(file)));
let blob_ref = Arc::new(blob_info);

match BLOB_FACTORY.new_blob_cache(config.factory_config(), &blob_ref) {
match BLOB_FACTORY.new_blob_cache(config.factory_config(), &blob_ref, blobs_need) {
Err(_e) => Err(-libc::ENOENT),
Ok(blob) => match blob.blob_uncompressed_size() {
Err(_e) => Err(-libc::EIO),
Expand All @@ -510,6 +513,38 @@ impl FsCacheHandler {
}
}

fn fill_bootstrap_cache(bootstrap_fd: RawFd, cachefile_fd: RawFd, size: usize) -> Result<()> {
let base = unsafe {
libc::mmap(
std::ptr::null_mut(),
size,
libc::PROT_READ,
libc::MAP_SHARED,
bootstrap_fd,
0_i64,
)
};
if base == libc::MAP_FAILED {
warn!(
"fscache: failed to mmap bootstrap file, {}",
std::io::Error::last_os_error()
);
return Err(eio!("fscache: fill bootstrap cachefile error"));
}

let ret = unsafe { libc::pwrite(cachefile_fd, base, size, 0) };
let _ = unsafe { libc::munmap(base, size) };

if ret != size as isize {
warn!(
"fscache: failed to write bootstrap blob data to cached file, {}",
std::io::Error::last_os_error()
);
return Err(eio!("fscache: fill bootstrap cachefile error"));
}
Ok(())
}
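
For intuition, this is what fill_bootstrap_cache() accomplishes, expressed with plain std I/O (a sketch under the assumption that we own both File handles; the real function receives raw fds owned elsewhere, which is why it uses mmap + pwrite instead):

```rust
use std::fs::File;
use std::io::{self, Read, Seek, SeekFrom, Write};

// Copy the entire bootstrap file into the cache file at offset 0.
fn fill_cache_std(bootstrap: &mut File, cache: &mut File) -> io::Result<()> {
    let mut buf = Vec::new();
    bootstrap.read_to_end(&mut buf)?;
    cache.seek(SeekFrom::Start(0))?;
    cache.write_all(&buf)
}
```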

fn handle_open_bootstrap(
&self,
hdr: &FsCacheMsgHeader,
@@ -538,11 +573,26 @@
}
Ok(md) => {
let cache_file = unsafe { File::from_raw_fd(msg.fd as RawFd) };
let bootstrap_fd = f.as_raw_fd();
let cachefile_fd = cache_file.as_raw_fd();
let object = FsCacheObject::Bootstrap(Arc::new(FsCacheBootStrap {
bootstrap_file: f,
cache_file,
}));
e.insert((object, msg.fd));
let len = md.len() as usize;
ASYNC_RUNTIME.spawn_blocking(move || {
// Add a slight delay so the copen reply is sent before the cache fill starts.
thread::sleep(time::Duration::from_millis(10));
for _i in 0..3 {
if Self::fill_bootstrap_cache(bootstrap_fd, cachefile_fd, len)
.is_ok()
{
break;
}
thread::sleep(time::Duration::from_secs(2));
}
});
md.len() as i64
}
},
2 changes: 2 additions & 0 deletions storage/src/cache/dummycache.rs
@@ -222,6 +222,8 @@ impl BlobCacheMgr for DummyCacheMgr {
validate: self.validate,
}))
}

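// The dummy cache manages no cached data, so there is nothing to check.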
fn check_stat(&self) {}
}

impl Drop for DummyCacheMgr {
6 changes: 6 additions & 0 deletions storage/src/cache/filecache/mod.rs
@@ -27,6 +27,8 @@ use crate::meta::BlobMetaInfo;
#[derive(Clone)]
pub struct FileCacheMgr {
blobs: Arc<RwLock<HashMap<String, Arc<FileCacheEntry>>>>,
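// Expected number of blobs in the domain; kept for parity with FsCacheMgr but unused here.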
#[allow(unused)]
blobs_need: usize,
backend: Arc<dyn BlobBackend>,
metrics: Arc<BlobcacheMetrics>,
prefetch_config: Arc<AsyncPrefetchConfig>,
@@ -46,6 +48,7 @@ impl FileCacheMgr {
backend: Arc<dyn BlobBackend>,
runtime: Arc<Runtime>,
id: &str,
blobs_need: usize,
) -> Result<FileCacheMgr> {
let blob_config: FileCacheConfig =
serde_json::from_value(config.cache_config).map_err(|e| einval!(e))?;
@@ -56,6 +59,7 @@

Ok(FileCacheMgr {
blobs: Arc::new(RwLock::new(HashMap::new())),
blobs_need,
backend,
metrics,
prefetch_config,
@@ -153,6 +157,8 @@ impl BlobCacheMgr for FileCacheMgr {
self.get_or_create_cache_entry(blob_info)
.map(|v| v as Arc<dyn BlobCache>)
}

fn check_stat(&self) {}
}

impl Drop for FileCacheMgr {
32 changes: 31 additions & 1 deletion storage/src/cache/fscache/mod.rs
@@ -17,14 +17,16 @@ use crate::cache::cachedfile::FileCacheEntry;
use crate::cache::state::{BlobStateMap, IndexedChunkMap};
use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
use crate::cache::{BlobCache, BlobCacheMgr};
use crate::device::{BlobFeatures, BlobInfo};
use crate::device::{BlobFeatures, BlobInfo, BlobObject};
use crate::factory::BLOB_FACTORY;
use crate::meta::BlobMetaInfo;

/// An implementation of [BlobCacheMgr](../trait.BlobCacheMgr.html) to improve performance by
/// caching uncompressed blob with Linux fscache subsystem.
#[derive(Clone)]
pub struct FsCacheMgr {
blobs: Arc<RwLock<HashMap<String, Arc<FileCacheEntry>>>>,
blobs_need: usize,
backend: Arc<dyn BlobBackend>,
metrics: Arc<BlobcacheMetrics>,
prefetch_config: Arc<AsyncPrefetchConfig>,
@@ -42,6 +44,7 @@ impl FsCacheMgr {
backend: Arc<dyn BlobBackend>,
runtime: Arc<Runtime>,
id: &str,
blobs_need: usize,
) -> Result<FsCacheMgr> {
let blob_config: FsCacheConfig =
serde_json::from_value(config.cache_config).map_err(|e| einval!(e))?;
@@ -50,8 +53,10 @@
let prefetch_config: Arc<AsyncPrefetchConfig> = Arc::new(config.prefetch_config.into());
let worker_mgr = AsyncWorkerMgr::new(metrics.clone(), prefetch_config.clone())?;

BLOB_FACTORY.start_mgr_checker();
Ok(FsCacheMgr {
blobs: Arc::new(RwLock::new(HashMap::new())),
blobs_need,
backend,
metrics,
prefetch_config,
@@ -146,6 +151,31 @@ impl BlobCacheMgr for FsCacheMgr {
self.get_or_create_cache_entry(blob_info)
.map(|v| v as Arc<dyn BlobCache>)
}

fn check_stat(&self) {
let guard = self.blobs.read().unwrap();
if guard.len() != self.blobs_need {
info!(
"blob mgr not ready to check stat, need blobs {} have blobs {}",
self.blobs_need,
guard.len()
);
return;
}

let mut all_ready = true;
for (_id, entry) in guard.iter() {
if !entry.is_all_data_ready() {
all_ready = false;
break;
}
}

if all_ready {
self.worker_mgr.stop();
self.metrics.data_all_ready.store(true, Ordering::Release);
}
}
}

impl Drop for FsCacheMgr {
3 changes: 3 additions & 0 deletions storage/src/cache/mod.rs
@@ -350,6 +350,9 @@ pub(crate) trait BlobCacheMgr: Send + Sync {

/// Get the blob cache to provide access to the `blob` object.
fn get_blob_cache(&self, blob_info: &Arc<BlobInfo>) -> Result<Arc<dyn BlobCache>>;

/// Check the status of cached blob data; if all data is ready, stop the prefetch workers.
fn check_stat(&self);
}

#[cfg(test)]
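This hook is presumably driven by the BLOB_FACTORY.start_mgr_checker() call added in fscache/mod.rs above. A hypothetical sketch of such a checker (the function body, names, and 5-second period are assumptions, not the factory's actual code):

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

trait BlobCacheMgr: Send + Sync {
    fn check_stat(&self);
}

// Periodically ask every registered manager whether its prefetch
// workers can be stopped.
fn start_mgr_checker(mgrs: Arc<Mutex<Vec<Arc<dyn BlobCacheMgr>>>>) {
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(5));
        for mgr in mgrs.lock().unwrap().iter() {
            mgr.check_stat();
        }
    });
}
```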
33 changes: 29 additions & 4 deletions storage/src/cache/worker.rs
@@ -5,10 +5,11 @@

use std::io::Result;
use std::num::NonZeroU32;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tokio::time::interval;

use fuse_backend_rs::transport::FileVolatileSlice;
use governor::clock::QuantaClock;
@@ -22,6 +23,7 @@ use nydus_utils::async_helper::with_runtime;
use nydus_utils::mpmc::Channel;

use crate::cache::{BlobCache, BlobIoRange};
use crate::factory::ASYNC_RUNTIME;
use crate::RAFS_MAX_CHUNK_SIZE;

/// Configuration information for asynchronous workers.
Expand Down Expand Up @@ -85,6 +87,7 @@ pub(crate) struct AsyncWorkerMgr {
metrics: Arc<BlobcacheMetrics>,
ping_requests: AtomicU32,
workers: AtomicU32,
active: AtomicBool,

prefetch_channel: Arc<Channel<AsyncPrefetchMessage>>,
prefetch_config: Arc<AsyncPrefetchConfig>,
@@ -119,6 +122,7 @@ impl AsyncWorkerMgr {
metrics,
ping_requests: AtomicU32::new(0),
workers: AtomicU32::new(0),
active: AtomicBool::new(false),

prefetch_channel: Arc::new(Channel::new()),
prefetch_config,
@@ -139,6 +143,13 @@

/// Stop all working threads.
pub fn stop(&self) {
if self
.active
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
.is_err()
{
return;
Contributor:
Seems stop() can't be re-entered without the check on `active`? So did we previously have a bug here?

Contributor (Author):
Previously we only stopped the workers when deleting an image, so stop() was never re-entered. Now that it can be reached twice, the `active` state handles the re-enter case.
}
self.prefetch_channel.close();

while self.workers.load(Ordering::Relaxed) > 0 {
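
As the thread above explains, stop() can now be reached twice: from check_stat() once all data is ready, and again on image deletion. A self-contained sketch of the compare_exchange guard that makes the second call a no-op:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

struct Workers {
    active: AtomicBool,
}

impl Workers {
    fn stop(&self) -> bool {
        // Only the caller that flips `active` from true to false
        // performs the actual teardown; later callers bail out.
        self.active
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok()
    }
}

fn main() {
    let w = Workers { active: AtomicBool::new(true) };
    assert!(w.stop()); // first call wins and performs the teardown
    assert!(!w.stop()); // a re-entered call returns immediately
}
```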
@@ -219,7 +230,7 @@
return Err(e);
}
}

mgr.active.store(true, Ordering::Release);
Ok(())
}

@@ -235,6 +246,7 @@
blob_cache,
offset,
size,
state.clone(),
));
}
}
@@ -296,10 +308,11 @@
}

async fn handle_blob_prefetch_request(
_mgr: Arc<AsyncWorkerMgr>,
mgr: Arc<AsyncWorkerMgr>,
cache: Arc<dyn BlobCache>,
offset: u64,
size: u64,
req_state: Arc<AtomicU32>,
) -> Result<()> {
trace!(
"storage: prefetch blob {} offset {} size {}",
@@ -314,12 +327,24 @@
if let Some(obj) = cache.get_blob_object() {
if let Err(e) = obj.fetch_range_compressed(offset, size) {
warn!(
"storage: failed to prefetch data from blob {}, offset {}, size {}, {}",
"storage: failed to prefetch data from blob {}, offset {}, size {}, {}, will try resend",
cache.blob_id(),
offset,
size,
e
);

ASYNC_RUNTIME.spawn(async move {
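// Re-queue the failed request from a detached task so the current
// prefetch worker is not blocked while it waits.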
let mut interval = interval(Duration::from_secs(1));
interval.tick().await;
let msg = AsyncPrefetchMessage::new_blob_prefetch(
req_state,
cache.clone(),
offset,
size,
);
let _ = mgr.send_prefetch_message(msg);
});
}
} else {
// This is only supported by localfs backend to prefetch data into page cache,
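The resend path above re-queues the failed request from a spawned task. A minimal sketch of the same delayed-retry idea (note that tokio's Interval completes its first tick immediately, so sleep() is the simplest way to get a real pause):

```rust
use std::time::Duration;

// Re-queue a failed request after a pause, without blocking the caller.
async fn retry_later<F, Fut>(resend: F)
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = ()>,
{
    tokio::time::sleep(Duration::from_secs(1)).await;
    resend().await;
}

#[tokio::main]
async fn main() {
    retry_later(|| async { println!("re-sending prefetch request") }).await;
}
```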