diff --git a/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs b/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs index 190eee6b6318..ecb75186968d 100644 --- a/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs +++ b/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs @@ -11,7 +11,6 @@ use anyhow::Result; use futures::stream::StreamExt; use lazy_static::lazy_static; use log::*; -use nvpair::NvList; use serde::Deserialize; use serde::Serialize; use tokio::fs; @@ -64,8 +63,8 @@ struct DestroyingCachePhys { pools: HashMap, } -#[derive(Serialize, Debug)] -struct DestroyingPool { +#[derive(Serialize, Debug, Clone)] +pub struct DestroyingPool { #[serde(flatten)] cache_phys: DestroyingCacheItemPhys, #[serde(flatten)] @@ -137,17 +136,6 @@ impl DestroyingPoolsMap { }); } } - - fn to_nvlist(&self) -> NvList { - let mut nvl = NvList::new_unique_names(); - - for (guid, destroying_pool) in self.pools.iter() { - let nvl_item = nvpair::to_nvlist(destroying_pool).unwrap(); - nvl.insert(format!("{}", guid), nvl_item.as_ref()).unwrap(); - } - - nvl - } } #[derive(Default, Debug)] @@ -438,7 +426,7 @@ pub async fn resume_destroy(object_access: Arc, guid: PoolGuid) -> /// Retrieve the PoolDestroyer's list of pools that are either being destroyed or have been /// destroyed. -pub async fn get_destroy_list() -> NvList { +pub async fn get_destroy_list() -> HashMap { maybe_die_with(|| "in get_destroy_list"); let maybe_pool_destroyer = POOL_DESTROYER.lock().await; @@ -446,7 +434,8 @@ pub async fn get_destroy_list() -> NvList { .as_ref() .unwrap() .destroying_pools_map - .to_nvlist() + .pools + .clone() } /// Remove pools that have been successfully destroyed from the PoolDestroyer's list of pools. diff --git a/cmd/zfs_object_agent/zettaobject/src/public_connection.rs b/cmd/zfs_object_agent/zettaobject/src/public_connection.rs index 30a8da0d72b5..d8a591f1e004 100644 --- a/cmd/zfs_object_agent/zettaobject/src/public_connection.rs +++ b/cmd/zfs_object_agent/zettaobject/src/public_connection.rs @@ -1,8 +1,8 @@ +use std::collections::HashMap; use std::path::Path; use std::sync::Arc; use std::sync::Mutex; -use anyhow::Result; use futures::stream::StreamExt; use log::*; use nvpair::NvList; @@ -21,6 +21,7 @@ use crate::object_access::ObjectAccess; use crate::object_access::ObjectAccessProtocol; use crate::pool::*; use crate::pool_destroy; +use crate::pool_destroy::DestroyingPool; use crate::server::return_ok; use crate::server::ConnectionState; use crate::server::HandlerReturn; @@ -90,102 +91,57 @@ impl PublicConnectionState { fn get_pools(&mut self, nvl: NvList) -> HandlerReturn { info!("got request: {:?}", nvl); - Ok(Box::pin(async move { Self::get_pools_impl(nvl).await })) - } - - async fn get_pools_impl(nvl: NvList) -> Result> { - #[derive(Debug, Deserialize)] - struct GetPoolsRequest { - #[serde(flatten)] - protocol: ObjectAccessProtocol, - #[serde(default)] - readonly: bool, - bucket: Option, - guid: Option, - } - let request: GetPoolsRequest = nvpair::from_nvlist(&nvl)?; - - let bucket_access = BucketAccess::new(request.protocol.clone()).await?; + Ok(Box::pin(async move { + #[derive(Debug, Deserialize)] + struct GetPoolsRequest { + #[serde(flatten)] + protocol: ObjectAccessProtocol, + bucket: Option, + guid: Option, + } + let request: GetPoolsRequest = nvpair::from_nvlist(&nvl)?; + let bucket_access = BucketAccess::new(request.protocol.clone()).await?; + let buckets = if let Some(bucket) = request.bucket { + vec![bucket] + } else { + bucket_access.list_buckets().await + }; - let mut buckets = vec![]; - if let Some(bucket) = request.bucket { - buckets.push(bucket); - } else { - buckets.append(&mut bucket_access.list_buckets().await); - } + maybe_die_with(|| "in get_pools_impl"); - maybe_die_with(|| "in get_pools_impl"); - let response = Arc::new(Mutex::new(NvList::new_unique_names())); - for buck in buckets { - let object_access = - ObjectAccess::new(request.protocol.clone(), buck, request.readonly).await?; - if let Some(guid) = request.guid { - if !Pool::exists(&object_access, PoolGuid(guid)).await { - continue; - } - - match Pool::get_config(&object_access, PoolGuid(guid)).await { - Ok(pool_config) => { - let mut owned_response = - Arc::try_unwrap(response).unwrap().into_inner().unwrap(); - owned_response - .insert(format!("{}", guid), pool_config.as_ref()) - .unwrap(); - debug!("sending response: {:?}", owned_response); - return Ok(Some(owned_response)); - } - Err(e) => { - error!("skipping {:?}: {:?}", guid, e); - continue; - } + let pools = Arc::new(Mutex::new(NvList::new_unique_names())); + for buck in buckets { + let object_access = ObjectAccess::new(request.protocol.clone(), buck, true).await?; + if let Some(guid) = request.guid { + find_pool(object_access, PoolGuid(guid), pools.clone()).await; + } else { + discover_pools(object_access, pools.clone()).await; } } - - object_access - .list_prefixes("zfs/".to_string()) - .for_each_concurrent(*GET_POOLS_QUEUE_DEPTH, |prefix| { - let my_object_access = object_access.clone(); - let my_response = response.clone(); - async move { - debug!("prefix: {}", prefix); - let split: Vec<&str> = prefix.rsplitn(3, '/').collect(); - let guid_str = split[1]; - if let Ok(guid64) = str::parse::(guid_str) { - let guid = PoolGuid(guid64); - match Pool::get_config(&my_object_access, guid).await { - Ok(pool_config) => my_response - .lock() - .unwrap() - .insert(guid_str, pool_config.as_ref()) - .unwrap(), - Err(e) => { - error!("skipping {:?}: {:?}", guid, e); - } - } - } - } - }) - .await; - } - let owned_response = Arc::try_unwrap(response).unwrap().into_inner().unwrap(); - info!("sending response: {:?}", owned_response); - Ok(Some(owned_response)) + let pools = Arc::try_unwrap(pools).unwrap().into_inner().unwrap(); + let mut response = NvList::new_unique_names(); + response.insert("pools", pools.as_ref())?; + info!("sending response: {response:?}"); + Ok(Some(response)) + })) } fn get_destroying_pools(&mut self, nvl: NvList) -> HandlerReturn { Ok(Box::pin(async move { - // XXX convert to use serde nvlist response debug!("got request: {:?}", nvl); - let pools = pool_destroy::get_destroy_list().await; - let mut response = NvList::new_unique_names(); - response - .insert(AGENT_RESPONSE_TYPE, TYPE_GET_DESTROYING_POOLS) - .unwrap(); - response.insert("pools", pools.as_ref()).unwrap(); - - debug!("sending response: {:?}", response); - Ok(Some(response)) + #[derive(Debug, Serialize)] + struct DestroyingPoolsResponse { + pools: HashMap, + } + let response = DestroyingPoolsResponse { + pools: pool_destroy::get_destroy_list() + .await + .into_iter() + .map(|(guid, destroying)| (guid.to_string(), destroying)) + .collect(), + }; + return_ok(response, true) })) } @@ -195,13 +151,7 @@ impl PublicConnectionState { debug!("got request: {:?}", nvl); pool_destroy::remove_not_in_progress().await; - let mut response = NvList::new_unique_names(); - response - .insert(AGENT_RESPONSE_TYPE, TYPE_CLEAR_DESTROYED_POOLS) - .unwrap(); - - debug!("sending response: {:?}", response); - Ok(Some(response)) + Ok(Some(NvList::new_unique_names())) })) } @@ -225,7 +175,7 @@ impl PublicConnectionState { bucket_size: phys.bucket_size, combined_histogram, }; - return_ok(TYPE_REPORT_HITS, response, true) + return_ok(response, true) })) } @@ -240,7 +190,7 @@ impl PublicConnectionState { let response = ListDevicesResponse { devices_json: serde_json::to_string(&cache.devices()).unwrap(), }; - return_ok(TYPE_LIST_DEVICES, response, true) + return_ok(response, true) })) } @@ -255,7 +205,7 @@ impl PublicConnectionState { let response = ZcacheIostatResponse { iostats_json: serde_json::to_string(&cache.io_stats()).unwrap(), }; - return_ok(TYPE_ZCACHE_IOSTAT, response, false) + return_ok(response, false) })) } @@ -270,7 +220,52 @@ impl PublicConnectionState { let response = ZcacheStatsResponse { stats_json: serde_json::to_string(&cache.stats()).unwrap(), }; - return_ok(TYPE_ZCACHE_STATS, response, false) + return_ok(response, false) })) } } + +async fn discover_pools(object_access: Arc, pools: Arc>) { + object_access + .list_prefixes("zfs/".to_string()) + .for_each_concurrent(*GET_POOLS_QUEUE_DEPTH, |prefix| { + let my_object_access = object_access.clone(); + let pools = pools.clone(); + async move { + debug!("prefix: {}", prefix); + let split = prefix.rsplitn(3, '/').collect::>(); + let guid_str = split[1]; + if let Ok(guid64) = str::parse::(guid_str) { + let guid = PoolGuid(guid64); + match Pool::get_config(&my_object_access, guid).await { + Ok(pool_config) => pools + .lock() + .unwrap() + .insert(guid_str, pool_config.as_ref()) + .unwrap(), + Err(e) => { + error!("skipping {guid:?}: {e:?}"); + } + } + } + } + }) + .await; +} + +async fn find_pool(object_access: Arc, guid: PoolGuid, pools: Arc>) { + if Pool::exists(&object_access, guid).await { + match Pool::get_config(&object_access, guid).await { + Ok(pool_config) => { + pools + .lock() + .unwrap() + .insert(format!("{}", guid), pool_config.as_ref()) + .unwrap(); + } + Err(e) => { + error!("skipping {guid:?}: {e:?}"); + } + } + } +} diff --git a/cmd/zfs_object_agent/zettaobject/src/root_connection.rs b/cmd/zfs_object_agent/zettaobject/src/root_connection.rs index c5e6fec358b2..906d2bcb5721 100644 --- a/cmd/zfs_object_agent/zettaobject/src/root_connection.rs +++ b/cmd/zfs_object_agent/zettaobject/src/root_connection.rs @@ -150,7 +150,7 @@ impl RootConnectionState { .await .map_err(|e| FailureMessage::new(e.into())); - return_result(TYPE_CREATE_POOL, request.id, result, true) + return_result(request.id, result, true) }) } @@ -269,7 +269,7 @@ impl RootConnectionState { }) } }; - return_result(TYPE_OPEN_POOL, request.id, result, true) + return_result(request.id, result, true) }) } @@ -349,13 +349,11 @@ impl RootConnectionState { .await; #[derive(Debug, Serialize)] struct EndTxgResponse { - response_type: &'static str, #[serde(flatten)] stats: PoolStatsPhys, features: HashMap, } let response = EndTxgResponse { - response_type: TYPE_END_TXG, stats, features: features .into_iter() @@ -488,7 +486,6 @@ impl RootConnectionState { } let mut response = NvList::new_unique_names(); - response.insert("response_type", TYPE_GET_STATS).unwrap(); response.insert("token", &request.token).unwrap(); response.insert("stats", nvl.as_ref()).unwrap(); @@ -516,13 +513,8 @@ impl RootConnectionState { .await; } #[derive(Debug, Serialize)] - struct ClosePoolResponse { - response_type: &'static str, - } - let response = ClosePoolResponse { - response_type: TYPE_CLOSE_POOL, - }; - return_struct(response, true) + struct ClosePoolResponse {} + return_struct(ClosePoolResponse {}, true) })) } @@ -550,14 +542,12 @@ impl RootConnectionState { #[derive(Debug, Serialize)] struct EnableFeatureResponse { - response_type: &'static str, feature: String, } let response = EnableFeatureResponse { - response_type: TYPE_ENABLE_FEATURE, feature: request.feature, }; - handler_return_struct(response, true) + Ok(Box::pin(future::ready(return_struct(response, true)))) } fn resume_destroy_pool(&mut self, nvl: NvList) -> HandlerReturn { @@ -575,22 +565,16 @@ impl RootConnectionState { let result = pool_destroy::resume_destroy(object_access, request.guid) .await .map_err(FailureMessage::new); - return_result(TYPE_RESUME_DESTROY_POOL, (), result, true) + return_result((), result, true) })) } fn clear_hit_data(&mut self, _nvl: NvList) -> HandlerReturn { let cache = self.cache.clone(); Ok(Box::pin(async move { - #[derive(Debug, Serialize)] - struct ClearHitDataResponse { - response_type: &'static str, - result: &'static str, - } - debug!("got ClearHitDataRequest"); cache.clear_hit_data().await; - return_ok(TYPE_CLEAR_HIT_DATA, (), true) + return_ok((), true) })) } @@ -604,7 +588,7 @@ impl RootConnectionState { .add_disk(&request.path) .await .map_err(FailureMessage::new); - return_result(TYPE_ADD_DISK, (), result, true) + return_result((), result, true) })) } @@ -618,7 +602,7 @@ impl RootConnectionState { .expand_disk(&request.path) .await .map_err(FailureMessage::new); - return_result(TYPE_EXPAND_DISK, (), result, true) + return_result((), result, true) })) } @@ -628,7 +612,7 @@ impl RootConnectionState { debug!("got {:?}", nvl); cache.sync_checkpoint().await; - return_ok(TYPE_SYNC_CHECKPOINT, (), true) + return_ok((), true) })) } @@ -638,7 +622,7 @@ impl RootConnectionState { debug!("got {:?}", nvl); cache.initiate_merge().await; - return_ok(TYPE_INITIATE_MERGE, (), true) + return_ok((), true) })) } } @@ -661,10 +645,3 @@ where } Ok(Some(nvl)) } - -fn handler_return_struct(response: T, debug: bool) -> HandlerReturn -where - T: Debug + Serialize, -{ - Ok(Box::pin(future::ready(return_struct(response, debug)))) -} diff --git a/cmd/zfs_object_agent/zettaobject/src/server.rs b/cmd/zfs_object_agent/zettaobject/src/server.rs index a0d480afb5d7..ed484d7c2c28 100644 --- a/cmd/zfs_object_agent/zettaobject/src/server.rs +++ b/cmd/zfs_object_agent/zettaobject/src/server.rs @@ -317,11 +317,14 @@ where with_alloctag_hf("Server::start_connection() NvList::lookup_string()", || { nvl.lookup_string(AGENT_REQUEST_TYPE) })?; - let request_type = request_type_cstr.to_str()?; - match self.nvlist_handlers.get(request_type) { + let request_type = request_type_cstr.to_str()?.to_owned(); + match self.nvlist_handlers.get(&request_type) { Some(HandlerEnum::Serial(handler)) => { let response_opt = handler(&mut state, nvl).await?; - if let Some(response) = response_opt { + if let Some(mut response) = response_opt { + response + .insert(AGENT_RESPONSE_TYPE, request_type.as_str()) + .unwrap(); responder.respond_with_nvlist(response); } } @@ -331,7 +334,10 @@ where let responder = responder.clone(); tokio::spawn(async move { match fut.await { - Ok(Some(response)) => { + Ok(Some(mut response)) => { + response + .insert(AGENT_RESPONSE_TYPE, request_type.as_str()) + .unwrap(); responder.respond_with_nvlist(response); } Ok(None) => {} @@ -513,7 +519,7 @@ pub enum FailureMessage { impl Debug for FailureMessage { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { - FailureMessage::Other { message } => write!(f, "{}", message), + FailureMessage::Other { message } => write!(f, "{message}"), } } } @@ -535,14 +541,14 @@ impl FailureMessage { /// (not tuple-like). FailureMessage is an example. /// /// The response nvlist will have the following nvpairs: -/// * "response_type" -> response_type (string) /// * fields from R /// * if result.is_ok(), fields from O /// * if result.is_err(), "err" -> EnumVariantName (string) /// * if result.is_err(), "errstr" -> stringified error (string) /// * if result.is_err(), fields from the varant of E +/// +/// Note that the server infrastructure will add the "response_type" nvpair pub fn return_result( - response_type: &str, request_id: R, result: Result, debug: bool, @@ -553,8 +559,7 @@ where E: Debug + Serialize, { #[derive(Debug, Serialize)] - struct Response<'a, R, O, E> { - response_type: &'a str, + struct Response { #[serde(flatten)] request: R, #[serde(flatten)] @@ -565,16 +570,15 @@ where } if let Err(e) = &result { - error!("sending failure: {:?}", e); + error!("sending failure: {e:?}"); } let (ok, errstr, err) = match result { Ok(o) => (Some(o), None, None), - Err(e) => (None, Some(format!("{:?}", e)), Some(e)), + Err(e) => (None, Some(format!("{e:?}")), Some(e)), }; let response = Response { - response_type, request: request_id, ok, err, @@ -582,9 +586,9 @@ where }; if debug { - trace!("sending response: {:?}", response); + trace!("sending response: {response:?}"); } else { - super_trace!("sending response: {:?}", response); + super_trace!("sending response: {response:?}"); } let nvl = nvpair::to_nvlist(&response)?; @@ -596,17 +600,17 @@ where } if debug { - maybe_die_with(|| format!("before sending response: {:?}", nvl)); - debug!("sending response nvl: {:?}", nvl); + maybe_die_with(|| format!("before sending response: {nvl:?}")); + debug!("sending response nvl: {nvl:?}"); } else { - super_trace!("sending response nvl: {:?}", nvl); + super_trace!("sending response nvl: {nvl:?}"); } Ok(Some(nvl)) } -pub fn return_ok(response_type: &str, response: O, debug: bool) -> Result> +pub fn return_ok(response: O, debug: bool) -> Result> where O: Debug + Serialize, { - return_result(response_type, (), Ok::<_, ()>(response), debug) + return_result((), Ok::<_, ()>(response), debug) } diff --git a/lib/libzutil/zutil_import.c b/lib/libzutil/zutil_import.c index ebb1ac2a55d8..4669069a8662 100644 --- a/lib/libzutil/zutil_import.c +++ b/lib/libzutil/zutil_import.c @@ -1901,8 +1901,10 @@ zpool_find_import_agent(libpc_handle_t *hdl, importargs_t *iarg, nvlist_t *resp = zoa_send_recv_msg(hdl, msg, AGENT_PUBLIC_PROTOCOL_VERSION, ZFS_PUBLIC_SOCKET, NULL); + nvlist_t *pools = fnvlist_lookup_nvlist(resp, "pools"); + nvpair_t *elem = NULL; - while ((elem = nvlist_next_nvpair(resp, elem)) != NULL) { + while ((elem = nvlist_next_nvpair(pools, elem)) != NULL) { avl_index_t where; rdsk_node_t *slice; nvlist_t *config;