From eaf74fbcfedcec210c0d89378a56922f1e4a2a70 Mon Sep 17 00:00:00 2001
From: Matthew Ahrens
Date: Mon, 13 Mar 2023 08:39:34 -0700
Subject: [PATCH] cleanup pool open code (#742)

Reduce code duplication by combining `Pool::open_from_txg()` (opening an
existing pool) and `Pool::open()` (opening/creating a new pool) into a
single `Pool::open()`.
---
 cmd/zfs_object_agent/zettaobject/src/pool.rs | 334 ++++++++-----------
 1 file changed, 139 insertions(+), 195 deletions(-)

diff --git a/cmd/zfs_object_agent/zettaobject/src/pool.rs b/cmd/zfs_object_agent/zettaobject/src/pool.rs
index 1f4930d6472d..acbe0e13c7f0 100644
--- a/cmd/zfs_object_agent/zettaobject/src/pool.rs
+++ b/cmd/zfs_object_agent/zettaobject/src/pool.rs
@@ -304,12 +304,11 @@ impl PoolPhys {
             last_txg: Txg(0),
             destroying_state: None,
             checkpoint_txg: None,
-            sentinel_creation: Some(SystemTime::now()),
+            sentinel_creation: None,
         };
         // XXX make sure it doesn't already exist
         phys.put_timed(object_access, Some(*CREATE_WAIT_DURATION))
             .await?;
-        phys.put_sentinel(object_access).await;
         Ok(phys)
     }
 }
@@ -474,10 +473,8 @@ impl PendingObjectState {
     }
 }
 
-/*
- * Note: this struct is passed to the OBL code. It needs to be a separate struct from Pool,
- * because it can't refer back to the OBL itself, which would create a circular reference.
- */
+// Note: this struct is passed to the OBL code. It needs to be a separate struct from PoolState,
+// because it can't refer back to the OBL itself, which would create a circular reference.
 pub struct PoolSharedState {
     pub object_access: Arc<ObjectAccess>,
     pub guid: PoolGuid,
@@ -855,26 +852,60 @@ impl PoolState {
 }
 
 impl Pool {
-    async fn open_from_txg(
+    pub async fn open(
         object_access: Arc<ObjectAccess>,
-        pool_phys: &PoolPhys,
-        txg: Txg,
+        guid: PoolGuid,
+        requested_txg: Option<Txg>,
         zettacache: Arc<ZettaCache>,
-        heartbeat_guard: Option<HeartbeatGuard>,
-        readonly: bool,
+        id: Uuid,
         mut syncing_txg: Option<Txg>,
-    ) -> Result<(Pool, UberblockPhys, BlockId), PoolOpenError> {
-        let phys = UberblockPhys::get(&object_access, pool_phys.guid, txg).await?;
+        rollback_to_checkpoint: bool,
+    ) -> Result<(Pool, Option<UberblockPhys>, BlockId), PoolOpenError> {
+        let writeable = !object_access.readonly();
+        let pool_phys = PoolPhys::get(&object_access, guid).await?;
+
+        let sentinel_creation = if pool_phys.sentinel_creation.is_none() && writeable {
+            if !pool_phys.sentinel_exists(&object_access).await {
+                // This is either a newly created pool, or an existing pool that was created with
+                // older software.
+                pool_phys.put_sentinel(&object_access).await;
+            }
+            Some(SystemTime::now())
+        } else {
+            pool_phys.sentinel_creation
+        };
 
-        let features = check_features(phys.features.iter().map(|(f, r)| (f, *r)), readonly)?;
+        let txg_to_open = match rollback_to_checkpoint {
+            true => pool_phys
+                .checkpoint_txg
+                .ok_or(PoolOpenError::NoCheckpoint)?,
+            false => requested_txg.unwrap_or(pool_phys.last_txg),
+        };
+
+        let heartbeat_guard = match writeable {
+            true => Some(heartbeat::start_heartbeat(object_access.clone(), id).await),
+            false => None,
+        };
+
+        let uber_phys = match pool_phys.last_txg.0 != 0 {
+            true => Some(UberblockPhys::get(&object_access, pool_phys.guid, txg_to_open).await?),
+            false => None,
+        };
+
+        let features = match &uber_phys {
+            Some(uber_phys) => check_features(
+                uber_phys.features.iter().map(|(f, r)| (f, *r)),
+                object_access.readonly(),
+            )?,
+            None => Default::default(),
+        };
 
         if let Some(resume_txg) = syncing_txg {
-            assert_ge!(resume_txg, phys.txg);
-            if resume_txg == phys.txg {
-                // The TXG that we're resuming was already synced. The agent
-                // must have died before the "end txg" response got to the
-                // kernel. To ensure that the next message is "end txg" (not
-                // "write block"), we set the syncing_txg to None. See
+            assert_ge!(resume_txg, txg_to_open);
+            if resume_txg == txg_to_open {
+                // The TXG that we're resuming was already synced. The agent must have died
+                // before the "end txg" response got to the kernel. To ensure that the next
+                // message is "end txg" (not "write block"), we set the syncing_txg to None. See
                 // end_txg() for details.
                 syncing_txg = None;
             }
@@ -886,10 +917,22 @@ impl Pool {
             name: pool_phys.name.clone(),
         });
 
-        // load block -> object mapping
-        let storage_object_log =
-            ObjectBasedLog::open_by_phys(shared_state.clone(), &phys.storage_object_log);
-        let object_block_map = ObjectBlockMap::load(&storage_object_log, phys.next_block).await?;
+        let storage_object_log = match &uber_phys {
+            Some(uber_phys) => {
+                ObjectBasedLog::open_by_phys(shared_state.clone(), &uber_phys.storage_object_log)
+            }
+            None => ObjectBasedLog::create(
+                shared_state.clone(),
+                &format!("zfs/{}/StorageObjectLog", pool_phys.guid),
+            ),
+        };
+
+        let next_block = match &uber_phys {
+            Some(uber_phys) => uber_phys.next_block,
+            None => BlockId(0),
+        };
+
+        let object_block_map = ObjectBlockMap::load(&storage_object_log, next_block).await?;
 
         let (resuming_tx, resuming_rx) = watch_once::channel();
         // If not syncing, drop Sender so that Receivers will return immediately
@@ -898,189 +941,88 @@
         // If we are rolling backwards to a checkpoint and the checkpoint txg is equal
         // to the current txg, then delete the checkpoint and continue onwards. In all
         // other cases, the checkpoint txg is at least one TXG before the current txg.
-        let checkpoint_txg = if pool_phys.checkpoint_txg == Some(txg) {
-            None
-        } else {
-            pool_phys.checkpoint_txg
+        let checkpoint_txg = match pool_phys.checkpoint_txg == Some(txg_to_open) {
+            true => None,
+            false => pool_phys.checkpoint_txg,
         };
-        let (txg_completion_tx, txg_completion_rx) = watch::channel(TxgCompletion {
-            txg,
-            checkpointed: checkpoint_txg.is_some(),
-        });
-        measure!("metadata_cleanup").spawn(metadata_cleanup_task(
-            txg_completion_rx,
-            shared_state.clone(),
-        ));
-        let pool = Pool {
-            state: Arc::new(PoolState {
-                shared_state: shared_state.clone(),
-                syncing_state: std::sync::Mutex::new(Some(PoolSyncingState {
-                    last_txg: phys.txg,
-                    syncing_txg,
-                    storage_object_log,
-                    reclaim: Reclaim::open(&phys.reclaim_info, shared_state.clone()),
-                    pending_object: PendingObjectState::NotPending(phys.next_block),
-                    pending_unordered_writes: Unordered::new(phys.next_block),
-                    stats: phys.stats,
-                    reclaim_done: None,
-                    object_deleter: ObjectDeleter::open(
-                        object_access.clone(),
-                        pool_phys.guid,
-                        phys.obsolete_objects.clone(),
-                    ),
-                    pending_flushes: Default::default(),
-                    features,
-                    resuming_tx,
-                    checkpoint_txg,
-                    txg_completion_tx,
-                })),
-                zettacache,
-                object_block_map,
-                resuming_rx,
-                heartbeat_guard,
-                sentinel_creation: pool_phys.sentinel_creation,
-            }),
+        let reclaim = match &uber_phys {
+            Some(phys) => Reclaim::open(&phys.reclaim_info, shared_state.clone()),
+            None => Reclaim::create(shared_state.clone()),
         };
 
-        let syncing_state = {
-            let mut guard = pool.state.syncing_state.lock().unwrap();
-            guard.take().unwrap()
+        let object_deleter = match &uber_phys {
+            Some(phys) => ObjectDeleter::open(
+                object_access.clone(),
+                pool_phys.guid,
+                phys.obsolete_objects.clone(),
+            ),
+            None => ObjectDeleter::new(object_access.clone(), pool_phys.guid),
         };
 
-        assert_eq!(
-            pool.state.object_block_map.len() as u64,
-            syncing_state.stats.objects_count
-        );
-
-        let next_block = syncing_state.next_block();
-
-        *pool.state.syncing_state.lock().unwrap() = Some(syncing_state);
-        Ok((pool, phys, next_block))
-    }
-
-    pub async fn open(
-        object_access: Arc<ObjectAccess>,
-        guid: PoolGuid,
-        txg: Option<Txg>,
-        zettacache: Arc<ZettaCache>,
-        id: Uuid,
-        syncing_txg: Option<Txg>,
-        rollback: bool,
-    ) -> Result<(Pool, Option<UberblockPhys>, BlockId), PoolOpenError> {
-        let mut phys = PoolPhys::get(&object_access, guid).await?;
-        if phys.sentinel_creation.is_none() && !object_access.readonly() {
-            if !phys.sentinel_exists(&object_access).await {
-                phys.put_sentinel(&object_access).await;
-            }
-            phys.sentinel_creation = Some(SystemTime::now());
-        }
-        if phys.last_txg.0 == 0 {
-            assert!(!rollback);
-            let shared_state = Arc::new(PoolSharedState {
-                object_access: object_access.clone(),
-                guid,
-                name: phys.name,
+        let txg_completion_tx = {
+            let (tx, rx) = watch::channel(TxgCompletion {
+                txg: txg_to_open,
+                checkpointed: checkpoint_txg.is_some(),
             });
+            measure!("metadata_cleanup").spawn(metadata_cleanup_task(rx, shared_state.clone()));
+            tx
+        };
 
-            let storage_object_log = ObjectBasedLog::create(
-                shared_state.clone(),
-                &format!("zfs/{guid}/StorageObjectLog"),
-            );
-            let object_block_map = ObjectBlockMap::load(&storage_object_log, BlockId(0)).await?;
+        let stats = match &uber_phys {
+            Some(uber_phys) => uber_phys.stats,
+            None => Default::default(),
+        };
 
-            let (resuming_tx, resuming_rx) = watch_once::channel();
-            // If not syncing, drop Sender so that Receivers will return immediately
-            let resuming_tx = syncing_txg.map(|_| resuming_tx);
+        assert_eq!(object_block_map.len() as u64, stats.objects_count);
 
-            let (txg_completion_tx, txg_completion_rx) = watch::channel(TxgCompletion {
-                txg: Txg(0),
-                checkpointed: false,
-            });
-            measure!("metadata_cleanup").spawn(metadata_cleanup_task(
-                txg_completion_rx,
-                shared_state.clone(),
-            ));
-
-            let mut pool = Pool {
-                state: Arc::new(PoolState {
-                    shared_state: shared_state.clone(),
-                    syncing_state: std::sync::Mutex::new(Some(PoolSyncingState {
-                        last_txg: Txg(0),
-                        syncing_txg,
-                        storage_object_log,
-                        reclaim: Reclaim::create(shared_state.clone()),
-                        pending_object: PendingObjectState::NotPending(BlockId(0)),
-                        pending_unordered_writes: Unordered::new(BlockId(0)),
-                        stats: Default::default(),
-                        reclaim_done: None,
-                        object_deleter: ObjectDeleter::new(object_access.clone(), guid),
-                        pending_flushes: Default::default(),
-                        features: Default::default(),
-                        resuming_tx,
-                        checkpoint_txg: None,
-                        txg_completion_tx,
-                    })),
-                    zettacache,
-                    object_block_map,
-                    resuming_rx,
-                    heartbeat_guard: if !shared_state.object_access.readonly() {
-                        Some(
-                            heartbeat::start_heartbeat(shared_state.object_access.clone(), id)
-                                .await,
-                        )
-                    } else {
-                        None
-                    },
-                    sentinel_creation: phys.sentinel_creation,
-                }),
-            };
+        let syncing_state = PoolSyncingState {
+            storage_object_log,
+            reclaim,
+            pending_object: PendingObjectState::NotPending(next_block),
+            pending_unordered_writes: Unordered::new(next_block),
+            last_txg: txg_to_open,
+            syncing_txg,
+            stats,
+            reclaim_done: None,
+            object_deleter,
+            pending_flushes: Default::default(),
+            txg_completion_tx,
+            features,
+            resuming_tx,
+            checkpoint_txg,
+        };
 
-            let next_block = pool
-                .state
-                .with_syncing_state(|syncing_state| syncing_state.next_block());
-            pool.claim(id).await.map(|_| (pool, None, next_block))
-        } else {
-            let target = if rollback {
-                phys.checkpoint_txg.ok_or(PoolOpenError::NoCheckpoint)?
-            } else {
-                txg.unwrap_or(phys.last_txg)
-            };
-            let (mut pool, ub, next_block) = Pool::open_from_txg(
-                object_access.clone(),
-                &phys,
-                target,
+        let mut pool = Pool {
+            state: Arc::new(PoolState {
+                syncing_state: std::sync::Mutex::new(Some(syncing_state)),
+                object_block_map,
                 zettacache,
-                if !object_access.readonly() {
-                    Some(heartbeat::start_heartbeat(object_access.clone(), id).await)
-                } else {
-                    None
-                },
-                object_access.readonly(),
-                syncing_txg,
-            )
-            .await?;
+                shared_state,
+                resuming_rx,
+                heartbeat_guard,
+                sentinel_creation,
+            }),
+        };
 
-            pool.claim(id).await?;
-
-            if !object_access.readonly() {
-                let last_txg = pool
-                    .state
-                    .with_syncing_state(|syncing_state| syncing_state.last_txg);
-                if last_txg != phys.last_txg {
-                    // We opened an older TXG. Before cleaning up (deleting)
-                    // future TXG's, update the super object to the old TXG, so
-                    // that if we re-open the pool we don't try to use the
-                    // future (deleted) TXG.
-                    let new_phys = PoolPhys { last_txg, ..phys };
-                    new_phys.put(&object_access).await;
-                }
-                if syncing_txg.is_none() {
-                    pool.state.cleanup_data_objects().await;
-                }
+        pool.claim(id).await?;
+
+        if writeable {
+            if txg_to_open != pool_phys.last_txg {
+                // We opened an older TXG. Before cleaning up (deleting) data from future TXGs,
+                // update the super object to the old TXG, so that if we re-open the pool we
+                // don't try to use the future (deleted) TXG.
+                let new_phys = PoolPhys {
+                    last_txg: txg_to_open,
+                    ..pool_phys
+                };
+                new_phys.put(&object_access).await;
+            }
+            if syncing_txg.is_none() {
+                pool.state.cleanup_data_objects().await;
             }
-            Ok((pool, Some(ub), next_block))
         }
+        Ok((pool, uber_phys, next_block))
     }
 
     // Note that all recovered objects must be ObjectVersion(None)
@@ -1287,8 +1229,8 @@ impl Pool {
             .collect();
 
         // put syncing_state back in the Option
-        assert!(state.syncing_state.lock().unwrap().is_none());
-        *state.syncing_state.lock().unwrap() = Some(syncing_state);
+        let old = state.syncing_state.lock().unwrap().replace(syncing_state);
+        assert!(old.is_none());
 
         (stats, feature_vec, rate_limit_errors)
     }
@@ -1300,7 +1242,9 @@
         if let Some(rt) = syncing_state.reclaim_done.take() {
             rt.await.unwrap();
         }
-        *state.syncing_state.lock().unwrap() = Some(syncing_state);
+
+        let old = state.syncing_state.lock().unwrap().replace(syncing_state);
+        assert!(old.is_none());
     }
 
     fn check_pending_flushes(state: &PoolState, syncing_state: &mut PoolSyncingState) {
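
A note on the two patterns this change leans on. First, the unified `open()`
loads the uberblock as an `Option` and derives each piece of the syncing state
with a `match`, so the create-new-pool path (`None`) and the
open-existing-pool path (`Some`) share one body instead of two near-duplicate
functions. A minimal, self-contained sketch of that shape (`UberPhys` and its
fields are hypothetical stand-ins for illustration, not the real zettaobject
types):

struct UberPhys {
    next_block: u64,
    objects_count: u64,
}

// Each value falls out of one match on the optional uberblock; the None arm
// is the freshly-created-pool case that previously lived in a separate
// function.
fn open(uber_phys: Option<&UberPhys>) -> (u64, u64) {
    let next_block = match uber_phys {
        Some(phys) => phys.next_block,
        None => 0, // a brand-new pool starts at block 0
    };
    let objects_count = match uber_phys {
        Some(phys) => phys.objects_count,
        None => Default::default(),
    };
    (next_block, objects_count)
}

fn main() {
    assert_eq!(open(None), (0, 0)); // create path
    let ub = UberPhys { next_block: 7, objects_count: 3 };
    assert_eq!(open(Some(&ub)), (7, 3)); // open path
}

Second, the last two hunks swap an assert-then-store sequence for
`Option::replace()`, which stores the new value and hands back the old one, so
the "slot was empty" invariant is checked while taking the mutex once instead
of twice.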