From ba49c18fa69da2769d44cb2b2d91359beaf4f558 Mon Sep 17 00:00:00 2001 From: Matthew Ahrens Date: Wed, 1 Jun 2022 10:25:24 -0700 Subject: [PATCH] zoa: determine response type automatically (#446) Since the `response_type` is the same as the `request_type`, the server infrastructure can fill it in automatically, so each request handler doesn't need to match it up manually. The "get pools" response contained the pools directly in the nvlist, and asserted that there are no other nvpairs. There was no `response_type` in this response message. This commit changes it so that "get pools" includes a `response_type`, and the pools are under a different `pools` nvpair. Additionally, the code in `PublicConnectionState::get_pools()` is reorganized to be more clear. Additionally, the code in `PublicConnectionState::get_destroying_pools()` is simplified, using the server helper method to convert to an nvlist. --- .../zettaobject/src/pool_destroy.rs | 21 +- .../zettaobject/src/public_connection.rs | 191 +++++++++--------- .../zettaobject/src/root_connection.rs | 45 +---- .../zettaobject/src/server.rs | 42 ++-- lib/libzutil/zutil_import.c | 4 +- 5 files changed, 135 insertions(+), 168 deletions(-) diff --git a/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs b/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs index 190eee6b6318..ecb75186968d 100644 --- a/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs +++ b/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs @@ -11,7 +11,6 @@ use anyhow::Result; use futures::stream::StreamExt; use lazy_static::lazy_static; use log::*; -use nvpair::NvList; use serde::Deserialize; use serde::Serialize; use tokio::fs; @@ -64,8 +63,8 @@ struct DestroyingCachePhys { pools: HashMap, } -#[derive(Serialize, Debug)] -struct DestroyingPool { +#[derive(Serialize, Debug, Clone)] +pub struct DestroyingPool { #[serde(flatten)] cache_phys: DestroyingCacheItemPhys, #[serde(flatten)] @@ -137,17 +136,6 @@ impl DestroyingPoolsMap { }); } } - - fn to_nvlist(&self) -> NvList { - let mut nvl = NvList::new_unique_names(); - - for (guid, destroying_pool) in self.pools.iter() { - let nvl_item = nvpair::to_nvlist(destroying_pool).unwrap(); - nvl.insert(format!("{}", guid), nvl_item.as_ref()).unwrap(); - } - - nvl - } } #[derive(Default, Debug)] @@ -438,7 +426,7 @@ pub async fn resume_destroy(object_access: Arc, guid: PoolGuid) -> /// Retrieve the PoolDestroyer's list of pools that are either being destroyed or have been /// destroyed. -pub async fn get_destroy_list() -> NvList { +pub async fn get_destroy_list() -> HashMap { maybe_die_with(|| "in get_destroy_list"); let maybe_pool_destroyer = POOL_DESTROYER.lock().await; @@ -446,7 +434,8 @@ pub async fn get_destroy_list() -> NvList { .as_ref() .unwrap() .destroying_pools_map - .to_nvlist() + .pools + .clone() } /// Remove pools that have been successfully destroyed from the PoolDestroyer's list of pools. diff --git a/cmd/zfs_object_agent/zettaobject/src/public_connection.rs b/cmd/zfs_object_agent/zettaobject/src/public_connection.rs index 30a8da0d72b5..d8a591f1e004 100644 --- a/cmd/zfs_object_agent/zettaobject/src/public_connection.rs +++ b/cmd/zfs_object_agent/zettaobject/src/public_connection.rs @@ -1,8 +1,8 @@ +use std::collections::HashMap; use std::path::Path; use std::sync::Arc; use std::sync::Mutex; -use anyhow::Result; use futures::stream::StreamExt; use log::*; use nvpair::NvList; @@ -21,6 +21,7 @@ use crate::object_access::ObjectAccess; use crate::object_access::ObjectAccessProtocol; use crate::pool::*; use crate::pool_destroy; +use crate::pool_destroy::DestroyingPool; use crate::server::return_ok; use crate::server::ConnectionState; use crate::server::HandlerReturn; @@ -90,102 +91,57 @@ impl PublicConnectionState { fn get_pools(&mut self, nvl: NvList) -> HandlerReturn { info!("got request: {:?}", nvl); - Ok(Box::pin(async move { Self::get_pools_impl(nvl).await })) - } - - async fn get_pools_impl(nvl: NvList) -> Result> { - #[derive(Debug, Deserialize)] - struct GetPoolsRequest { - #[serde(flatten)] - protocol: ObjectAccessProtocol, - #[serde(default)] - readonly: bool, - bucket: Option, - guid: Option, - } - let request: GetPoolsRequest = nvpair::from_nvlist(&nvl)?; - - let bucket_access = BucketAccess::new(request.protocol.clone()).await?; + Ok(Box::pin(async move { + #[derive(Debug, Deserialize)] + struct GetPoolsRequest { + #[serde(flatten)] + protocol: ObjectAccessProtocol, + bucket: Option, + guid: Option, + } + let request: GetPoolsRequest = nvpair::from_nvlist(&nvl)?; + let bucket_access = BucketAccess::new(request.protocol.clone()).await?; + let buckets = if let Some(bucket) = request.bucket { + vec![bucket] + } else { + bucket_access.list_buckets().await + }; - let mut buckets = vec![]; - if let Some(bucket) = request.bucket { - buckets.push(bucket); - } else { - buckets.append(&mut bucket_access.list_buckets().await); - } + maybe_die_with(|| "in get_pools_impl"); - maybe_die_with(|| "in get_pools_impl"); - let response = Arc::new(Mutex::new(NvList::new_unique_names())); - for buck in buckets { - let object_access = - ObjectAccess::new(request.protocol.clone(), buck, request.readonly).await?; - if let Some(guid) = request.guid { - if !Pool::exists(&object_access, PoolGuid(guid)).await { - continue; - } - - match Pool::get_config(&object_access, PoolGuid(guid)).await { - Ok(pool_config) => { - let mut owned_response = - Arc::try_unwrap(response).unwrap().into_inner().unwrap(); - owned_response - .insert(format!("{}", guid), pool_config.as_ref()) - .unwrap(); - debug!("sending response: {:?}", owned_response); - return Ok(Some(owned_response)); - } - Err(e) => { - error!("skipping {:?}: {:?}", guid, e); - continue; - } + let pools = Arc::new(Mutex::new(NvList::new_unique_names())); + for buck in buckets { + let object_access = ObjectAccess::new(request.protocol.clone(), buck, true).await?; + if let Some(guid) = request.guid { + find_pool(object_access, PoolGuid(guid), pools.clone()).await; + } else { + discover_pools(object_access, pools.clone()).await; } } - - object_access - .list_prefixes("zfs/".to_string()) - .for_each_concurrent(*GET_POOLS_QUEUE_DEPTH, |prefix| { - let my_object_access = object_access.clone(); - let my_response = response.clone(); - async move { - debug!("prefix: {}", prefix); - let split: Vec<&str> = prefix.rsplitn(3, '/').collect(); - let guid_str = split[1]; - if let Ok(guid64) = str::parse::(guid_str) { - let guid = PoolGuid(guid64); - match Pool::get_config(&my_object_access, guid).await { - Ok(pool_config) => my_response - .lock() - .unwrap() - .insert(guid_str, pool_config.as_ref()) - .unwrap(), - Err(e) => { - error!("skipping {:?}: {:?}", guid, e); - } - } - } - } - }) - .await; - } - let owned_response = Arc::try_unwrap(response).unwrap().into_inner().unwrap(); - info!("sending response: {:?}", owned_response); - Ok(Some(owned_response)) + let pools = Arc::try_unwrap(pools).unwrap().into_inner().unwrap(); + let mut response = NvList::new_unique_names(); + response.insert("pools", pools.as_ref())?; + info!("sending response: {response:?}"); + Ok(Some(response)) + })) } fn get_destroying_pools(&mut self, nvl: NvList) -> HandlerReturn { Ok(Box::pin(async move { - // XXX convert to use serde nvlist response debug!("got request: {:?}", nvl); - let pools = pool_destroy::get_destroy_list().await; - let mut response = NvList::new_unique_names(); - response - .insert(AGENT_RESPONSE_TYPE, TYPE_GET_DESTROYING_POOLS) - .unwrap(); - response.insert("pools", pools.as_ref()).unwrap(); - - debug!("sending response: {:?}", response); - Ok(Some(response)) + #[derive(Debug, Serialize)] + struct DestroyingPoolsResponse { + pools: HashMap, + } + let response = DestroyingPoolsResponse { + pools: pool_destroy::get_destroy_list() + .await + .into_iter() + .map(|(guid, destroying)| (guid.to_string(), destroying)) + .collect(), + }; + return_ok(response, true) })) } @@ -195,13 +151,7 @@ impl PublicConnectionState { debug!("got request: {:?}", nvl); pool_destroy::remove_not_in_progress().await; - let mut response = NvList::new_unique_names(); - response - .insert(AGENT_RESPONSE_TYPE, TYPE_CLEAR_DESTROYED_POOLS) - .unwrap(); - - debug!("sending response: {:?}", response); - Ok(Some(response)) + Ok(Some(NvList::new_unique_names())) })) } @@ -225,7 +175,7 @@ impl PublicConnectionState { bucket_size: phys.bucket_size, combined_histogram, }; - return_ok(TYPE_REPORT_HITS, response, true) + return_ok(response, true) })) } @@ -240,7 +190,7 @@ impl PublicConnectionState { let response = ListDevicesResponse { devices_json: serde_json::to_string(&cache.devices()).unwrap(), }; - return_ok(TYPE_LIST_DEVICES, response, true) + return_ok(response, true) })) } @@ -255,7 +205,7 @@ impl PublicConnectionState { let response = ZcacheIostatResponse { iostats_json: serde_json::to_string(&cache.io_stats()).unwrap(), }; - return_ok(TYPE_ZCACHE_IOSTAT, response, false) + return_ok(response, false) })) } @@ -270,7 +220,52 @@ impl PublicConnectionState { let response = ZcacheStatsResponse { stats_json: serde_json::to_string(&cache.stats()).unwrap(), }; - return_ok(TYPE_ZCACHE_STATS, response, false) + return_ok(response, false) })) } } + +async fn discover_pools(object_access: Arc, pools: Arc>) { + object_access + .list_prefixes("zfs/".to_string()) + .for_each_concurrent(*GET_POOLS_QUEUE_DEPTH, |prefix| { + let my_object_access = object_access.clone(); + let pools = pools.clone(); + async move { + debug!("prefix: {}", prefix); + let split = prefix.rsplitn(3, '/').collect::>(); + let guid_str = split[1]; + if let Ok(guid64) = str::parse::(guid_str) { + let guid = PoolGuid(guid64); + match Pool::get_config(&my_object_access, guid).await { + Ok(pool_config) => pools + .lock() + .unwrap() + .insert(guid_str, pool_config.as_ref()) + .unwrap(), + Err(e) => { + error!("skipping {guid:?}: {e:?}"); + } + } + } + } + }) + .await; +} + +async fn find_pool(object_access: Arc, guid: PoolGuid, pools: Arc>) { + if Pool::exists(&object_access, guid).await { + match Pool::get_config(&object_access, guid).await { + Ok(pool_config) => { + pools + .lock() + .unwrap() + .insert(format!("{}", guid), pool_config.as_ref()) + .unwrap(); + } + Err(e) => { + error!("skipping {guid:?}: {e:?}"); + } + } + } +} diff --git a/cmd/zfs_object_agent/zettaobject/src/root_connection.rs b/cmd/zfs_object_agent/zettaobject/src/root_connection.rs index c5e6fec358b2..906d2bcb5721 100644 --- a/cmd/zfs_object_agent/zettaobject/src/root_connection.rs +++ b/cmd/zfs_object_agent/zettaobject/src/root_connection.rs @@ -150,7 +150,7 @@ impl RootConnectionState { .await .map_err(|e| FailureMessage::new(e.into())); - return_result(TYPE_CREATE_POOL, request.id, result, true) + return_result(request.id, result, true) }) } @@ -269,7 +269,7 @@ impl RootConnectionState { }) } }; - return_result(TYPE_OPEN_POOL, request.id, result, true) + return_result(request.id, result, true) }) } @@ -349,13 +349,11 @@ impl RootConnectionState { .await; #[derive(Debug, Serialize)] struct EndTxgResponse { - response_type: &'static str, #[serde(flatten)] stats: PoolStatsPhys, features: HashMap, } let response = EndTxgResponse { - response_type: TYPE_END_TXG, stats, features: features .into_iter() @@ -488,7 +486,6 @@ impl RootConnectionState { } let mut response = NvList::new_unique_names(); - response.insert("response_type", TYPE_GET_STATS).unwrap(); response.insert("token", &request.token).unwrap(); response.insert("stats", nvl.as_ref()).unwrap(); @@ -516,13 +513,8 @@ impl RootConnectionState { .await; } #[derive(Debug, Serialize)] - struct ClosePoolResponse { - response_type: &'static str, - } - let response = ClosePoolResponse { - response_type: TYPE_CLOSE_POOL, - }; - return_struct(response, true) + struct ClosePoolResponse {} + return_struct(ClosePoolResponse {}, true) })) } @@ -550,14 +542,12 @@ impl RootConnectionState { #[derive(Debug, Serialize)] struct EnableFeatureResponse { - response_type: &'static str, feature: String, } let response = EnableFeatureResponse { - response_type: TYPE_ENABLE_FEATURE, feature: request.feature, }; - handler_return_struct(response, true) + Ok(Box::pin(future::ready(return_struct(response, true)))) } fn resume_destroy_pool(&mut self, nvl: NvList) -> HandlerReturn { @@ -575,22 +565,16 @@ impl RootConnectionState { let result = pool_destroy::resume_destroy(object_access, request.guid) .await .map_err(FailureMessage::new); - return_result(TYPE_RESUME_DESTROY_POOL, (), result, true) + return_result((), result, true) })) } fn clear_hit_data(&mut self, _nvl: NvList) -> HandlerReturn { let cache = self.cache.clone(); Ok(Box::pin(async move { - #[derive(Debug, Serialize)] - struct ClearHitDataResponse { - response_type: &'static str, - result: &'static str, - } - debug!("got ClearHitDataRequest"); cache.clear_hit_data().await; - return_ok(TYPE_CLEAR_HIT_DATA, (), true) + return_ok((), true) })) } @@ -604,7 +588,7 @@ impl RootConnectionState { .add_disk(&request.path) .await .map_err(FailureMessage::new); - return_result(TYPE_ADD_DISK, (), result, true) + return_result((), result, true) })) } @@ -618,7 +602,7 @@ impl RootConnectionState { .expand_disk(&request.path) .await .map_err(FailureMessage::new); - return_result(TYPE_EXPAND_DISK, (), result, true) + return_result((), result, true) })) } @@ -628,7 +612,7 @@ impl RootConnectionState { debug!("got {:?}", nvl); cache.sync_checkpoint().await; - return_ok(TYPE_SYNC_CHECKPOINT, (), true) + return_ok((), true) })) } @@ -638,7 +622,7 @@ impl RootConnectionState { debug!("got {:?}", nvl); cache.initiate_merge().await; - return_ok(TYPE_INITIATE_MERGE, (), true) + return_ok((), true) })) } } @@ -661,10 +645,3 @@ where } Ok(Some(nvl)) } - -fn handler_return_struct(response: T, debug: bool) -> HandlerReturn -where - T: Debug + Serialize, -{ - Ok(Box::pin(future::ready(return_struct(response, debug)))) -} diff --git a/cmd/zfs_object_agent/zettaobject/src/server.rs b/cmd/zfs_object_agent/zettaobject/src/server.rs index a0d480afb5d7..ed484d7c2c28 100644 --- a/cmd/zfs_object_agent/zettaobject/src/server.rs +++ b/cmd/zfs_object_agent/zettaobject/src/server.rs @@ -317,11 +317,14 @@ where with_alloctag_hf("Server::start_connection() NvList::lookup_string()", || { nvl.lookup_string(AGENT_REQUEST_TYPE) })?; - let request_type = request_type_cstr.to_str()?; - match self.nvlist_handlers.get(request_type) { + let request_type = request_type_cstr.to_str()?.to_owned(); + match self.nvlist_handlers.get(&request_type) { Some(HandlerEnum::Serial(handler)) => { let response_opt = handler(&mut state, nvl).await?; - if let Some(response) = response_opt { + if let Some(mut response) = response_opt { + response + .insert(AGENT_RESPONSE_TYPE, request_type.as_str()) + .unwrap(); responder.respond_with_nvlist(response); } } @@ -331,7 +334,10 @@ where let responder = responder.clone(); tokio::spawn(async move { match fut.await { - Ok(Some(response)) => { + Ok(Some(mut response)) => { + response + .insert(AGENT_RESPONSE_TYPE, request_type.as_str()) + .unwrap(); responder.respond_with_nvlist(response); } Ok(None) => {} @@ -513,7 +519,7 @@ pub enum FailureMessage { impl Debug for FailureMessage { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { - FailureMessage::Other { message } => write!(f, "{}", message), + FailureMessage::Other { message } => write!(f, "{message}"), } } } @@ -535,14 +541,14 @@ impl FailureMessage { /// (not tuple-like). FailureMessage is an example. /// /// The response nvlist will have the following nvpairs: -/// * "response_type" -> response_type (string) /// * fields from R /// * if result.is_ok(), fields from O /// * if result.is_err(), "err" -> EnumVariantName (string) /// * if result.is_err(), "errstr" -> stringified error (string) /// * if result.is_err(), fields from the varant of E +/// +/// Note that the server infrastructure will add the "response_type" nvpair pub fn return_result( - response_type: &str, request_id: R, result: Result, debug: bool, @@ -553,8 +559,7 @@ where E: Debug + Serialize, { #[derive(Debug, Serialize)] - struct Response<'a, R, O, E> { - response_type: &'a str, + struct Response { #[serde(flatten)] request: R, #[serde(flatten)] @@ -565,16 +570,15 @@ where } if let Err(e) = &result { - error!("sending failure: {:?}", e); + error!("sending failure: {e:?}"); } let (ok, errstr, err) = match result { Ok(o) => (Some(o), None, None), - Err(e) => (None, Some(format!("{:?}", e)), Some(e)), + Err(e) => (None, Some(format!("{e:?}")), Some(e)), }; let response = Response { - response_type, request: request_id, ok, err, @@ -582,9 +586,9 @@ where }; if debug { - trace!("sending response: {:?}", response); + trace!("sending response: {response:?}"); } else { - super_trace!("sending response: {:?}", response); + super_trace!("sending response: {response:?}"); } let nvl = nvpair::to_nvlist(&response)?; @@ -596,17 +600,17 @@ where } if debug { - maybe_die_with(|| format!("before sending response: {:?}", nvl)); - debug!("sending response nvl: {:?}", nvl); + maybe_die_with(|| format!("before sending response: {nvl:?}")); + debug!("sending response nvl: {nvl:?}"); } else { - super_trace!("sending response nvl: {:?}", nvl); + super_trace!("sending response nvl: {nvl:?}"); } Ok(Some(nvl)) } -pub fn return_ok(response_type: &str, response: O, debug: bool) -> Result> +pub fn return_ok(response: O, debug: bool) -> Result> where O: Debug + Serialize, { - return_result(response_type, (), Ok::<_, ()>(response), debug) + return_result((), Ok::<_, ()>(response), debug) } diff --git a/lib/libzutil/zutil_import.c b/lib/libzutil/zutil_import.c index ebb1ac2a55d8..4669069a8662 100644 --- a/lib/libzutil/zutil_import.c +++ b/lib/libzutil/zutil_import.c @@ -1901,8 +1901,10 @@ zpool_find_import_agent(libpc_handle_t *hdl, importargs_t *iarg, nvlist_t *resp = zoa_send_recv_msg(hdl, msg, AGENT_PUBLIC_PROTOCOL_VERSION, ZFS_PUBLIC_SOCKET, NULL); + nvlist_t *pools = fnvlist_lookup_nvlist(resp, "pools"); + nvpair_t *elem = NULL; - while ((elem = nvlist_next_nvpair(resp, elem)) != NULL) { + while ((elem = nvlist_next_nvpair(pools, elem)) != NULL) { avl_index_t where; rdsk_node_t *slice; nvlist_t *config;