nydusd: make copen always return right file size
If we fail to create a blob object and return an error in the copen cmd
for the blob fscache cookie, erofs will still be mounted but the blob data
cannot be accessed. Subsequent requests will not be sent to the user
daemon, so it has no chance to handle this case.

Make copen always return the right file size, so that subsequent requests
can still be dispatched to the user daemon and we can retry later.

For now, use an asynchronous thread to create the blob object on the open
cmd, and wait for the result before the first on-demand read.
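
The change amounts to a "return the size now, create the blob later"
pattern. The following is a minimal, self-contained sketch of that pattern
rather than the patch itself: LazyBlob, create_blob_object, open_blob,
handle_read, and the retry constants are hypothetical stand-ins, and plain
println!/eprintln! take the place of nydusd's logging macros.

use std::sync::{Arc, RwLock};
use std::{thread, time};

const INIT_RETRY: u8 = 5;
const INIT_INTERVAL_MS: u64 = 300;

// Stand-in for the real blob cache; `None` means "not initialized yet".
struct LazyBlob {
    cache: RwLock<Option<String>>,
}

// Simulated fallible creation step; the real code calls into BLOB_FACTORY.
fn create_blob_object() -> Result<String, ()> {
    Ok("blob-object".to_string())
}

// Answer the open request immediately with the known size, then create the
// blob object on a background thread with bounded retries, instead of
// failing copen and leaving the mount unusable.
fn open_blob(blob: Arc<LazyBlob>) -> u64 {
    thread::spawn(move || {
        // Holding the write lock across the retry loop makes the first
        // reader block until initialization succeeds or gives up.
        let mut guard = blob.cache.write().unwrap();
        for _ in 0..INIT_RETRY {
            match create_blob_object() {
                Ok(obj) => {
                    *guard = Some(obj);
                    break;
                }
                Err(_) => thread::sleep(time::Duration::from_millis(INIT_INTERVAL_MS)),
            }
        }
    });
    4096 // size known from blob metadata, returned to the kernel right away
}

// The read path degrades gracefully if initialization ultimately failed.
fn handle_read(blob: &LazyBlob) {
    match blob.cache.read().unwrap().as_ref() {
        Some(obj) => println!("serving read from {}", obj),
        None => eprintln!("blob object not ready, caller may retry"),
    }
}

fn main() {
    let blob = Arc::new(LazyBlob {
        cache: RwLock::new(None),
    });
    let size = open_blob(blob.clone());
    println!("copen answered with size {}", size);
    handle_read(&blob); // blocks while the init thread holds the write lock
}

This is also why the patch can "wait for the result before first on-demand
read": init_blobcache holds the write lock on FsCacheBlobCache for the
whole retry loop, so the first read request blocks on fsblob.read() until
initialization has finished one way or the other.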

Signed-off-by: Xin Yin <[email protected]>
Xin Yin committed Jul 25, 2022
1 parent 3586677 commit 82f6712
Showing 1 changed file with 95 additions and 45 deletions.
140 changes: 95 additions & 45 deletions src/bin/nydusd/fs_cache.rs
@@ -15,7 +15,7 @@ use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
 use std::ptr::read_unaligned;
 use std::string::String;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, Barrier, Mutex, MutexGuard};
+use std::sync::{Arc, Barrier, Mutex, MutexGuard, RwLock};
 use std::{thread, time};
 
 use mio::unix::SourceFd;
@@ -40,6 +40,9 @@ const MSG_READ_SIZE: usize = 16;
 const TOKEN_EVENT_WAKER: usize = 1;
 const TOKEN_EVENT_FSCACHE: usize = 2;
 
+const BLOBCACHE_INIT_RETRY: u8 = 5;
+const BLOBCACHE_INIT_INTERVAL_MS: u64 = 300;
+
 /// Command code in requests from fscache driver.
 #[repr(u32)]
 #[derive(Debug, Eq, PartialEq)]
@@ -203,10 +206,27 @@ struct FsCacheBootStrap {
     cache_file: File,
 }
 
+struct FsCacheBlobCache {
+    cache: Option<Arc<dyn BlobCache>>,
+    config: Arc<BlobCacheConfigDataBlob>,
+    file: Arc<File>,
+    blobs_need: usize,
+}
+
+impl FsCacheBlobCache {
+    fn set_blobcache(&mut self, cache: Option<Arc<dyn BlobCache>>) {
+        self.cache = cache;
+    }
+
+    fn get_blobcache(&self) -> Option<Arc<dyn BlobCache>> {
+        self.cache.clone()
+    }
+}
+
 #[derive(Clone)]
 enum FsCacheObject {
     Bootstrap(Arc<FsCacheBootStrap>),
-    DataBlob(Arc<dyn BlobCache>),
+    DataBlob(Arc<RwLock<FsCacheBlobCache>>),
 }
 
 /// Struct to maintain cached file objects.
@@ -433,22 +453,49 @@ impl FsCacheHandler {
         let mut state = self.state.lock().unwrap();
         let blobs_need = state.blob_cache_mgr.get_blobs_num(domain_id);
         if let Vacant(e) = state.id_to_object_map.entry(hdr.object_id) {
-            match self.create_data_blob_object(&config, msg.fd, blobs_need) {
-                Err(s) => format!("copen {},{}", hdr.msg_id, s),
-                Ok((blob, blob_size)) => {
-                    e.insert((FsCacheObject::DataBlob(blob.clone()), msg.fd));
-                    state.id_to_config_map.insert(hdr.object_id, config.clone());
-                    let _ = self.do_prefetch(&config, blob);
-                    format!("copen {},{}", hdr.msg_id, blob_size)
-                }
-            }
+            let fsblob = Arc::new(RwLock::new(FsCacheBlobCache {
+                cache: None,
+                config: config.clone(),
+                file: Arc::new(unsafe { File::from_raw_fd(msg.fd as RawFd) }),
+                blobs_need,
+            }));
+            e.insert((FsCacheObject::DataBlob(fsblob.clone()), msg.fd));
+            state.id_to_config_map.insert(hdr.object_id, config.clone());
+            let blob_size = config.blob_info().deref().uncompressed_size();
+            Self::init_blobcache(fsblob);
+            format!("copen {},{}", hdr.msg_id, blob_size)
         } else {
             unsafe { libc::close(msg.fd as i32) };
             format!("copen {},{}", hdr.msg_id, -libc::EALREADY)
         }
     }
 
-    fn do_prefetch(&self, config: &BlobCacheConfigDataBlob, blob: Arc<dyn BlobCache>) {
+    fn init_blobcache(fsblob: Arc<RwLock<FsCacheBlobCache>>) {
+        thread::spawn(move || {
+            let mut guard = fsblob.write().unwrap();
+            // For now FsCacheBlobCache is only initialized once, so no blob cache should be associated with it yet.
+            assert!(guard.get_blobcache().is_none());
+            for _ in 0..BLOBCACHE_INIT_RETRY {
+                match Self::create_data_blob_object(
+                    &guard.config,
+                    guard.file.clone(),
+                    guard.blobs_need,
+                ) {
+                    Err(e) => {
+                        warn!("fscache: create_data_blob_object failed {}", e);
+                        thread::sleep(time::Duration::from_millis(BLOBCACHE_INIT_INTERVAL_MS));
+                    }
+                    Ok(blob) => {
+                        guard.set_blobcache(Some(blob.clone()));
+                        Self::do_prefetch(&guard.config, blob);
+                        break;
+                    }
+                }
+            }
+        });
+    }
+
+    fn do_prefetch(config: &BlobCacheConfigDataBlob, blob: Arc<dyn BlobCache>) {
         let blob_info = config.blob_info().deref();
         let factory_config = config.factory_config().deref();
         if !factory_config.cache.prefetch_config.enable {
@@ -496,26 +543,14 @@ impl FsCacheHandler {
     /// the chunk map file will be managed by the userspace daemon. We need to figure out the
     /// way to share blob/chunkmap files with the filecache manager.
     fn create_data_blob_object(
-        &self,
         config: &BlobCacheConfigDataBlob,
-        fd: u32,
+        file: Arc<File>,
         blobs_need: usize,
-    ) -> std::result::Result<(Arc<dyn BlobCache>, u64), i32> {
+    ) -> Result<Arc<dyn BlobCache>> {
         let mut blob_info = config.blob_info().deref().clone();
         // `BlobInfo` from the configuration cache should not have an fscache file associated with it.
         assert!(blob_info.get_fscache_file().is_none());
 
-        // Safe because we trust the kernel fscache driver.
-        let file = unsafe { File::from_raw_fd(fd as RawFd) };
-        blob_info.set_fscache_file(Some(Arc::new(file)));
+        blob_info.set_fscache_file(Some(file));
         let blob_ref = Arc::new(blob_info);
-        match BLOB_FACTORY.new_blob_cache(config.factory_config(), &blob_ref, blobs_need) {
-            Err(_e) => Err(-libc::ENOENT),
-            Ok(blob) => match blob.blob_uncompressed_size() {
-                Err(_e) => Err(-libc::EIO),
-                Ok(v) => Ok((blob, v)),
-            },
-        }
+        BLOB_FACTORY.new_blob_cache(config.factory_config(), &blob_ref, blobs_need)
     }
 
     fn fill_bootstrap_cache(bootstrap_fd: RawFd, cachefile_fd: RawFd, size: usize) -> Result<()> {
@@ -615,19 +650,25 @@ impl FsCacheHandler {
     fn handle_close_request(&self, hdr: &FsCacheMsgHeader) {
         let mut state = self.get_state();
 
-        if let Some((FsCacheObject::DataBlob(blob), _)) =
+        if let Some((FsCacheObject::DataBlob(fsblob), _)) =
             state.id_to_object_map.remove(&hdr.object_id)
         {
             // Safe to unwrap() because `id_to_config_map` and `id_to_object_map` are kept
             // consistent.
             let config = state.id_to_config_map.remove(&hdr.object_id).unwrap();
             let factory_config = config.factory_config();
-            if factory_config.cache.prefetch_config.enable {
-                let _ = blob.stop_prefetch();
-            }
-            let id = blob.blob_id().to_string();
-            drop(blob);
-            BLOB_FACTORY.gc(Some((factory_config, &id)));
+            let guard = fsblob.read().unwrap();
+            match guard.get_blobcache() {
+                Some(blob) => {
+                    if factory_config.cache.prefetch_config.enable {
+                        let _ = blob.stop_prefetch();
+                    }
+                    let id = blob.blob_id().to_string();
+                    drop(blob);
+                    BLOB_FACTORY.gc(Some((factory_config, &id)));
+                }
+                _ => warn!("fscache: blob object not ready"),
+            }
         }
     }
 
@@ -642,19 +683,28 @@ impl FsCacheHandler {
                 );
                 return;
             }
-            Some((FsCacheObject::DataBlob(blob), u)) => {
+            Some((FsCacheObject::DataBlob(fsblob), u)) => {
                 fd = u;
-                match blob.get_blob_object() {
-                    None => {
-                        warn!("fscache: internal error: cached object is not BlobCache objects");
-                    }
-                    Some(obj) => match obj.fetch_range_uncompressed(msg.off, msg.len) {
-                        Ok(_) => {}
-                        Err(e) => error!(
-                            "{}",
-                            format!("fscache: failed to read data from blob object: {}", e,)
-                        ),
-                    },
-                }
+                let guard = fsblob.read().unwrap();
+                match guard.get_blobcache() {
+                    Some(blob) => {
+                        match blob.get_blob_object() {
+                            None => {
+                                warn!("fscache: internal error: cached object is not a BlobCache object");
+                            }
+                            Some(obj) => match obj.fetch_range_uncompressed(msg.off, msg.len) {
+                                Ok(_) => {}
+                                Err(e) => error!(
+                                    "fscache: failed to read data from blob object: {}",
+                                    e
+                                ),
+                            },
+                        }
+                    }
+                    _ => {
+                        warn!("fscache: blob object not ready");
+                        // TODO: maybe we should retry initializing the blob object here
+                    }
+                }
             }
             Some((FsCacheObject::Bootstrap(bs), u)) => {
