Skip to content

Commit

Permalink
zoa: determine response type automatically (openzfs#446)
Browse files Browse the repository at this point in the history
Since the `response_type` is the same as the `request_type`, the server
infrastructure can fill it in automatically, so each request handler
doesn't need to match it up manually.

The "get pools" response contained the pools directly in the nvlist, and
asserted that there are no other nvpairs.  There was no `response_type`
in this response message.  This commit changes it so that "get pools"
includes a `response_type`, and the pools are under a different `pools`
nvpair.  Additionally, the code in `PublicConnectionState::get_pools()`
is reorganized to be more clear.

Additionally, the code in `PublicConnectionState::get_destroying_pools()`
is simplified, using the server helper method to convert to an nvlist.
  • Loading branch information
ahrens authored Jun 1, 2022
1 parent ee3f6e1 commit ba49c18
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 168 deletions.
21 changes: 5 additions & 16 deletions cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use anyhow::Result;
use futures::stream::StreamExt;
use lazy_static::lazy_static;
use log::*;
use nvpair::NvList;
use serde::Deserialize;
use serde::Serialize;
use tokio::fs;
Expand Down Expand Up @@ -64,8 +63,8 @@ struct DestroyingCachePhys {
pools: HashMap<PoolGuid, DestroyingCacheItemPhys>,
}

#[derive(Serialize, Debug)]
struct DestroyingPool {
#[derive(Serialize, Debug, Clone)]
pub struct DestroyingPool {
#[serde(flatten)]
cache_phys: DestroyingCacheItemPhys,
#[serde(flatten)]
Expand Down Expand Up @@ -137,17 +136,6 @@ impl DestroyingPoolsMap {
});
}
}

fn to_nvlist(&self) -> NvList {
let mut nvl = NvList::new_unique_names();

for (guid, destroying_pool) in self.pools.iter() {
let nvl_item = nvpair::to_nvlist(destroying_pool).unwrap();
nvl.insert(format!("{}", guid), nvl_item.as_ref()).unwrap();
}

nvl
}
}

#[derive(Default, Debug)]
Expand Down Expand Up @@ -438,15 +426,16 @@ pub async fn resume_destroy(object_access: Arc<ObjectAccess>, guid: PoolGuid) ->

/// Retrieve the PoolDestroyer's list of pools that are either being destroyed or have been
/// destroyed.
pub async fn get_destroy_list() -> NvList {
pub async fn get_destroy_list() -> HashMap<PoolGuid, DestroyingPool> {
maybe_die_with(|| "in get_destroy_list");

let maybe_pool_destroyer = POOL_DESTROYER.lock().await;
maybe_pool_destroyer
.as_ref()
.unwrap()
.destroying_pools_map
.to_nvlist()
.pools
.clone()
}

/// Remove pools that have been successfully destroyed from the PoolDestroyer's list of pools.
Expand Down
191 changes: 93 additions & 98 deletions cmd/zfs_object_agent/zettaobject/src/public_connection.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::sync::Mutex;

use anyhow::Result;
use futures::stream::StreamExt;
use log::*;
use nvpair::NvList;
Expand All @@ -21,6 +21,7 @@ use crate::object_access::ObjectAccess;
use crate::object_access::ObjectAccessProtocol;
use crate::pool::*;
use crate::pool_destroy;
use crate::pool_destroy::DestroyingPool;
use crate::server::return_ok;
use crate::server::ConnectionState;
use crate::server::HandlerReturn;
Expand Down Expand Up @@ -90,102 +91,57 @@ impl PublicConnectionState {

fn get_pools(&mut self, nvl: NvList) -> HandlerReturn {
info!("got request: {:?}", nvl);
Ok(Box::pin(async move { Self::get_pools_impl(nvl).await }))
}

async fn get_pools_impl(nvl: NvList) -> Result<Option<NvList>> {
#[derive(Debug, Deserialize)]
struct GetPoolsRequest {
#[serde(flatten)]
protocol: ObjectAccessProtocol,
#[serde(default)]
readonly: bool,
bucket: Option<String>,
guid: Option<u64>,
}
let request: GetPoolsRequest = nvpair::from_nvlist(&nvl)?;

let bucket_access = BucketAccess::new(request.protocol.clone()).await?;
Ok(Box::pin(async move {
#[derive(Debug, Deserialize)]
struct GetPoolsRequest {
#[serde(flatten)]
protocol: ObjectAccessProtocol,
bucket: Option<String>,
guid: Option<u64>,
}
let request: GetPoolsRequest = nvpair::from_nvlist(&nvl)?;
let bucket_access = BucketAccess::new(request.protocol.clone()).await?;
let buckets = if let Some(bucket) = request.bucket {
vec![bucket]
} else {
bucket_access.list_buckets().await
};

let mut buckets = vec![];
if let Some(bucket) = request.bucket {
buckets.push(bucket);
} else {
buckets.append(&mut bucket_access.list_buckets().await);
}
maybe_die_with(|| "in get_pools_impl");

maybe_die_with(|| "in get_pools_impl");
let response = Arc::new(Mutex::new(NvList::new_unique_names()));
for buck in buckets {
let object_access =
ObjectAccess::new(request.protocol.clone(), buck, request.readonly).await?;
if let Some(guid) = request.guid {
if !Pool::exists(&object_access, PoolGuid(guid)).await {
continue;
}

match Pool::get_config(&object_access, PoolGuid(guid)).await {
Ok(pool_config) => {
let mut owned_response =
Arc::try_unwrap(response).unwrap().into_inner().unwrap();
owned_response
.insert(format!("{}", guid), pool_config.as_ref())
.unwrap();
debug!("sending response: {:?}", owned_response);
return Ok(Some(owned_response));
}
Err(e) => {
error!("skipping {:?}: {:?}", guid, e);
continue;
}
let pools = Arc::new(Mutex::new(NvList::new_unique_names()));
for buck in buckets {
let object_access = ObjectAccess::new(request.protocol.clone(), buck, true).await?;
if let Some(guid) = request.guid {
find_pool(object_access, PoolGuid(guid), pools.clone()).await;
} else {
discover_pools(object_access, pools.clone()).await;
}
}

object_access
.list_prefixes("zfs/".to_string())
.for_each_concurrent(*GET_POOLS_QUEUE_DEPTH, |prefix| {
let my_object_access = object_access.clone();
let my_response = response.clone();
async move {
debug!("prefix: {}", prefix);
let split: Vec<&str> = prefix.rsplitn(3, '/').collect();
let guid_str = split[1];
if let Ok(guid64) = str::parse::<u64>(guid_str) {
let guid = PoolGuid(guid64);
match Pool::get_config(&my_object_access, guid).await {
Ok(pool_config) => my_response
.lock()
.unwrap()
.insert(guid_str, pool_config.as_ref())
.unwrap(),
Err(e) => {
error!("skipping {:?}: {:?}", guid, e);
}
}
}
}
})
.await;
}
let owned_response = Arc::try_unwrap(response).unwrap().into_inner().unwrap();
info!("sending response: {:?}", owned_response);
Ok(Some(owned_response))
let pools = Arc::try_unwrap(pools).unwrap().into_inner().unwrap();
let mut response = NvList::new_unique_names();
response.insert("pools", pools.as_ref())?;
info!("sending response: {response:?}");
Ok(Some(response))
}))
}

fn get_destroying_pools(&mut self, nvl: NvList) -> HandlerReturn {
Ok(Box::pin(async move {
// XXX convert to use serde nvlist response
debug!("got request: {:?}", nvl);
let pools = pool_destroy::get_destroy_list().await;

let mut response = NvList::new_unique_names();
response
.insert(AGENT_RESPONSE_TYPE, TYPE_GET_DESTROYING_POOLS)
.unwrap();
response.insert("pools", pools.as_ref()).unwrap();

debug!("sending response: {:?}", response);
Ok(Some(response))
#[derive(Debug, Serialize)]
struct DestroyingPoolsResponse {
pools: HashMap<String, DestroyingPool>,
}
let response = DestroyingPoolsResponse {
pools: pool_destroy::get_destroy_list()
.await
.into_iter()
.map(|(guid, destroying)| (guid.to_string(), destroying))
.collect(),
};
return_ok(response, true)
}))
}

Expand All @@ -195,13 +151,7 @@ impl PublicConnectionState {
debug!("got request: {:?}", nvl);
pool_destroy::remove_not_in_progress().await;

let mut response = NvList::new_unique_names();
response
.insert(AGENT_RESPONSE_TYPE, TYPE_CLEAR_DESTROYED_POOLS)
.unwrap();

debug!("sending response: {:?}", response);
Ok(Some(response))
Ok(Some(NvList::new_unique_names()))
}))
}

Expand All @@ -225,7 +175,7 @@ impl PublicConnectionState {
bucket_size: phys.bucket_size,
combined_histogram,
};
return_ok(TYPE_REPORT_HITS, response, true)
return_ok(response, true)
}))
}

Expand All @@ -240,7 +190,7 @@ impl PublicConnectionState {
let response = ListDevicesResponse {
devices_json: serde_json::to_string(&cache.devices()).unwrap(),
};
return_ok(TYPE_LIST_DEVICES, response, true)
return_ok(response, true)
}))
}

Expand All @@ -255,7 +205,7 @@ impl PublicConnectionState {
let response = ZcacheIostatResponse {
iostats_json: serde_json::to_string(&cache.io_stats()).unwrap(),
};
return_ok(TYPE_ZCACHE_IOSTAT, response, false)
return_ok(response, false)
}))
}

Expand All @@ -270,7 +220,52 @@ impl PublicConnectionState {
let response = ZcacheStatsResponse {
stats_json: serde_json::to_string(&cache.stats()).unwrap(),
};
return_ok(TYPE_ZCACHE_STATS, response, false)
return_ok(response, false)
}))
}
}

async fn discover_pools(object_access: Arc<ObjectAccess>, pools: Arc<Mutex<NvList>>) {
object_access
.list_prefixes("zfs/".to_string())
.for_each_concurrent(*GET_POOLS_QUEUE_DEPTH, |prefix| {
let my_object_access = object_access.clone();
let pools = pools.clone();
async move {
debug!("prefix: {}", prefix);
let split = prefix.rsplitn(3, '/').collect::<Vec<_>>();
let guid_str = split[1];
if let Ok(guid64) = str::parse::<u64>(guid_str) {
let guid = PoolGuid(guid64);
match Pool::get_config(&my_object_access, guid).await {
Ok(pool_config) => pools
.lock()
.unwrap()
.insert(guid_str, pool_config.as_ref())
.unwrap(),
Err(e) => {
error!("skipping {guid:?}: {e:?}");
}
}
}
}
})
.await;
}

async fn find_pool(object_access: Arc<ObjectAccess>, guid: PoolGuid, pools: Arc<Mutex<NvList>>) {
if Pool::exists(&object_access, guid).await {
match Pool::get_config(&object_access, guid).await {
Ok(pool_config) => {
pools
.lock()
.unwrap()
.insert(format!("{}", guid), pool_config.as_ref())
.unwrap();
}
Err(e) => {
error!("skipping {guid:?}: {e:?}");
}
}
}
}
Loading

0 comments on commit ba49c18

Please sign in to comment.