From ba49c18fa69da2769d44cb2b2d91359beaf4f558 Mon Sep 17 00:00:00 2001 From: Matthew Ahrens Date: Wed, 1 Jun 2022 10:25:24 -0700 Subject: [PATCH 01/10] zoa: determine response type automatically (#446) Since the `response_type` is the same as the `request_type`, the server infrastructure can fill it in automatically, so each request handler doesn't need to match it up manually. The "get pools" response contained the pools directly in the nvlist, and asserted that there are no other nvpairs. There was no `response_type` in this response message. This commit changes it so that "get pools" includes a `response_type`, and the pools are under a different `pools` nvpair. Additionally, the code in `PublicConnectionState::get_pools()` is reorganized to be more clear. Additionally, the code in `PublicConnectionState::get_destroying_pools()` is simplified, using the server helper method to convert to an nvlist. --- .../zettaobject/src/pool_destroy.rs | 21 +- .../zettaobject/src/public_connection.rs | 191 +++++++++--------- .../zettaobject/src/root_connection.rs | 45 +---- .../zettaobject/src/server.rs | 42 ++-- lib/libzutil/zutil_import.c | 4 +- 5 files changed, 135 insertions(+), 168 deletions(-) diff --git a/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs b/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs index 190eee6b6318..ecb75186968d 100644 --- a/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs +++ b/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs @@ -11,7 +11,6 @@ use anyhow::Result; use futures::stream::StreamExt; use lazy_static::lazy_static; use log::*; -use nvpair::NvList; use serde::Deserialize; use serde::Serialize; use tokio::fs; @@ -64,8 +63,8 @@ struct DestroyingCachePhys { pools: HashMap, } -#[derive(Serialize, Debug)] -struct DestroyingPool { +#[derive(Serialize, Debug, Clone)] +pub struct DestroyingPool { #[serde(flatten)] cache_phys: DestroyingCacheItemPhys, #[serde(flatten)] @@ -137,17 +136,6 @@ impl DestroyingPoolsMap { }); } } - - fn to_nvlist(&self) -> NvList { - let mut nvl = NvList::new_unique_names(); - - for (guid, destroying_pool) in self.pools.iter() { - let nvl_item = nvpair::to_nvlist(destroying_pool).unwrap(); - nvl.insert(format!("{}", guid), nvl_item.as_ref()).unwrap(); - } - - nvl - } } #[derive(Default, Debug)] @@ -438,7 +426,7 @@ pub async fn resume_destroy(object_access: Arc, guid: PoolGuid) -> /// Retrieve the PoolDestroyer's list of pools that are either being destroyed or have been /// destroyed. -pub async fn get_destroy_list() -> NvList { +pub async fn get_destroy_list() -> HashMap { maybe_die_with(|| "in get_destroy_list"); let maybe_pool_destroyer = POOL_DESTROYER.lock().await; @@ -446,7 +434,8 @@ pub async fn get_destroy_list() -> NvList { .as_ref() .unwrap() .destroying_pools_map - .to_nvlist() + .pools + .clone() } /// Remove pools that have been successfully destroyed from the PoolDestroyer's list of pools. 
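The core of this change is the dispatch path in server.rs (hunk further below): handlers now return a bare payload nvlist and the server stamps the response type itself. A condensed sketch of that pattern follows; the handler table, constant values, and synchronous signature here are simplified placeholders (the real code is async and stateful), and only the nvlist calls mirror the actual hunks:

```rust
use std::collections::HashMap;

use anyhow::Result;
use nvpair::NvList;

// Assumed nvpair names; the agent defines these constants elsewhere.
const AGENT_REQUEST_TYPE: &str = "request_type";
const AGENT_RESPONSE_TYPE: &str = "response_type";

type Handler = fn(NvList) -> Result<Option<NvList>>;

fn dispatch(handlers: &HashMap<String, Handler>, nvl: NvList) -> Result<Option<NvList>> {
    // The request names its own type; look up the matching handler.
    let request_type = nvl.lookup_string(AGENT_REQUEST_TYPE)?.to_str()?.to_owned();
    let handler = match handlers.get(&request_type) {
        Some(handler) => handler,
        None => return Ok(None),
    };
    Ok(handler(nvl)?.map(|mut response| {
        // response_type always equals request_type, so it is filled in once,
        // centrally, rather than by every handler.
        response.insert(AGENT_RESPONSE_TYPE, request_type.as_str()).unwrap();
        response
    }))
}
```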
diff --git a/cmd/zfs_object_agent/zettaobject/src/public_connection.rs b/cmd/zfs_object_agent/zettaobject/src/public_connection.rs index 30a8da0d72b5..d8a591f1e004 100644 --- a/cmd/zfs_object_agent/zettaobject/src/public_connection.rs +++ b/cmd/zfs_object_agent/zettaobject/src/public_connection.rs @@ -1,8 +1,8 @@ +use std::collections::HashMap; use std::path::Path; use std::sync::Arc; use std::sync::Mutex; -use anyhow::Result; use futures::stream::StreamExt; use log::*; use nvpair::NvList; @@ -21,6 +21,7 @@ use crate::object_access::ObjectAccess; use crate::object_access::ObjectAccessProtocol; use crate::pool::*; use crate::pool_destroy; +use crate::pool_destroy::DestroyingPool; use crate::server::return_ok; use crate::server::ConnectionState; use crate::server::HandlerReturn; @@ -90,102 +91,57 @@ impl PublicConnectionState { fn get_pools(&mut self, nvl: NvList) -> HandlerReturn { info!("got request: {:?}", nvl); - Ok(Box::pin(async move { Self::get_pools_impl(nvl).await })) - } - - async fn get_pools_impl(nvl: NvList) -> Result> { - #[derive(Debug, Deserialize)] - struct GetPoolsRequest { - #[serde(flatten)] - protocol: ObjectAccessProtocol, - #[serde(default)] - readonly: bool, - bucket: Option, - guid: Option, - } - let request: GetPoolsRequest = nvpair::from_nvlist(&nvl)?; - - let bucket_access = BucketAccess::new(request.protocol.clone()).await?; + Ok(Box::pin(async move { + #[derive(Debug, Deserialize)] + struct GetPoolsRequest { + #[serde(flatten)] + protocol: ObjectAccessProtocol, + bucket: Option, + guid: Option, + } + let request: GetPoolsRequest = nvpair::from_nvlist(&nvl)?; + let bucket_access = BucketAccess::new(request.protocol.clone()).await?; + let buckets = if let Some(bucket) = request.bucket { + vec![bucket] + } else { + bucket_access.list_buckets().await + }; - let mut buckets = vec![]; - if let Some(bucket) = request.bucket { - buckets.push(bucket); - } else { - buckets.append(&mut bucket_access.list_buckets().await); - } + maybe_die_with(|| "in get_pools_impl"); - maybe_die_with(|| "in get_pools_impl"); - let response = Arc::new(Mutex::new(NvList::new_unique_names())); - for buck in buckets { - let object_access = - ObjectAccess::new(request.protocol.clone(), buck, request.readonly).await?; - if let Some(guid) = request.guid { - if !Pool::exists(&object_access, PoolGuid(guid)).await { - continue; - } - - match Pool::get_config(&object_access, PoolGuid(guid)).await { - Ok(pool_config) => { - let mut owned_response = - Arc::try_unwrap(response).unwrap().into_inner().unwrap(); - owned_response - .insert(format!("{}", guid), pool_config.as_ref()) - .unwrap(); - debug!("sending response: {:?}", owned_response); - return Ok(Some(owned_response)); - } - Err(e) => { - error!("skipping {:?}: {:?}", guid, e); - continue; - } + let pools = Arc::new(Mutex::new(NvList::new_unique_names())); + for buck in buckets { + let object_access = ObjectAccess::new(request.protocol.clone(), buck, true).await?; + if let Some(guid) = request.guid { + find_pool(object_access, PoolGuid(guid), pools.clone()).await; + } else { + discover_pools(object_access, pools.clone()).await; } } - - object_access - .list_prefixes("zfs/".to_string()) - .for_each_concurrent(*GET_POOLS_QUEUE_DEPTH, |prefix| { - let my_object_access = object_access.clone(); - let my_response = response.clone(); - async move { - debug!("prefix: {}", prefix); - let split: Vec<&str> = prefix.rsplitn(3, '/').collect(); - let guid_str = split[1]; - if let Ok(guid64) = str::parse::(guid_str) { - let guid = PoolGuid(guid64); - match 
Pool::get_config(&my_object_access, guid).await { - Ok(pool_config) => my_response - .lock() - .unwrap() - .insert(guid_str, pool_config.as_ref()) - .unwrap(), - Err(e) => { - error!("skipping {:?}: {:?}", guid, e); - } - } - } - } - }) - .await; - } - let owned_response = Arc::try_unwrap(response).unwrap().into_inner().unwrap(); - info!("sending response: {:?}", owned_response); - Ok(Some(owned_response)) + let pools = Arc::try_unwrap(pools).unwrap().into_inner().unwrap(); + let mut response = NvList::new_unique_names(); + response.insert("pools", pools.as_ref())?; + info!("sending response: {response:?}"); + Ok(Some(response)) + })) } fn get_destroying_pools(&mut self, nvl: NvList) -> HandlerReturn { Ok(Box::pin(async move { - // XXX convert to use serde nvlist response debug!("got request: {:?}", nvl); - let pools = pool_destroy::get_destroy_list().await; - let mut response = NvList::new_unique_names(); - response - .insert(AGENT_RESPONSE_TYPE, TYPE_GET_DESTROYING_POOLS) - .unwrap(); - response.insert("pools", pools.as_ref()).unwrap(); - - debug!("sending response: {:?}", response); - Ok(Some(response)) + #[derive(Debug, Serialize)] + struct DestroyingPoolsResponse { + pools: HashMap, + } + let response = DestroyingPoolsResponse { + pools: pool_destroy::get_destroy_list() + .await + .into_iter() + .map(|(guid, destroying)| (guid.to_string(), destroying)) + .collect(), + }; + return_ok(response, true) })) } @@ -195,13 +151,7 @@ impl PublicConnectionState { debug!("got request: {:?}", nvl); pool_destroy::remove_not_in_progress().await; - let mut response = NvList::new_unique_names(); - response - .insert(AGENT_RESPONSE_TYPE, TYPE_CLEAR_DESTROYED_POOLS) - .unwrap(); - - debug!("sending response: {:?}", response); - Ok(Some(response)) + Ok(Some(NvList::new_unique_names())) })) } @@ -225,7 +175,7 @@ impl PublicConnectionState { bucket_size: phys.bucket_size, combined_histogram, }; - return_ok(TYPE_REPORT_HITS, response, true) + return_ok(response, true) })) } @@ -240,7 +190,7 @@ impl PublicConnectionState { let response = ListDevicesResponse { devices_json: serde_json::to_string(&cache.devices()).unwrap(), }; - return_ok(TYPE_LIST_DEVICES, response, true) + return_ok(response, true) })) } @@ -255,7 +205,7 @@ impl PublicConnectionState { let response = ZcacheIostatResponse { iostats_json: serde_json::to_string(&cache.io_stats()).unwrap(), }; - return_ok(TYPE_ZCACHE_IOSTAT, response, false) + return_ok(response, false) })) } @@ -270,7 +220,52 @@ impl PublicConnectionState { let response = ZcacheStatsResponse { stats_json: serde_json::to_string(&cache.stats()).unwrap(), }; - return_ok(TYPE_ZCACHE_STATS, response, false) + return_ok(response, false) })) } } + +async fn discover_pools(object_access: Arc, pools: Arc>) { + object_access + .list_prefixes("zfs/".to_string()) + .for_each_concurrent(*GET_POOLS_QUEUE_DEPTH, |prefix| { + let my_object_access = object_access.clone(); + let pools = pools.clone(); + async move { + debug!("prefix: {}", prefix); + let split = prefix.rsplitn(3, '/').collect::>(); + let guid_str = split[1]; + if let Ok(guid64) = str::parse::(guid_str) { + let guid = PoolGuid(guid64); + match Pool::get_config(&my_object_access, guid).await { + Ok(pool_config) => pools + .lock() + .unwrap() + .insert(guid_str, pool_config.as_ref()) + .unwrap(), + Err(e) => { + error!("skipping {guid:?}: {e:?}"); + } + } + } + } + }) + .await; +} + +async fn find_pool(object_access: Arc, guid: PoolGuid, pools: Arc>) { + if Pool::exists(&object_access, guid).await { + match 
Pool::get_config(&object_access, guid).await { + Ok(pool_config) => { + pools + .lock() + .unwrap() + .insert(format!("{}", guid), pool_config.as_ref()) + .unwrap(); + } + Err(e) => { + error!("skipping {guid:?}: {e:?}"); + } + } + } +} diff --git a/cmd/zfs_object_agent/zettaobject/src/root_connection.rs b/cmd/zfs_object_agent/zettaobject/src/root_connection.rs index c5e6fec358b2..906d2bcb5721 100644 --- a/cmd/zfs_object_agent/zettaobject/src/root_connection.rs +++ b/cmd/zfs_object_agent/zettaobject/src/root_connection.rs @@ -150,7 +150,7 @@ impl RootConnectionState { .await .map_err(|e| FailureMessage::new(e.into())); - return_result(TYPE_CREATE_POOL, request.id, result, true) + return_result(request.id, result, true) }) } @@ -269,7 +269,7 @@ impl RootConnectionState { }) } }; - return_result(TYPE_OPEN_POOL, request.id, result, true) + return_result(request.id, result, true) }) } @@ -349,13 +349,11 @@ impl RootConnectionState { .await; #[derive(Debug, Serialize)] struct EndTxgResponse { - response_type: &'static str, #[serde(flatten)] stats: PoolStatsPhys, features: HashMap, } let response = EndTxgResponse { - response_type: TYPE_END_TXG, stats, features: features .into_iter() @@ -488,7 +486,6 @@ impl RootConnectionState { } let mut response = NvList::new_unique_names(); - response.insert("response_type", TYPE_GET_STATS).unwrap(); response.insert("token", &request.token).unwrap(); response.insert("stats", nvl.as_ref()).unwrap(); @@ -516,13 +513,8 @@ impl RootConnectionState { .await; } #[derive(Debug, Serialize)] - struct ClosePoolResponse { - response_type: &'static str, - } - let response = ClosePoolResponse { - response_type: TYPE_CLOSE_POOL, - }; - return_struct(response, true) + struct ClosePoolResponse {} + return_struct(ClosePoolResponse {}, true) })) } @@ -550,14 +542,12 @@ impl RootConnectionState { #[derive(Debug, Serialize)] struct EnableFeatureResponse { - response_type: &'static str, feature: String, } let response = EnableFeatureResponse { - response_type: TYPE_ENABLE_FEATURE, feature: request.feature, }; - handler_return_struct(response, true) + Ok(Box::pin(future::ready(return_struct(response, true)))) } fn resume_destroy_pool(&mut self, nvl: NvList) -> HandlerReturn { @@ -575,22 +565,16 @@ impl RootConnectionState { let result = pool_destroy::resume_destroy(object_access, request.guid) .await .map_err(FailureMessage::new); - return_result(TYPE_RESUME_DESTROY_POOL, (), result, true) + return_result((), result, true) })) } fn clear_hit_data(&mut self, _nvl: NvList) -> HandlerReturn { let cache = self.cache.clone(); Ok(Box::pin(async move { - #[derive(Debug, Serialize)] - struct ClearHitDataResponse { - response_type: &'static str, - result: &'static str, - } - debug!("got ClearHitDataRequest"); cache.clear_hit_data().await; - return_ok(TYPE_CLEAR_HIT_DATA, (), true) + return_ok((), true) })) } @@ -604,7 +588,7 @@ impl RootConnectionState { .add_disk(&request.path) .await .map_err(FailureMessage::new); - return_result(TYPE_ADD_DISK, (), result, true) + return_result((), result, true) })) } @@ -618,7 +602,7 @@ impl RootConnectionState { .expand_disk(&request.path) .await .map_err(FailureMessage::new); - return_result(TYPE_EXPAND_DISK, (), result, true) + return_result((), result, true) })) } @@ -628,7 +612,7 @@ impl RootConnectionState { debug!("got {:?}", nvl); cache.sync_checkpoint().await; - return_ok(TYPE_SYNC_CHECKPOINT, (), true) + return_ok((), true) })) } @@ -638,7 +622,7 @@ impl RootConnectionState { debug!("got {:?}", nvl); cache.initiate_merge().await; - 
return_ok(TYPE_INITIATE_MERGE, (), true) + return_ok((), true) })) } } @@ -661,10 +645,3 @@ where } Ok(Some(nvl)) } - -fn handler_return_struct(response: T, debug: bool) -> HandlerReturn -where - T: Debug + Serialize, -{ - Ok(Box::pin(future::ready(return_struct(response, debug)))) -} diff --git a/cmd/zfs_object_agent/zettaobject/src/server.rs b/cmd/zfs_object_agent/zettaobject/src/server.rs index a0d480afb5d7..ed484d7c2c28 100644 --- a/cmd/zfs_object_agent/zettaobject/src/server.rs +++ b/cmd/zfs_object_agent/zettaobject/src/server.rs @@ -317,11 +317,14 @@ where with_alloctag_hf("Server::start_connection() NvList::lookup_string()", || { nvl.lookup_string(AGENT_REQUEST_TYPE) })?; - let request_type = request_type_cstr.to_str()?; - match self.nvlist_handlers.get(request_type) { + let request_type = request_type_cstr.to_str()?.to_owned(); + match self.nvlist_handlers.get(&request_type) { Some(HandlerEnum::Serial(handler)) => { let response_opt = handler(&mut state, nvl).await?; - if let Some(response) = response_opt { + if let Some(mut response) = response_opt { + response + .insert(AGENT_RESPONSE_TYPE, request_type.as_str()) + .unwrap(); responder.respond_with_nvlist(response); } } @@ -331,7 +334,10 @@ where let responder = responder.clone(); tokio::spawn(async move { match fut.await { - Ok(Some(response)) => { + Ok(Some(mut response)) => { + response + .insert(AGENT_RESPONSE_TYPE, request_type.as_str()) + .unwrap(); responder.respond_with_nvlist(response); } Ok(None) => {} @@ -513,7 +519,7 @@ pub enum FailureMessage { impl Debug for FailureMessage { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { - FailureMessage::Other { message } => write!(f, "{}", message), + FailureMessage::Other { message } => write!(f, "{message}"), } } } @@ -535,14 +541,14 @@ impl FailureMessage { /// (not tuple-like). FailureMessage is an example. 
/// /// The response nvlist will have the following nvpairs: -/// * "response_type" -> response_type (string) /// * fields from R /// * if result.is_ok(), fields from O /// * if result.is_err(), "err" -> EnumVariantName (string) /// * if result.is_err(), "errstr" -> stringified error (string) /// * if result.is_err(), fields from the varant of E +/// +/// Note that the server infrastructure will add the "response_type" nvpair pub fn return_result( - response_type: &str, request_id: R, result: Result, debug: bool, @@ -553,8 +559,7 @@ where E: Debug + Serialize, { #[derive(Debug, Serialize)] - struct Response<'a, R, O, E> { - response_type: &'a str, + struct Response { #[serde(flatten)] request: R, #[serde(flatten)] @@ -565,16 +570,15 @@ where } if let Err(e) = &result { - error!("sending failure: {:?}", e); + error!("sending failure: {e:?}"); } let (ok, errstr, err) = match result { Ok(o) => (Some(o), None, None), - Err(e) => (None, Some(format!("{:?}", e)), Some(e)), + Err(e) => (None, Some(format!("{e:?}")), Some(e)), }; let response = Response { - response_type, request: request_id, ok, err, @@ -582,9 +586,9 @@ where }; if debug { - trace!("sending response: {:?}", response); + trace!("sending response: {response:?}"); } else { - super_trace!("sending response: {:?}", response); + super_trace!("sending response: {response:?}"); } let nvl = nvpair::to_nvlist(&response)?; @@ -596,17 +600,17 @@ where } if debug { - maybe_die_with(|| format!("before sending response: {:?}", nvl)); - debug!("sending response nvl: {:?}", nvl); + maybe_die_with(|| format!("before sending response: {nvl:?}")); + debug!("sending response nvl: {nvl:?}"); } else { - super_trace!("sending response nvl: {:?}", nvl); + super_trace!("sending response nvl: {nvl:?}"); } Ok(Some(nvl)) } -pub fn return_ok(response_type: &str, response: O, debug: bool) -> Result> +pub fn return_ok(response: O, debug: bool) -> Result> where O: Debug + Serialize, { - return_result(response_type, (), Ok::<_, ()>(response), debug) + return_result((), Ok::<_, ()>(response), debug) } diff --git a/lib/libzutil/zutil_import.c b/lib/libzutil/zutil_import.c index ebb1ac2a55d8..4669069a8662 100644 --- a/lib/libzutil/zutil_import.c +++ b/lib/libzutil/zutil_import.c @@ -1901,8 +1901,10 @@ zpool_find_import_agent(libpc_handle_t *hdl, importargs_t *iarg, nvlist_t *resp = zoa_send_recv_msg(hdl, msg, AGENT_PUBLIC_PROTOCOL_VERSION, ZFS_PUBLIC_SOCKET, NULL); + nvlist_t *pools = fnvlist_lookup_nvlist(resp, "pools"); + nvpair_t *elem = NULL; - while ((elem = nvlist_next_nvpair(resp, elem)) != NULL) { + while ((elem = nvlist_next_nvpair(pools, elem)) != NULL) { avl_index_t where; rdsk_node_t *slice; nvlist_t *config; From 27a45caa3659ff5de4ff442be5c0853109ed4197 Mon Sep 17 00:00:00 2001 From: Matthew Ahrens Date: Wed, 1 Jun 2022 12:14:53 -0700 Subject: [PATCH 02/10] DLPX-81374 spa_config_enter(RW_WRITER) causes FlushWriteRequest storm, tiny objects (#453) --- include/sys/spa.h | 1 - module/zfs/spa_misc.c | 17 ----------------- module/zfs/zio.c | 8 -------- 3 files changed, 26 deletions(-) diff --git a/include/sys/spa.h b/include/sys/spa.h index 74b6ecd40cbd..84cff7c22327 100644 --- a/include/sys/spa.h +++ b/include/sys/spa.h @@ -978,7 +978,6 @@ extern void spa_config_enter(spa_t *spa, int locks, const void *tag, krw_t rw); extern void spa_config_exit(spa_t *spa, int locks, const void *tag); extern int spa_config_held(spa_t *spa, int locks, krw_t rw); extern void spa_config_enter_read_priority(spa_t *, int); -extern int spa_config_write_wanted(spa_t *, int 
locks); /* Pool vdev add/remove lock */ extern uint64_t spa_vdev_enter(spa_t *spa); diff --git a/module/zfs/spa_misc.c b/module/zfs/spa_misc.c index 2853f87db3e4..ac4eaa2e4527 100644 --- a/module/zfs/spa_misc.c +++ b/module/zfs/spa_misc.c @@ -515,23 +515,6 @@ spa_config_tryenter(spa_t *spa, int locks, void *tag, krw_t rw) return (1); } -int -spa_config_write_wanted(spa_t *spa, int locks) -{ - int locks_wanted = 0; - for (int i = 0; i < SCL_LOCKS; i++) { - spa_config_lock_t *scl = &spa->spa_config_lock[i]; - if (!(locks & (1 << i))) - continue; - mutex_enter(&scl->scl_lock); - if (scl->scl_write_wanted) { - locks_wanted |= 1 << i; - } - mutex_exit(&scl->scl_lock); - } - return (locks_wanted); -} - /* * This function should only be called as an exception since it * will not check for any waiting writers and could lead to starvation. */ diff --git a/module/zfs/zio.c b/module/zfs/zio.c index 8a7199036b1e..d50590768307 100644 --- a/module/zfs/zio.c +++ b/module/zfs/zio.c @@ -3871,14 +3871,6 @@ zio_vdev_io_start(zio_t *zio) SCL_ZIO); object_store_update_max_blockid(zio); - - /* - * If there is a spa_config_lock WRITER - * waiting, then keep flushing out the max - * I/O that has been issued. - */ - if (spa_config_write_wanted(spa, SCL_ZIO)) - object_store_flush_locked_writes(spa); } else { spa_config_enter(spa, SCL_ZIO, zio, RW_READER); } From 3075380425c46f4aaa7188f28d09895abd7543fa Mon Sep 17 00:00:00 2001 From: Serapheim Dimitropoulos Date: Wed, 1 Jun 2022 14:22:58 -0700 Subject: [PATCH 03/10] Disk::new() may leak file descriptors (#459) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit = Problem If you try to add a disk that already exists in the ZettaCache we’ll end up getting to this point: ``` fn add_disk(&mut self, path: &Path) -> Result<()> { ... let disk_id = self.block_access.add_disk(Disk::new(path, false)?)?; ``` At that point, `Disk::new()` has succeeded (opening a file descriptor with a static lifetime to the dev device) but we are failing in `BlockAccess.add_disk()` (which makes us leak the file descriptor). Here is a test where we can see how the file descriptor to the device file is leaked: ``` ~$ sudo lsof -p $(pgrep zfs_o) | grep "/dev/sdb" | wc -l 58 ~$ for i in {0..20}; do sudo zcache add /dev/sdb; done Error: disk "/dev/sdb" ("/dev/sdb") is already part of the zettacache .. 20 error messages ... 
~$ sudo lsof -p $(pgrep zfs_o) | grep "/dev/sdb" | wc -l 1 ``` --- .../zettacache/src/block_access.rs | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/cmd/zfs_object_agent/zettacache/src/block_access.rs b/cmd/zfs_object_agent/zettacache/src/block_access.rs index 3777bd9f1f8d..a3878e1df63c 100644 --- a/cmd/zfs_object_agent/zettacache/src/block_access.rs +++ b/cmd/zfs_object_agent/zettacache/src/block_access.rs @@ -9,6 +9,7 @@ use std::os::unix::prelude::OpenOptionsExt; use std::path::Path; use std::path::PathBuf; use std::sync::atomic::Ordering; +use std::sync::Arc; use std::sync::Mutex; use std::sync::RwLock; use std::thread::sleep; @@ -141,13 +142,7 @@ pub struct BlockAccess { #[derive(Derivative)] #[derivative(Debug)] pub struct Disk { - // We want all the reader/writer_threads to share the same file descriptor, but we don't have - // a mechanism to ensure that they stop using the fd when the DiskStruct is dropped and the - // fd is closed. To solve this we simply never close the fd. The fd is owned by the File, - // and we leave a reference to it here to indicate that it's related to this Disk, even - // though it's only used via the reader/writer_threads. - #[allow(dead_code)] - file: &'static File, + file: Arc, path: PathBuf, canonical_path: PathBuf, @@ -217,8 +212,8 @@ impl Disk { .open(path) .with_context(|| format!("opening disk {path:?}"))?; // see comment in `struct Disk` - let file = &*Box::leak(Box::new(file)); - let (sector_size, size) = disk_sizes(file)?; + let file = Arc::new(file); + let (sector_size, size) = disk_sizes(&file)?; let short_name = path.file_name().unwrap().to_string_lossy().to_string(); let canonical_path = Path::new(path).canonicalize()?; @@ -260,12 +255,14 @@ impl Disk { // note, we want to use a "std" thread here rather than // tokio::task::spawn_blocking() because the latter has a limit of how many // threads it will create (default 512) + let file = this.file.clone(); std::thread::spawn(move || { Self::reader_thread(file, io_stats, sector_size, rx); }); } if !readonly { for rx in writer_rxs { + let file = this.file.clone(); std::thread::spawn(move || { Self::writer_thread( file, @@ -276,6 +273,7 @@ impl Disk { }); } for rx in metadata_writer_rxs { + let file = this.file.clone(); std::thread::spawn(move || { Self::writer_thread( file, @@ -292,17 +290,20 @@ impl Disk { } fn reader_thread( - file: &'static File, + file: Arc, io_stats: &'static DiskIoStats, sector_size: usize, rx: flume::Receiver, ) { + // When all the Senders of receiver `rx` are dropped (i.e. when the + // respective Disk of this thread is dropped), `recv()` will return an + // error and this thread will terminate without being leaked. while let Ok(message) = rx.recv() { let op = OpInProgress::new(&io_stats.stats[message.io_type]); let vec = measure!() .func(|| { pread_aligned( - file, + &file, message.offset.try_into().unwrap(), message.size, sector_size, @@ -347,7 +348,7 @@ impl Disk { } fn writer_thread( - file: &'static File, + file: Arc, stat_values: &'static IoStatValues, sector_size: usize, mut rx: mpsc::UnboundedReceiver, @@ -379,6 +380,9 @@ impl Disk { let mut sorted: BTreeMap = BTreeMap::new(); let mut prev_offset = 0; + // When all the Senders of receiver `rx` are dropped (i.e. when the + // respective Disk of this thread is dropped), `blocking_recv()` will + // return an error and this thread will terminate without being leaked. 
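// The loop below is the write-aggregation heart of this thread: find_run()
// scans `sorted` (wrapping around from prev_offset) for queued messages whose
// offsets are exactly adjacent on disk, stopping before a run would exceed
// DISK_WRITE_MAX_AGGREGATION_SIZE, and each run is then submitted as a single
// contiguous write. The read-side find_run() added by a later patch in this
// series mirrors this logic.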
loop { // Look for next run of messages in sorted queue let (run, len) = find_run(iter_wrapping(&sorted, prev_offset)); @@ -553,7 +557,7 @@ impl BlockAccess { pub fn expand_disk(&self, disk: DiskId) -> Result { let disks = self.disks.read().unwrap(); let disk = &disks[disk.index()]; - let (_, new_size) = disk_sizes(disk.file)?; + let (_, new_size) = disk_sizes(&disk.file)?; let mut size = disk.size.lock().unwrap(); let additional_bytes = new_size.checked_sub(*size).ok_or_else(|| { anyhow!( From d591138fb8bc4e3d650883e384879a0014e38087 Mon Sep 17 00:00:00 2001 From: Matthew Ahrens Date: Wed, 1 Jun 2022 15:38:40 -0700 Subject: [PATCH 04/10] increase zfs send prefetching (#456) When `zfs send` has to retrieve the data from the object store (as opposed to zettacache), we can benefit from a larger prefetch queue depth, similar to what we see for predictive prefetching. This is because we need a high GetObject queue depth (dozens of requests, covering dozens of MBs of data) to get good throughput from S3. This commit increases the zfs send prefetching to 64MB (from 16MB), and clarifies the comment around it (there's really no downside to increasing it aside from RAM usage, since this is not using ARC prefetching any more). --- module/zfs/dmu_send.c | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/module/zfs/dmu_send.c b/module/zfs/dmu_send.c index af08cc02bdb0..64aecf8bf0c5 100644 --- a/module/zfs/dmu_send.c +++ b/module/zfs/dmu_send.c @@ -69,13 +69,11 @@ /* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */ static int zfs_send_corrupt_data = B_FALSE; /* - * This tunable controls the amount of data (measured in bytes) that will be - * prefetched by zfs send. If the main thread is blocking on reads that haven't - * completed, this variable might need to be increased. If instead the main - * thread is issuing new reads because the prefetches have fallen out of the - * cache, this may need to be decreased. + * This tunable controls the amount of memory (measured in bytes) that will be + * used to buffer data read for zfs send. If the main thread is blocking on + * reads that haven't completed, this variable might need to be increased. */ -static int zfs_send_queue_length = SPA_MAXBLOCKSIZE; +static int zfs_send_queue_length = 64 * 1024 * 1024; /* * This tunable controls the length of the queues that zfs send worker threads * use to communicate. If the send_main_thread is blocking on these queues, From 5996df3b9ea6a0c0e72c80bc391d298874ef83e0 Mon Sep 17 00:00:00 2001 From: Matthew Ahrens Date: Thu, 2 Jun 2022 08:54:15 -0700 Subject: [PATCH 05/10] include frame pointers (#463) Include frame pointers so that we get a little bit of stack trace for flame graphs (without full DWARF unwinding). 
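With frame pointers forced on, frame-pointer-based profilers (for example `perf record -g`, whose default call-graph mode walks frame pointers) can recover usable agent stacks at low overhead.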
--- cmd/zfs_object_agent/.cargo/config.toml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cmd/zfs_object_agent/.cargo/config.toml b/cmd/zfs_object_agent/.cargo/config.toml index b441aa8fa90d..d5955cf2b05a 100644 --- a/cmd/zfs_object_agent/.cargo/config.toml +++ b/cmd/zfs_object_agent/.cargo/config.toml @@ -9,3 +9,9 @@ rustflags = [ "-Dclippy::dbg_macro", ] +# include frame pointers so that we get a little bit of stack trace for +# flame graphs (without full dwarf unwinding) +[target.x86_64-unknown-linux-gnu] +rustflags = [ + "-C", "force-frame-pointers=yes", +] From 3ebf253e972568d70b1b848a69ea89b8367a9db4 Mon Sep 17 00:00:00 2001 From: Don Brady Date: Thu, 2 Jun 2022 14:08:15 -0600 Subject: [PATCH 06/10] Revert "DLPX-79316 use exportfs finer grain cache control from libshare (#365)" (#460) This reverts commit 66b2e0f1cb919d5fdfa13c46eefc8c27689cb4d3. --- lib/libshare/libshare.c | 6 -- lib/libshare/nfs.h | 1 - lib/libshare/os/linux/nfs.c | 140 +++--------------------------------- 3 files changed, 9 insertions(+), 138 deletions(-) diff --git a/lib/libshare/libshare.c b/lib/libshare/libshare.c index 7294940a27f8..7bc31d7a6823 100644 --- a/lib/libshare/libshare.c +++ b/lib/libshare/libshare.c @@ -75,12 +75,6 @@ libshare_init(void) libshare_smb_init(); } -__attribute__((destructor)) static void -libshare_fini(void) -{ - libshare_nfs_fini(); -} - int sa_enable_share(const char *zfsname, const char *mountpoint, const char *shareopts, char *protocol) diff --git a/lib/libshare/nfs.h b/lib/libshare/nfs.h index 01183bd712f5..cfac274c3d26 100644 --- a/lib/libshare/nfs.h +++ b/lib/libshare/nfs.h @@ -29,7 +29,6 @@ #define FILE_HEADER "# !!! DO NOT EDIT THIS FILE MANUALLY !!!\n\n" void libshare_nfs_init(void); -void libshare_nfs_fini(void); boolean_t nfs_is_shared_impl(const char *exports, sa_share_impl_t impl_share); int nfs_toggle_share(const char *lockfile, const char *exports, diff --git a/lib/libshare/os/linux/nfs.c b/lib/libshare/os/linux/nfs.c index 15a762fa73f8..5acfa3fb8545 100644 --- a/lib/libshare/os/linux/nfs.c +++ b/lib/libshare/os/linux/nfs.c @@ -23,7 +23,7 @@ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2011 Gunnar Beutner * Copyright (c) 2012 Cyril Plisko. All rights reserved. - * Copyright (c) 2019, 2022 by Delphix. All rights reserved. + * Copyright (c) 2019, 2020 by Delphix. All rights reserved. */ #include @@ -31,13 +31,10 @@ #include #include #include -#include -#include #include #include #include #include -#include #include #include #include @@ -48,8 +45,6 @@ #define ZFS_EXPORTS_FILE ZFS_EXPORTS_DIR"/zfs.exports" #define ZFS_EXPORTS_LOCK ZFS_EXPORTS_FILE".lock" -#define EXPORTFS_PROG "/usr/sbin/exportfs" - static sa_fstype_t *nfs_fstype; typedef int (*nfs_shareopt_callback_t)(const char *opt, const char *value, @@ -58,16 +53,6 @@ typedef int (*nfs_shareopt_callback_t)(const char *opt, const char *value, typedef int (*nfs_host_callback_t)(FILE *tmpfile, const char *sharepath, const char *host, const char *security, const char *access, void *cookie); -static boolean_t exportfs_cache_control = B_FALSE; - -typedef struct unshared_dataset_entry { - list_node_t ude_node; - char ude_path[0]; -} unshared_dataset_entry_t; - -static pthread_mutex_t unshared_dataset_lock; -static list_t unshared_dataset_list; - /* * Invokes the specified callback function for each Solaris share option * listed in the specified string. 
@@ -154,6 +139,10 @@ foreach_nfs_host_cb(const char *opt, const char *value, void *pcookie) nfs_host_cookie_t *udata = (nfs_host_cookie_t *)pcookie; int cidr_len; +#ifdef DEBUG + fprintf(stderr, "foreach_nfs_host_cb: key=%s, value=%s\n", opt, value); +#endif + if (strcmp(opt, "sec") == 0) udata->security = value; @@ -444,37 +433,13 @@ nfs_enable_share(sa_share_impl_t impl_share) nfs_enable_share_impl)); } -/* - * Save the mountpoint so we can flush the rpc caches after - * the shares are commited. - */ -static int -save_unshared_dataset(const char *mountpoint) -{ - pthread_mutex_lock(&unshared_dataset_lock); - unshared_dataset_entry_t *ude = - malloc(sizeof (unshared_dataset_entry_t) + - strlen(mountpoint) + 1); - if (ude == NULL) - return (SA_NO_MEMORY); - strcpy(ude->ude_path, mountpoint); - list_insert_tail(&unshared_dataset_list, ude); - pthread_mutex_unlock(&unshared_dataset_lock); - - return (SA_OK); -} - /* * Disables NFS sharing for the specified share. */ static int nfs_disable_share_impl(sa_share_impl_t impl_share, FILE *tmpfile) { - (void) tmpfile; - - if (exportfs_cache_control) - return (save_unshared_dataset(impl_share->sa_mountpoint)); - + (void) impl_share, (void) tmpfile; return (SA_OK); } @@ -527,60 +492,16 @@ nfs_clear_shareopts(sa_share_impl_t impl_share) FSINFO(impl_share, nfs_fstype)->shareopts = NULL; } -static int -exportfs_flush_cache_entry(char *mountpoint) -{ - ASSERT(exportfs_cache_control); - - char *argv[] = { - EXPORTFS_PROG, - "-F", - mountpoint, - NULL - }; - - return (libzfs_run_process(argv[0], argv, 0)); -} - -static void -flush_unshared_datasets(void) -{ - /* - * After calling exportfs to reexport our shares we need to - * flush the rpc cache entries for datasets that were unshared. - */ - unshared_dataset_entry_t *ude = NULL; - pthread_mutex_lock(&unshared_dataset_lock); - while ((ude = list_remove_head(&unshared_dataset_list)) != NULL) { - pthread_mutex_unlock(&unshared_dataset_lock); - exportfs_flush_cache_entry(ude->ude_path); - pthread_mutex_lock(&unshared_dataset_lock); - free(ude); - } - pthread_mutex_unlock(&unshared_dataset_lock); -} - -/* - * Reexport all directories including the ones maintained in our - * '/etc/exports.d/zfs.exports' file. This will remove entries - * that have been deleted from our exports file. - */ static int nfs_commit_shares(void) { - char *exportfs_args = exportfs_cache_control ? "-raN" : "-ra"; char *argv[] = { - EXPORTFS_PROG, - exportfs_args, + "/usr/sbin/exportfs", + "-ra", NULL }; - int rc = libzfs_run_process(argv[0], argv, 0); - - if (exportfs_cache_control) - flush_unshared_datasets(); - - return (rc); + return (libzfs_run_process(argv[0], argv, 0)); } static const sa_share_ops_t nfs_shareops = { @@ -594,31 +515,6 @@ static const sa_share_ops_t nfs_shareops = { .commit_shares = nfs_commit_shares, }; -static boolean_t -cache_control_supported(void) -{ - char *argv[] = {EXPORTFS_PROG, "-vh", NULL}; - char **lines = NULL; - int lines_cnt = 0; - int rc; - boolean_t supported; - - /* - * Expected output to confirm support for finer-grain cache flushing: - * - * $ exportfs -vh - * supports: no_cache_flush,flush_one_entry - * usage: exportfs [-adfFhiNoruvs] [host:/path] - */ - rc = libzfs_run_process_get_stdout_nopath(EXPORTFS_PROG, argv, NULL, - &lines, &lines_cnt); - supported = (rc == 0 && lines_cnt > 0 && - strstr(lines[0], "no_cache_flush") != NULL); - libzfs_free_str_array(lines, lines_cnt); - - return (supported); -} - /* * Initializes the NFS functionality of libshare. 
*/ @@ -626,22 +522,4 @@ void libshare_nfs_init(void) { nfs_fstype = register_fstype("nfs", &nfs_shareops); - - exportfs_cache_control = cache_control_supported(); - if (exportfs_cache_control) { - pthread_mutex_init(&unshared_dataset_lock, NULL); - list_create(&unshared_dataset_list, - sizeof (unshared_dataset_entry_t), - offsetof(unshared_dataset_entry_t, ude_node)); - } -} - -void -libshare_nfs_fini(void) -{ - if (exportfs_cache_control) { - ASSERT(list_is_empty(&unshared_dataset_list)); - list_destroy(&unshared_dataset_list); - pthread_mutex_destroy(&unshared_dataset_lock); - } } From 669c92a934a00005bc1ad19350992b7b598cd535 Mon Sep 17 00:00:00 2001 From: Matthew Ahrens Date: Thu, 2 Jun 2022 21:05:27 -0700 Subject: [PATCH 07/10] zettacache read aggregation (#452) This commit adds the ability to the zettacache to aggregate adjacent reads from disk. --- cmd/zfs_object_agent/Cargo.lock | 54 +------ .../util/src/zettacache_stats.rs | 2 +- cmd/zfs_object_agent/zettacache/Cargo.toml | 1 - .../zettacache/src/block_access.rs | 138 +++++++++++++----- 4 files changed, 105 insertions(+), 90 deletions(-) diff --git a/cmd/zfs_object_agent/Cargo.lock b/cmd/zfs_object_agent/Cargo.lock index 6e8fa5e001c2..db729aafc71a 100644 --- a/cmd/zfs_object_agent/Cargo.lock +++ b/cmd/zfs_object_agent/Cargo.lock @@ -808,19 +808,6 @@ dependencies = [ "miniz_oxide", ] -[[package]] -name = "flume" -version = "0.10.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843c03199d0c0ca54bc1ea90ac0d507274c28abcc4f691ae8b4eaa375087c76a" -dependencies = [ - "futures-core", - "futures-sink", - "nanorand", - "pin-project", - "spin 0.9.3", -] - [[package]] name = "fnv" version = "1.0.7" @@ -1530,15 +1517,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7843ec2de400bcbc6a6328c958dc38e5359da6e93e72e37bc5246bf1ae776389" -[[package]] -name = "nanorand" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" -dependencies = [ - "getrandom", -] - [[package]] name = "native-tls" version = "0.2.10" @@ -1865,26 +1843,6 @@ dependencies = [ "sha-1", ] -[[package]] -name = "pin-project" -version = "1.0.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e" -dependencies = [ - "pin-project-internal", -] - -[[package]] -name = "pin-project-internal" -version = "1.0.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "pin-project-lite" version = "0.2.9" @@ -2106,7 +2064,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin 0.5.2", + "spin", "untrusted", "web-sys", "winapi 0.3.9", @@ -2541,15 +2499,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" -[[package]] -name = "spin" -version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c530c2b0d0bf8b69304b39fe2001993e267461948b890cd037d8ad4293fa1a0d" -dependencies = [ - "lock_api", -] - [[package]] name = "str-buf" version = "1.0.5" @@ -3275,7 +3224,6 @@ dependencies = [ "conv", "derivative", "either", - "flume", "futures", 
"futures-core", "lazy_static", diff --git a/cmd/zfs_object_agent/util/src/zettacache_stats.rs b/cmd/zfs_object_agent/util/src/zettacache_stats.rs index 666035fbf5c9..43cdcfc25abf 100644 --- a/cmd/zfs_object_agent/util/src/zettacache_stats.rs +++ b/cmd/zfs_object_agent/util/src/zettacache_stats.rs @@ -38,7 +38,7 @@ use crate::nice_p2size; use crate::write_stdout; /// The zettacache disk I/O types that are collected and displayed for each disk. -#[derive(Debug, Enum, Copy, Clone, Serialize, Deserialize)] +#[derive(Debug, Enum, Copy, Clone, Serialize, Deserialize, PartialEq)] pub enum DiskIoType { ReadDataForLookup, ReadIndexForLookup, diff --git a/cmd/zfs_object_agent/zettacache/Cargo.toml b/cmd/zfs_object_agent/zettacache/Cargo.toml index bbd2f0577547..dcff5c3b492c 100644 --- a/cmd/zfs_object_agent/zettacache/Cargo.toml +++ b/cmd/zfs_object_agent/zettacache/Cargo.toml @@ -18,7 +18,6 @@ chrono = "0.4" conv = "0.3.3" derivative = "2.2.0" either = "1.6.1" -flume = "0.10.10" futures = "0.3.13" futures-core = "0.3.13" lazy_static = "1.4.0" diff --git a/cmd/zfs_object_agent/zettacache/src/block_access.rs b/cmd/zfs_object_agent/zettacache/src/block_access.rs index a3878e1df63c..99fdcd2db459 100644 --- a/cmd/zfs_object_agent/zettacache/src/block_access.rs +++ b/cmd/zfs_object_agent/zettacache/src/block_access.rs @@ -55,12 +55,14 @@ use crate::base_types::Extent; tunable! { static ref MIN_SECTOR_SIZE: usize = 512; - static ref DISK_WRITE_MAX_QUEUE_DEPTH: usize = 32; - // Stop aggregating if run would exceed DISK_WRITE_MAX_AGGREGATION_SIZE + // Stop aggregating if run would exceed DISK_READ/WRITE_MAX_AGGREGATION_SIZE + pub static ref DISK_READ_MAX_AGGREGATION_SIZE: ByteSize = ByteSize::kib(128); pub static ref DISK_WRITE_MAX_AGGREGATION_SIZE: ByteSize = ByteSize::kib(128); // CHUNK must be > MAX_AGG_SIZE, see Disk::write() - static ref DISK_WRITE_CHUNK: ByteSize = ByteSize::mib(1); + static ref DISK_AGG_CHUNK: ByteSize = ByteSize::mib(1); static ref DISK_WRITE_QUEUE_EMPTY_DELAY: Duration = Duration::from_millis(1); + static ref DISK_READ_QUEUE_EMPTY_DELAY: Duration = Duration::from_micros(10); + static ref DISK_WRITE_MAX_QUEUE_DEPTH: usize = 32; static ref DISK_METADATA_WRITE_MAX_QUEUE_DEPTH: usize = 16; pub static ref DISK_READ_MAX_QUEUE_DEPTH: usize = 64; } @@ -151,7 +153,7 @@ pub struct Disk { #[derivative(Debug = "ignore")] io_stats: &'static DiskIoStats, #[derivative(Debug = "ignore")] - reader_tx: flume::Sender, + reader_txs: Vec>, #[derivative(Debug = "ignore")] writer_txs: Vec>, #[derivative(Debug = "ignore")] @@ -218,8 +220,6 @@ impl Disk { let short_name = path.file_name().unwrap().to_string_lossy().to_string(); let canonical_path = Path::new(path).canonicalize()?; - let (reader_tx, reader_rx) = flume::unbounded(); - let mut writer_txs = Vec::new(); let mut writer_rxs = Vec::new(); for _ in 0..*DISK_WRITE_MAX_QUEUE_DEPTH { @@ -236,6 +236,14 @@ impl Disk { metadata_writer_rxs.push(rx); } + let mut reader_txs = Vec::new(); + let mut reader_rxs = Vec::new(); + for _ in 0..*DISK_READ_MAX_QUEUE_DEPTH { + let (tx, rx) = mpsc::unbounded_channel(); + reader_txs.push(tx); + reader_rxs.push(rx); + } + let io_stats = &*Box::leak(Box::new(DiskIoStats::new(short_name))); let this = Disk { @@ -245,16 +253,15 @@ impl Disk { size: Mutex::new(size), sector_size, io_stats, - reader_tx, + reader_txs, writer_txs, metadata_writer_txs, }; - for _ in 0..*DISK_READ_MAX_QUEUE_DEPTH { - let rx = reader_rx.clone(); - // note, we want to use a "std" thread here rather than - // tokio::task::spawn_blocking() because 
the latter has a limit of how many - // threads it will create (default 512) + // note, we use "std" threads rather than tokio::task::spawn_blocking() because the + // latter has a limit of how many threads it will create (default 512) + + for rx in reader_rxs { let file = this.file.clone(); std::thread::spawn(move || { Self::reader_thread(file, io_stats, sector_size, rx); @@ -293,32 +300,91 @@ impl Disk { file: Arc, io_stats: &'static DiskIoStats, sector_size: usize, - rx: flume::Receiver, + mut rx: mpsc::UnboundedReceiver, ) { - // When all the Senders of receiver `rx` are dropped (i.e. when the - // respective Disk of this thread is dropped), `recv()` will return an - // error and this thread will terminate without being leaked. - while let Ok(message) = rx.recv() { - let op = OpInProgress::new(&io_stats.stats[message.io_type]); - let vec = measure!() - .func(|| { - pread_aligned( - &file, - message.offset.try_into().unwrap(), - message.size, - sector_size, - ) - }) + /// returns (offsets, total_bytes) + fn find_run<'a, I: Iterator>( + mut iter: I, + ) -> Option<(Vec, usize, DiskIoType)> { + let (mut run, mut len, io_type) = if let Some((&offset, message)) = iter.next() { + (vec![offset], message.size, message.io_type) + } else { + return None; + }; + for (&offset, message) in iter { + if len > 0 && len + message.size > DISK_READ_MAX_AGGREGATION_SIZE.as_usize() { + break; + } + if message.io_type != io_type { + break; + } + if offset == run[0] + len as u64 { + run.push(offset); + len += message.size; + } else { + break; + } + } + Some((run, len, io_type)) + } + + let read_impl = |offset: u64, size, io_type| { + let op = OpInProgress::new(&io_stats.stats[io_type]); + let vec = measure!("pread_aligned") + .func(|| pread_aligned(&file, offset.try_into().unwrap(), size, sector_size)) .unwrap(); assert_eq!( vec.len(), - message.size, + size, "fd={}, offset={}", file.as_raw_fd(), - message.offset + offset ); - op.end(message.size as u64); - message.tx.send(vec.into()).unwrap(); + op.end(size as u64); + AlignedBytes::from(vec) + }; + + let mut sorted: BTreeMap = BTreeMap::new(); + let mut prev_offset = 0; + + // When all the Senders of receiver `rx` are dropped (i.e. when the + // respective Disk of this thread is dropped), `blocking_recv()` will + // return an error and this thread will terminate without being leaked. + loop { + // Look for next run of messages in sorted queue + if let Some((run, len, io_type)) = find_run(iter_wrapping(&sorted, prev_offset)) { + // Issue one read for this run (which may have multiple messages) + let first_offset = run[0]; + let bytes = read_impl(first_offset, len, io_type); + for offset in run { + let message = sorted.remove(&offset).unwrap(); + assert_eq!(message.io_type, io_type); + let relative_offset = (message.offset - first_offset).as_usize(); + let slice = &bytes[relative_offset..relative_offset + message.size]; + message.tx.send(bytes.slice_ref(slice)).unwrap(); + } + prev_offset = first_offset; + } else { + // Nothing in `sorted`; wait for a message + let message = match rx.blocking_recv() { + Some(message) => message, + None => return, + }; + sorted.insert(message.offset, message); + // Delay a bit to allow for more messages to arrive, to improve our chances of + // aggregation. 
+ sleep(*DISK_READ_QUEUE_EMPTY_DELAY); + } + + // Receive as many messages as we can without blocking + while let Ok(message) = rx.try_recv() { + if let Some(message) = sorted.insert(message.offset, message) { + // duplicate read inserted; issue the old one immediately + measure!("duplicate read").hit(); + let bytes = read_impl(message.offset, message.size, message.io_type); + message.tx.send(bytes).unwrap(); + } + } } } @@ -342,8 +408,10 @@ impl Disk { tx, }; - // note: reader_tx is unbounded, so .send() will not block - self.reader_tx.send(message).unwrap(); + let reader = usize::from64(offset / DISK_AGG_CHUNK.as_u64() % self.reader_txs.len() as u64); + self.reader_txs[reader] + .send(message) + .unwrap_or_else(|e| panic!("reader_txs[{}].send: {}", reader, e)); async move { measure!().fut(rx).await.unwrap() } } @@ -489,14 +557,14 @@ impl Disk { _ => panic!("invalid {:?} for write", io_type), }; // Dispatch this write to a writer thread, determined based on its offset. The first - // DISK_WRITE_CHUNK (default 1MB) of the disk goes to the first thread, the second chunk + // DISK_AGG_CHUNK (default 1MB) of the disk goes to the first thread, the second chunk // to the second thread, and so on, wrapping back around to the first thread. Note that // each block allocator slab (32MB) is mapped to multiple threads, so the work is // distributed to multiple threads even when it's concentrated among a small number of // slabs. The CHUNK (1MB) is larger than the DISK_WRITE_MAX_AGGREGATION_SIZE (128KB) so // that we can find aggregations that cross MAX_AGG_SIZE boundaries (e.g. from offsets // 100KB to 228KB). - let writer = usize::from64(offset / DISK_WRITE_CHUNK.as_u64() % txs.len() as u64); + let writer = usize::from64(offset / DISK_AGG_CHUNK.as_u64() % txs.len() as u64); txs[writer] .send(message) .unwrap_or_else(|e| panic!("writer_txs[{}].send: {}", writer, e)); From edad42be9d9d52f3fd4d7faba1d59ea6053eb459 Mon Sep 17 00:00:00 2001 From: Manoj Joseph Date: Thu, 2 Jun 2022 22:35:18 -0700 Subject: [PATCH 08/10] DLPX-80615 DOSE-Azure: Add support for an optional storage emulator endpoint (#454) --- cmd/zfs_object_agent/Cargo.lock | 155 +++++++++++-- cmd/zfs_object_agent/client/src/main.rs | 21 +- cmd/zfs_object_agent/server/src/main.rs | 3 +- cmd/zfs_object_agent/zettaobject/Cargo.toml | 9 +- .../zettaobject/src/object_access/blob.rs | 205 +++++++++++++----- .../zettaobject/src/object_access/mod.rs | 32 ++- .../zettaobject/src/object_access/s3.rs | 1 + cmd/zpool/zpool_vdev.c | 5 - 8 files changed, 343 insertions(+), 88 deletions(-) diff --git a/cmd/zfs_object_agent/Cargo.lock b/cmd/zfs_object_agent/Cargo.lock index db729aafc71a..7e3be71ceb20 100644 --- a/cmd/zfs_object_agent/Cargo.lock +++ b/cmd/zfs_object_agent/Cargo.lock @@ -89,6 +89,15 @@ dependencies = [ "synattra", ] +[[package]] +name = "async-lock" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e97a171d191782fba31bb902b14ad94e24a68145032b7eedf871ab0bc0d077b6" +dependencies = [ + "event-listener", +] + [[package]] name = "async-stream" version = "0.3.3" @@ -226,6 +235,34 @@ dependencies = [ "uuid 0.8.2", ] +[[package]] +name = "azure_core" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca4393afee90ad13c987a2cbfeb5bbb0b9fb3c86585e42ed3ed151babaa93da1" +dependencies = [ + "async-trait", + "base64", + "bytes", + "chrono", + "dyn-clone", + "futures", + "getrandom", + "http", + "log", + "oauth2", + "pin-project", + "rand", + 
"reqwest", + "rustc_version", + "serde", + "serde_derive", + "serde_json", + "thiserror", + "url", + "uuid 1.0.0", +] + [[package]] name = "azure_identity" version = "0.1.1" @@ -234,7 +271,29 @@ checksum = "ebda98657980528a8f0f0f7cc85c88c7dabc160e026bf258d06e54b77b698b08" dependencies = [ "async-timer", "async-trait", - "azure_core", + "azure_core 0.1.1", + "chrono", + "futures", + "log", + "oauth2", + "reqwest", + "serde", + "serde_json", + "thiserror", + "url", +] + +[[package]] +name = "azure_identity" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2dc06f2cd86f196a9d33258bc486d611720420e39cf3ff15d0fb6a81f75bd72" +dependencies = [ + "async-lock", + "async-timer", + "async-trait", + "azure_core 0.2.2", + "base64", "chrono", "futures", "log", @@ -244,6 +303,7 @@ dependencies = [ "serde_json", "thiserror", "url", + "uuid 1.0.0", ] [[package]] @@ -254,7 +314,7 @@ checksum = "22c413e8459badf86c9e6e0c84f5894609663bcc8fa5eb1e49bfb985273dac58" dependencies = [ "RustyXML", "async-trait", - "azure_core", + "azure_core 0.1.1", "base64", "bytes", "chrono", @@ -272,6 +332,33 @@ dependencies = [ "uuid 0.8.2", ] +[[package]] +name = "azure_storage" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a9f2aee687da9817f7b332e1e01dda51cd9f7a0a68a5abcfec7c4c494a65546" +dependencies = [ + "RustyXML", + "async-trait", + "azure_core 0.2.2", + "base64", + "bytes", + "chrono", + "futures", + "hmac 0.12.1", + "http", + "log", + "once_cell", + "serde", + "serde-xml-rs", + "serde_derive", + "serde_json", + "sha2 0.10.2", + "thiserror", + "url", + "uuid 1.0.0", +] + [[package]] name = "azure_storage_blobs" version = "0.1.0" @@ -279,8 +366,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a70ec6fab8a2cae5d774098267870c0f3fbef1cb63cac12afab38b8c17cc8d97" dependencies = [ "RustyXML", - "azure_core", - "azure_storage", + "azure_core 0.1.1", + "azure_storage 0.1.0", "base64", "bytes", "chrono", @@ -297,6 +384,31 @@ dependencies = [ "uuid 0.8.2", ] +[[package]] +name = "azure_storage_blobs" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d17982127c4a34736a60656ddbd05b1714420686b6e6304145ee3b4501395e75" +dependencies = [ + "RustyXML", + "azure_core 0.2.2", + "azure_storage 0.2.0", + "base64", + "bytes", + "chrono", + "futures", + "http", + "log", + "md5", + "serde", + "serde-xml-rs", + "serde_derive", + "serde_json", + "thiserror", + "url", + "uuid 1.0.0", +] + [[package]] name = "backtrace" version = "0.3.65" @@ -694,6 +806,7 @@ checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506" dependencies = [ "block-buffer 0.10.2", "crypto-common", + "subtle", ] [[package]] @@ -775,6 +888,12 @@ dependencies = [ "str-buf", ] +[[package]] +name = "event-listener" +version = "2.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71" + [[package]] name = "exitcode" version = "1.1.2" @@ -1116,6 +1235,15 @@ dependencies = [ "digest 0.9.0", ] +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest 0.10.3", +] + [[package]] name = "hostname" version = "0.3.1" @@ -2149,7 +2277,7 @@ dependencies = [ "digest 0.9.0", "futures", "hex", - 
"hmac", + "hmac 0.11.0", "http", "hyper", "log", @@ -3254,10 +3382,10 @@ dependencies = [ "arr_macro", "async-stream", "async-trait", - "azure_core", - "azure_identity", - "azure_storage", - "azure_storage_blobs", + "azure_core 0.2.2", + "azure_identity 0.3.0", + "azure_storage 0.2.0", + "azure_storage_blobs 0.2.0", "base64", "bincode", "bytes", @@ -3295,6 +3423,7 @@ dependencies = [ "stream-reduce", "tokio", "tokio-stream", + "url", "util", "uuid 1.0.0", "zettacache", @@ -3357,10 +3486,10 @@ name = "zoa_test" version = "0.1.0" dependencies = [ "anyhow", - "azure_core", - "azure_identity", - "azure_storage", - "azure_storage_blobs", + "azure_core 0.1.1", + "azure_identity 0.1.1", + "azure_storage 0.1.0", + "azure_storage_blobs 0.1.0", "chrono", "clap", "futures", diff --git a/cmd/zfs_object_agent/client/src/main.rs b/cmd/zfs_object_agent/client/src/main.rs index 8418b2771ac3..8ed3b0444957 100644 --- a/cmd/zfs_object_agent/client/src/main.rs +++ b/cmd/zfs_object_agent/client/src/main.rs @@ -55,6 +55,13 @@ const BUCKET_NAME: &str = "cloudburst-data-2"; const POOL_NAME: &str = "testpool"; const POOL_GUID: u64 = 1234; +// The default URL including port number for Azurite. +const AZURITE_EMULATOR_URL: &str = "http://127.0.0.1:10000"; +// The well-known account and key used by Azurite. +const AZURITE_EMULATOR_ACCOUNT: &str = "devstoreaccount1"; +const AZURITE_EMULATOR_ACCOUNT_KEY: &str = + "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="; + static GIT_VERSION: &str = git_version!( fallback = match option_env!("CARGO_ZOA_GITREV") { Some(value) => value, @@ -544,6 +551,8 @@ struct Cli { azure_key: Option, #[clap(short = 'm', long)] managed_identity: bool, + #[clap(long, conflicts_with_all = &["managed-identity", "azure-account", "azure-key"])] + emulator: bool, #[clap(short, long, parse(from_occurrences))] verbose: usize, @@ -666,7 +675,17 @@ impl From for CliParams { } } Protocol::Blob => ObjectAccessProtocol::Blob { - credentials: if cli.managed_identity { + endpoint: if cli.emulator { + Some(AZURITE_EMULATOR_URL.to_string()) + } else { + None + }, + credentials: if cli.emulator { + BlobCredentials::Key { + azure_account: AZURITE_EMULATOR_ACCOUNT.to_string(), + azure_key: AZURITE_EMULATOR_ACCOUNT_KEY.to_string(), + } + } else if cli.managed_identity { BlobCredentials::ManagedCredentials { azure_account: cli.azure_account.unwrap(), } diff --git a/cmd/zfs_object_agent/server/src/main.rs b/cmd/zfs_object_agent/server/src/main.rs index 0bf0ef557ecb..86ebc7dd22f6 100644 --- a/cmd/zfs_object_agent/server/src/main.rs +++ b/cmd/zfs_object_agent/server/src/main.rs @@ -231,13 +231,14 @@ fn main() { }); } Some(Commands::TestConnectivityBlob { - endpoint: _, // XXX DLPX-80615 + endpoint, bucket, azure_account, azure_key, managed_identity, }) => { let protocol = ObjectAccessProtocol::Blob { + endpoint, credentials: if managed_identity { BlobCredentials::ManagedCredentials { azure_account } } else { diff --git a/cmd/zfs_object_agent/zettaobject/Cargo.toml b/cmd/zfs_object_agent/zettaobject/Cargo.toml index 0029de5db102..61c0c253f661 100644 --- a/cmd/zfs_object_agent/zettaobject/Cargo.toml +++ b/cmd/zfs_object_agent/zettaobject/Cargo.toml @@ -10,10 +10,10 @@ publish = false [dependencies] anyhow = "1.0" arr_macro = "0.1.3" -azure_core = { version = "0.1.1"} # , default-features = false, features = ["enable_hyper"] -azure_identity = "0.1.1" -azure_storage = "0.1.0" -azure_storage_blobs = "0.1.0" +azure_core = { version = "0.2.2"} # , default-features = false, features = 
["enable_hyper", "azurite_workaround"] +azure_identity = "0.3.0" +azure_storage = "0.2.0" +azure_storage_blobs = "0.2.0" async-stream = "0.3.0" async-trait = "0.1.52" base64 = "0.13.0" @@ -53,6 +53,7 @@ serde-xml-rs = "0.5.1" stream-reduce = "0.1.0" tokio = { version = "1.4", features = ["full"] } tokio-stream = "0.1.5" +url = "2.2" util = { path = "../util" } uuid = {version = "1.0.0", features = ["v4", "serde"]} zettacache = { path = "../zettacache" } diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs b/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs index 2d240c67599a..04f12e11bfec 100644 --- a/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs +++ b/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs @@ -15,8 +15,8 @@ use anyhow::Result; use async_stream::try_stream; use async_trait::async_trait; use azure_core::HttpError; -use azure_identity::token_credentials::ImdsManagedIdentityCredential; -use azure_identity::token_credentials::TokenCredential; +use azure_identity::ImdsManagedIdentityCredential; +use azure_identity::TokenCredential; use azure_storage::clients::AsStorageClient; use azure_storage::clients::StorageAccountClient; use azure_storage::clients::StorageClient; @@ -40,6 +40,7 @@ use more_asserts::assert_le; use rusoto_core::ByteStream; use tokio::io::AsyncReadExt; use tokio::sync::RwLock; +use url::Url; use util::tunable; use super::retry; @@ -151,16 +152,12 @@ where HttpError::ExecuteRequest(err) => Self::InternalError(err.to_string()), HttpError::ReadBytes(err) => Self::InternalError(err.to_string()), HttpError::BuildResponse(err) => Self::InternalError(err.to_string()), - /* - * XXX Long term, we probably want to handle this, but for now this error - * probably only happens if we mess up our code for building requests. Panic - * to make it easier to debug and develop. - */ - HttpError::StreamReset(err) => panic!("{}", err), + HttpError::Url(err) => Self::InternalError(err.to_string()), _ => todo!(), } } } + impl From for OAError where E: MaybeFrom + Display, @@ -176,8 +173,8 @@ struct BlobBucketClient { } impl BlobBucketClient { - async fn new(credentials: BlobCredentials) -> Result { - let (storage_client, expires_on) = get_azure_storage_client(credentials).await?; + async fn new(endpoint: Option, credentials: BlobCredentials) -> Result { + let (storage_client, expires_on) = get_azure_storage_client(endpoint, credentials).await?; let blob_service = storage_client.as_blob_service_client(); Ok(Self { blob_service, @@ -198,14 +195,17 @@ impl BlobBucketClient { pub struct BlobBucketAccess { blob_bucket_client: RwLock, + endpoint: Option, credentials: BlobCredentials, } impl BlobBucketAccess { - pub async fn new(credentials: BlobCredentials) -> Result { - let blob_bucket_client = BlobBucketClient::new(credentials.clone()).await?; + pub async fn new(endpoint: Option, credentials: BlobCredentials) -> Result { + let blob_bucket_client = + BlobBucketClient::new(endpoint.clone(), credentials.clone()).await?; Ok(Self { blob_bucket_client: RwLock::new(blob_bucket_client), + endpoint, credentials, }) } @@ -214,7 +214,7 @@ impl BlobBucketAccess { let mut blob_bucket_client = self.blob_bucket_client.write().await; // Expiry might have been checked earlier but we check again after taking the write lock. 
if blob_bucket_client.is_expired() { - match BlobBucketClient::new(self.credentials.clone()).await { + match BlobBucketClient::new(self.endpoint.clone(), self.credentials.clone()).await { Ok(new_blob_bucket_client) => { info!("BlobServiceClient refreshed after the credential tokens expired"); *blob_bucket_client = new_blob_bucket_client; @@ -290,9 +290,13 @@ struct BlobContainerClient { } impl BlobContainerClient { - async fn new(bucket: &str, credentials: BlobCredentials) -> Result { + async fn new( + endpoint: Option, + bucket: &str, + credentials: BlobCredentials, + ) -> Result { let (storage_account_client, expires_on) = - get_azure_storage_client(credentials.clone()).await?; + get_azure_storage_client(endpoint, credentials.clone()).await?; let container_client = storage_account_client.as_container_client(bucket); Ok(Self { @@ -314,6 +318,7 @@ impl BlobContainerClient { pub struct BlobObjectAccess { blob_container_client: RwLock, + endpoint: Option, bucket: String, credentials: BlobCredentials, access_stats: ObjectAccessStats, @@ -325,7 +330,13 @@ impl BlobObjectAccess { let mut blob_container_client = self.blob_container_client.write().await; // Expiry might have been checked earlier but we check again after taking the write lock. if blob_container_client.is_expired() { - match BlobContainerClient::new(&self.bucket, self.credentials.clone()).await { + match BlobContainerClient::new( + self.endpoint.clone(), + &self.bucket, + self.credentials.clone(), + ) + .await + { Ok(new_container_client) => { info!("ContainerClient refreshed after the credential tokens expired"); *blob_container_client = new_container_client; @@ -358,13 +369,19 @@ impl BlobObjectAccess { self.update_container_client().await } - pub async fn new(bucket: &str, credentials: BlobCredentials) -> Result { - let blob_client = BlobContainerClient::new(bucket, credentials.clone()).await?; + pub async fn new( + endpoint: Option, + bucket: &str, + credentials: BlobCredentials, + ) -> Result { + let blob_client = + BlobContainerClient::new(endpoint.clone(), bucket, credentials.clone()).await?; Ok(Self { blob_container_client: RwLock::new(blob_client), access_stats: Default::default(), outstanding_ops: Default::default(), + endpoint, bucket: bucket.to_string(), credentials, }) @@ -374,6 +391,10 @@ impl BlobObjectAccess { self.bucket.clone() } + pub fn endpoint(&self) -> Option { + self.endpoint.clone() + } + pub fn credentials_profile(&self) -> Option { if let BlobCredentials::Profile(profile) = &self.credentials { Some(profile.clone()) @@ -386,8 +407,41 @@ impl BlobObjectAccess { where T: MaybeFrom + Display, { - let http_error: Box = e.downcast().unwrap(); - OAError::from(*http_error) + match e.downcast::() { + Ok(http_error) => OAError::from(*http_error), + Err(err) => { + match err.downcast::() { + Ok(az_err) => { + match *az_err { + azure_storage::Error::CoreError(azure_core::Error::Other( + other_error, + )) => { + // Azurite does not support soft-delete. But it also does not tell + // us that the delete is permanent by including + // "x-ms-delete-type-permanent" in the header. The azure-sdk + // looks for this header and not finding it, throws a CoreError. + // See: https://github.com/Azure/azure-sdk-for-rust/issues/780 + // This is propagated as RequestError::EmulatorBug. 
+ if other_error + .to_string() + .contains("x-ms-delete-type-permanent") + { + OAError::RequestError(RequestError::EmulatorBug(format!( + "EmulatorBug: {other_error}" + ))) + } else { + OAError::Other(anyhow!( + "azure_storage::Error::CoreError: {other_error}" + )) + } + } + _ => OAError::Other(anyhow!("azure_storage::Error: {az_err}")), + } + } + Err(e) => OAError::Other(anyhow!(e)), + } + } + } } } @@ -496,14 +550,17 @@ impl ObjectAccessTrait for BlobObjectAccess { match blob_client.delete().execute().await { Err(e) => { debug!("error while deleting: {}", e); - let err = Self::convert_error::(e); - if let OAError::RequestError(RequestError::Service( - ObjectStoreError::NoSuchKey, - )) = err - { - Ok(None) - } else { - Err(err) + match Self::convert_error::(e) { + OAError::RequestError(RequestError::Service( + ObjectStoreError::NoSuchKey, + )) => Ok(None), + OAError::RequestError(RequestError::EmulatorBug(s)) => { + trace!( + "Hit emulator error which is expected; ignoring {s}" + ); + Ok(None) + } + err => Err(err), } } Ok(res) => { @@ -639,7 +696,7 @@ async fn get_azure_storage_client_with_managed_key( // There is a new AutoRefreshingTokenCredential wrapper in the repo that has not been released // yet. Once it is released, we should consider using it. // See: https://github.com/Azure/azure-sdk-for-rust/pull/673 - let creds = ImdsManagedIdentityCredential {}; + let creds = ImdsManagedIdentityCredential::default(); let bearer_token = creds.get_token("https://storage.azure.com/").await?; let expires_on = bearer_token.expires_on; @@ -653,39 +710,74 @@ async fn get_azure_storage_client_with_managed_key( Ok((client, Some(expires_on))) } -fn get_azure_storage_client_from_key( +fn get_azure_storage_emulator_client( + blob_storage: &str, azure_account: &str, azure_key: &str, ) -> Result<(Arc, Option>)> { let http_client = azure_core::new_http_client(); + let blob_storage_url = Url::parse(blob_storage).unwrap(); + + // We care only about the blob service but the API expects URLs to the other ones as well. + // So, we just pass the default values. 
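+    // (Azurite's well-known defaults: blob on 127.0.0.1:10000, queue on 10001,
+    // and table on 10002. The filesystem URL is only a placeholder, since
+    // Azurite does not emulate an ADLS filesystem endpoint.)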
+ let queue_storage_url = Url::parse("http://127.0.0.1:10001").unwrap(); + let table_storage_url = Url::parse("http://127.0.0.1:10002").unwrap(); + let filesystem_url = Url::parse("http://127.0.0.1:10004").unwrap(); + validate_azure_key(azure_key)?; Ok(( - StorageAccountClient::new_access_key(http_client, azure_account, azure_key) - .as_storage_client(), + StorageAccountClient::new_emulator_with_account( + http_client, + &blob_storage_url, + &table_storage_url, + &queue_storage_url, + &filesystem_url, + azure_account, + azure_key, + ) + .as_storage_client(), None, )) } -fn get_azure_storage_client_from_env() -> Result<(Arc, Option>)> { +fn get_azure_storage_client_from_key( + endpoint: Option, + azure_account: &str, + azure_key: &str, +) -> Result<(Arc, Option>)> { let http_client = azure_core::new_http_client(); - let storage_client = match env::var("AZURE_CONNECTION_STRING") { - Ok(connection_string) => { + validate_azure_key(azure_key)?; + + match endpoint { + Some(endpoint) => get_azure_storage_emulator_client(&endpoint, azure_account, azure_key), + None => Ok(( + StorageAccountClient::new_access_key(http_client, azure_account, azure_key) + .as_storage_client(), + None, + )), + } +} + +fn get_azure_storage_client_from_env( + endpoint: Option, +) -> Result<(Arc, Option>)> { + let http_client = azure_core::new_http_client(); + match env::var("AZURE_CONNECTION_STRING") { + Ok(connection_string) => Ok(( StorageAccountClient::new_connection_string(http_client.clone(), &connection_string)? - .as_storage_client() - } + .as_storage_client(), + None, + )), Err(_) => { let azure_account = env::var("AZURE_ACCOUNT")?; let azure_key = env::var("AZURE_KEY")?; - validate_azure_key(&azure_key)?; - StorageAccountClient::new_access_key(http_client, azure_account, azure_key) - .as_storage_client() + get_azure_storage_client_from_key(endpoint, &azure_account, &azure_key) } - }; - - Ok((storage_client, None)) + } } + fn get_credentials_file() -> Result { let home_dir = dirs_next::home_dir(); if home_dir.is_none() { @@ -720,6 +812,7 @@ fn get_credentials_file() -> Result { } fn get_azure_storage_client_from_profile_key( + endpoint: Option, credentials_profile: String, ) -> Result<(Arc, Option>)> { let ini_file = get_credentials_file()?; @@ -744,14 +837,7 @@ fn get_azure_storage_client_from_profile_key( Some(azure_key) => azure_key, }; - validate_azure_key(azure_key)?; - - let http_client = azure_core::new_http_client(); - Ok(( - StorageAccountClient::new_access_key(http_client, azure_account, azure_key) - .as_storage_client(), - None, - )) + get_azure_storage_client_from_key(endpoint, azure_account, azure_key) } /// Create a StorageClient after getting credentials the following sources in order: @@ -760,10 +846,11 @@ fn get_azure_storage_client_from_profile_key( /// 3. managed identities. /// Once credentials have been successfully obtained from a source, we do not try the rest of the /// sources even if the credentials are invalid. 
-async fn get_azure_storage_client_automatic() -> Result<(Arc<StorageClient>, Option<DateTime<Utc>>)>
-{
-    match get_azure_storage_client_from_env()
-        .or_else(|_| get_azure_storage_client_from_profile_key("default".to_string()))
+async fn get_azure_storage_client_automatic(
+    endpoint: Option<String>,
+) -> Result<(Arc<StorageClient>, Option<DateTime<Utc>>)> {
+    match get_azure_storage_client_from_env(endpoint.clone())
+        .or_else(|_| get_azure_storage_client_from_profile_key(endpoint, "default".to_string()))
     {
         Ok(tuple) => Ok(tuple),
         Err(_) => get_azure_storage_client_with_managed_key_profile("default".to_string()).await,
@@ -771,6 +858,7 @@
 }
 
 async fn get_azure_storage_client(
+    endpoint: Option<String>,
     credentials: BlobCredentials,
 ) -> Result<(Arc<StorageClient>, Option<DateTime<Utc>>)> {
     match credentials {
@@ -781,7 +869,7 @@
             // be fetched via Managed Identity Credential. These are similar to
             // BlobCredentials::Key and BlobCredentials::ManagedCredentials respectively, except for
             // the fact that they are passed via an ini file. We have to try both methods.
-            match get_azure_storage_client_from_profile_key(profile.clone()) {
+            match get_azure_storage_client_from_profile_key(endpoint.clone(), profile.clone()) {
                 Ok(tuple) => Ok(tuple),
                 Err(_) => get_azure_storage_client_with_managed_key_profile(profile).await,
             }
@@ -790,12 +878,15 @@
             azure_account,
             azure_key,
         } => Ok(get_azure_storage_client_from_key(
+            endpoint,
             &azure_account,
             &azure_key,
         )?),
         BlobCredentials::ManagedCredentials { azure_account } => {
             Ok(get_azure_storage_client_with_managed_key(&azure_account).await?)
         }
-        BlobCredentials::Automatic => Ok(get_azure_storage_client_automatic().await?),
+        BlobCredentials::Automatic => {
+            Ok(get_azure_storage_client_automatic(endpoint.clone()).await?)
+ } } } diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs b/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs index 4ee7fa21b842..4b27d99a7102 100644 --- a/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs +++ b/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs @@ -173,6 +173,7 @@ pub enum ObjectAccessProtocol { credentials: S3Credentials, }, Blob { + endpoint: Option, credentials: BlobCredentials, }, } @@ -200,6 +201,7 @@ impl<'de> Deserialize<'de> for ObjectAccessProtocol { credentials: S3Credentials, }, Blob { + endpoint: Option, credentials: BlobCredentials, }, } @@ -212,6 +214,7 @@ impl<'de> Deserialize<'de> for ObjectAccessProtocol { credentials_profile: Option, }, Blob { + endpoint: Option, credentials_profile: Option, }, } @@ -239,9 +242,13 @@ impl<'de> Deserialize<'de> for ObjectAccessProtocol { region, credentials, }), - Upgrader::Serialized(Serialized::Blob { credentials }) => { - Ok(Self::Blob { credentials }) - } + Upgrader::Serialized(Serialized::Blob { + credentials, + endpoint, + }) => Ok(Self::Blob { + credentials, + endpoint, + }), Upgrader::S3Legacy { endpoint, region, @@ -261,8 +268,10 @@ impl<'de> Deserialize<'de> for ObjectAccessProtocol { credentials: credentials_profile.into(), }), Upgrader::Socket(Socket::Blob { + endpoint, credentials_profile, }) => Ok(Self::Blob { + endpoint, credentials: credentials_profile.into(), }), } @@ -295,8 +304,11 @@ impl ObjectAccess { S3ObjectAccess::new(&endpoint, ®ion, &bucket, credentials), readonly, )), - ObjectAccessProtocol::Blob { credentials } => { - let oa = BlobObjectAccess::new(&bucket, credentials).await?; + ObjectAccessProtocol::Blob { + credentials, + endpoint, + } => { + let oa = BlobObjectAccess::new(endpoint, &bucket, credentials).await?; Ok(ObjectAccess::from_blob(oa, readonly)) } } @@ -359,6 +371,7 @@ impl ObjectAccess { }, }, ObjectAccessEnum::Blob(oa) => ObjectAccessProtocol::Blob { + endpoint: oa.endpoint(), credentials: match oa.credentials_profile() { Some(profile) => BlobCredentials::Profile(profile), None => BlobCredentials::Automatic, @@ -545,8 +558,11 @@ impl BucketAccess { inner: BucketAccessEnum::S3(ba), })) } - ObjectAccessProtocol::Blob { credentials } => { - let ba = BlobBucketAccess::new(credentials).await?; + ObjectAccessProtocol::Blob { + credentials, + endpoint, + } => { + let ba = BlobBucketAccess::new(endpoint, credentials).await?; Ok(Arc::new(BucketAccess { inner: BucketAccessEnum::Azure(ba), })) @@ -624,6 +640,7 @@ pub enum RequestError { InvalidCredentials, /// The request time and the server time were too far out of sync TimeSkew, + EmulatorBug(String), } impl Display for RequestError { @@ -636,6 +653,7 @@ impl Display for RequestError { RequestError::ExpiredCredentials => f.write_str("Expired credentials"), RequestError::InvalidCredentials => f.write_str("Invalid credentials"), RequestError::TimeSkew => f.write_str("Request time too skewed"), + RequestError::EmulatorBug(s) => s.fmt(f), } } } diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs b/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs index 2ce1c9491c0d..a922a658ce7a 100644 --- a/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs +++ b/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs @@ -165,6 +165,7 @@ impl S3ObjectAccess { RequestError::ExpiredCredentials => RequestError::ExpiredCredentials, RequestError::InvalidCredentials => RequestError::InvalidCredentials, RequestError::TimeSkew => RequestError::TimeSkew, + RequestError::EmulatorBug(s) => 
RequestError::EmulatorBug(s),
         }
     }
 
diff --git a/cmd/zpool/zpool_vdev.c b/cmd/zpool/zpool_vdev.c
index 28cd756b9796..2ed379229432 100644
--- a/cmd/zpool/zpool_vdev.c
+++ b/cmd/zpool/zpool_vdev.c
@@ -277,11 +277,6 @@ make_objstore_vdev(nvlist_t *props, const char *protocol, const char *arg)
 		    "vdev %s\n"), arg);
 		fnvlist_free(vdev);
 		return (NULL);
-	} else if (err == 0 && !s3) {
-		fprintf(stderr, gettext("Endpoint provided for objstore "
-		    "vdev that doesn't support endpoints %s\n"), arg);
-		fnvlist_free(vdev);
-		return (NULL);
 	} else if (err == 0) {
 		fnvlist_add_string(vdev,
 		    zpool_prop_to_name(ZPOOL_PROP_OBJ_ENDPOINT), endpoint);

From 65c0b51b53899d5c40261275c76d5d88eaaf81e5 Mon Sep 17 00:00:00 2001
From: Don Brady
Date: Fri, 3 Jun 2022 10:40:22 -0600
Subject: [PATCH 09/10] zcache iostat -d panics when disk_stats is empty (#461)

* zcache iostat -d panics when disk_stats is empty

* review feedback
---
 cmd/zfs_object_agent/util/src/zettacache_stats.rs | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/cmd/zfs_object_agent/util/src/zettacache_stats.rs b/cmd/zfs_object_agent/util/src/zettacache_stats.rs
index 43cdcfc25abf..7e67562b874e 100644
--- a/cmd/zfs_object_agent/util/src/zettacache_stats.rs
+++ b/cmd/zfs_object_agent/util/src/zettacache_stats.rs
@@ -349,10 +349,9 @@ impl IoStats {
     pub fn max_name_len(&self) -> usize {
         self.disk_stats
             .iter()
-            .max_by_key(|stats| stats.name.len())
-            .unwrap()
-            .name
-            .len()
+            .map(|stats| stats.name.len())
+            .max()
+            .unwrap_or_default()
     }
 }

From a5fb50f03e26b5299a998f49a1e8d9de42b669f0 Mon Sep 17 00:00:00 2001
From: Matthew Ahrens
Date: Fri, 3 Jun 2022 14:24:32 -0700
Subject: [PATCH 10/10] DLPX-81456 `zpool import` fails assertion (#466)

If the agent's response does not contain a `pools` nvlist (for example,
when no matching pools are found), `fnvlist_lookup_nvlist()` trips its
assertion. Use the non-asserting `nvlist_lookup_nvlist()` instead:
`pools` is left NULL when the nvpair is absent, and
`nvlist_next_nvpair()` treats a NULL nvlist as empty, so the loop below
is simply skipped.
---
 lib/libzutil/zutil_import.c | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/lib/libzutil/zutil_import.c b/lib/libzutil/zutil_import.c
index 4669069a8662..6626691da586 100644
--- a/lib/libzutil/zutil_import.c
+++ b/lib/libzutil/zutil_import.c
@@ -1901,7 +1908,8 @@ zpool_find_import_agent(libpc_handle_t *hdl, importargs_t *iarg,
 	nvlist_t *resp = zoa_send_recv_msg(hdl, msg,
 	    AGENT_PUBLIC_PROTOCOL_VERSION, ZFS_PUBLIC_SOCKET, NULL);
 
-	nvlist_t *pools = fnvlist_lookup_nvlist(resp, "pools");
+	nvlist_t *pools = NULL;
+	(void) nvlist_lookup_nvlist(resp, "pools", &pools);
 
 	nvpair_t *elem = NULL;
 	while ((elem = nvlist_next_nvpair(pools, elem)) != NULL) {
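For reference, a minimal sketch of the lookup semantics this last hunk relies
on (the walk_pools() helper is hypothetical; the libnvpair calls are the real
API): fnvlist_lookup_nvlist() VERIFYs that the nvpair exists and aborts the
process otherwise, whereas nvlist_lookup_nvlist() returns ENOENT and leaves
the out-pointer untouched, and nvlist_next_nvpair() returns NULL for a NULL
nvlist, so the walk degenerates to a no-op.

    #include <stdio.h>
    #include <libnvpair.h>

    /* Hypothetical helper mirroring the loop in zpool_find_import_agent(). */
    static void
    walk_pools(nvlist_t *resp)
    {
    	nvlist_t *pools = NULL;
    	nvpair_t *elem = NULL;

    	/*
    	 * fnvlist_lookup_nvlist(resp, "pools") would assert (and abort)
    	 * when the response has no "pools" nvpair; the non-asserting
    	 * variant returns ENOENT and leaves 'pools' NULL.
    	 */
    	(void) nvlist_lookup_nvlist(resp, "pools", &pools);

    	/* nvlist_next_nvpair() treats a NULL nvlist as empty. */
    	while ((elem = nvlist_next_nvpair(pools, elem)) != NULL)
    		(void) printf("pool nvpair: %s\n", nvpair_name(elem));
    }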