From 51cd3670ad494e49b99a9e38a793c14fb5f15bbb Mon Sep 17 00:00:00 2001
From: Matthew Ahrens
Date: Thu, 14 Oct 2021 08:47:21 -0700
Subject: [PATCH] DOSE-725 zio read never returns when agent is restarted
 (#508)

If the agent restarts when there are outstanding "read block" requests,
but we are not in the middle of a txg, then it hangs.  The problem is
that the agent incorrectly thinks that we are in the middle of a txg,
and waits for a "resume done" message before allowing "read block"
requests to be processed.

The solution is for the kernel to tell the agent whether it's in the
middle of a txg as part of the "open pool" message.  Note that the old
"resume" argument (indicating if we are opening the pool for the first
time vs. after agent restart) is not needed; these cases are handled
identically if we are not in the middle of a txg.

Since the resume_txg state is now communicated as part of the "open
pool" request, the "resume txg" request is no longer needed.
---
 cmd/zfs_object_agent/zettaobject/src/pool.rs | 34 ++++++-------------
 .../zettaobject/src/root_connection.rs       | 14 ++------
 include/sys/vdev_object_store.h              |  3 +-
 module/os/linux/zfs/vdev_object_store.c      | 32 ++++++-----------
 4 files changed, 25 insertions(+), 58 deletions(-)

diff --git a/cmd/zfs_object_agent/zettaobject/src/pool.rs b/cmd/zfs_object_agent/zettaobject/src/pool.rs
index ee6724263698..1fcb2ff1243e 100644
--- a/cmd/zfs_object_agent/zettaobject/src/pool.rs
+++ b/cmd/zfs_object_agent/zettaobject/src/pool.rs
@@ -801,12 +801,16 @@ impl Pool {
         cache: Option<ZettaCache>,
         heartbeat_guard: Option<HeartbeatGuard>,
         readonly: bool,
-        resuming: bool,
+        syncing_txg: Option<Txg>,
     ) -> Result<(Pool, UberblockPhys, BlockId), PoolOpenError> {
         let phys = UberblockPhys::get(&object_access, pool_phys.guid, txg).await?;
 
         features::check_features(phys.features.iter().map(|(f, _)| f), readonly)?;
 
+        if let Some(resume_txg) = syncing_txg {
+            assert_gt!(resume_txg, phys.txg);
+        }
+
         let shared_state = Arc::new(PoolSharedState {
             object_access: object_access.clone(),
             guid: pool_phys.guid,
@@ -835,13 +839,14 @@ impl Pool {
                 ),
             });
         }
-        let (tx, rx) = watch::channel(resuming);
+
+        let (tx, rx) = watch::channel(syncing_txg.is_some());
         let pool = Pool {
             state: Arc::new(PoolState {
                 shared_state: shared_state.clone(),
                 syncing_state: std::sync::Mutex::new(Some(PoolSyncingState {
                     last_txg: phys.txg,
-                    syncing_txg: None,
+                    syncing_txg,
                     storage_object_log,
                     reclaim_info: ReclaimInfo {
                         indirect_table: ReclaimIndirectTable {
@@ -880,8 +885,6 @@ impl Pool {
 
         let next_block = syncing_state.next_block();
 
-        //println!("opened {:#?}", pool);
-
         *pool.state.syncing_state.lock().unwrap() = Some(syncing_state);
         Ok((pool, phys, next_block))
     }
@@ -892,7 +895,7 @@ impl Pool {
         txg: Option<Txg>,
         cache: Option<ZettaCache>,
         id: Uuid,
-        resuming: bool,
+        syncing_txg: Option<Txg>,
     ) -> Result<(Pool, Option<UberblockPhys>, BlockId), PoolOpenError> {
         let phys = PoolPhys::get(&object_access, guid).await?;
         if phys.last_txg.0 == 0 {
@@ -984,7 +987,7 @@ impl Pool {
                     None
                 },
                 object_access.readonly(),
-                resuming,
+                syncing_txg,
             )
             .await?;
 
@@ -1019,7 +1022,7 @@ impl Pool {
                 join3(
                     pool.state.clone().cleanup_log_objects(),
                     pool.state.clone().cleanup_uberblock_objects(last_txg),
-                    if resuming {
+                    if syncing_txg.is_some() {
                         Either::Left(future::ready(()))
                     } else {
                         Either::Right(state.cleanup_data_objects())
                     },
@@ -1031,21 +1034,6 @@ impl Pool {
         }
     }
 
-    pub fn resume_txg(&self, txg: Txg) {
-        // The syncing_state is only held while a txg is open (begun). It's not
-        // allowed to call begin_txg() while a txg is already open, so the lock
-        // must not be held.
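The pool.rs change above folds two pieces of state, the old "resuming"
boolean plus the txg that used to arrive later in a "resume txg" request,
into a single syncing_txg: Option<Txg>. Below is a minimal, self-contained
Rust sketch of the resulting open-time logic; the types are hypothetical
stand-ins, not the real zettaobject ones, and only the Option<Txg>
semantics come from the patch itself:

    // Hypothetical stand-ins for Txg and PoolSyncingState.
    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
    struct Txg(u64);

    #[allow(dead_code)]
    struct SyncingState {
        last_txg: Txg,
        syncing_txg: Option<Txg>, // Some(txg) only while a txg is open
    }

    impl SyncingState {
        // On "open pool": if the kernel reports a txg in flight, resume it;
        // otherwise there is nothing to wait for, even after an agent restart.
        fn open(last_txg: Txg, syncing_txg: Option<Txg>) -> Self {
            if let Some(txg) = syncing_txg {
                // Mirrors assert_gt!(resume_txg, phys.txg) in the patch: an
                // in-flight txg must be newer than the last txg synced out.
                assert!(txg > last_txg);
            }
            SyncingState { last_txg, syncing_txg }
        }

        // "read block" requests are held only while a resume is in progress.
        fn must_wait_for_resume(&self) -> bool {
            self.syncing_txg.is_some()
        }
    }

    fn main() {
        // Agent restart with no txg open: reads proceed immediately.
        assert!(!SyncingState::open(Txg(100), None).must_wait_for_resume());
        // Agent restart mid-txg: reads wait for "resume complete".
        assert!(SyncingState::open(Txg(100), Some(Txg(101))).must_wait_for_resume());
    }

This is exactly the hang described in the commit message: the old boolean
made the first case look like the second, so reads waited on a resume
that would never complete.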
-        // XXX change this to return an error to the client
-        self.state.with_syncing_state(|syncing_state| {
-            assert!(syncing_state.syncing_txg.is_none());
-            assert_gt!(txg, syncing_state.last_txg);
-            syncing_state.syncing_txg = Some(txg);
-
-            // Resuming state is indicated by pending_object = NotPending
-            assert!(!syncing_state.pending_object.is_pending());
-        })
-    }
-
     async fn get_recovered_objects(
         state: &Arc<PoolState>,
         shared_state: &Arc<PoolSharedState>,
diff --git a/cmd/zfs_object_agent/zettaobject/src/root_connection.rs b/cmd/zfs_object_agent/zettaobject/src/root_connection.rs
index 8f2af5580e19..db3f37b15a77 100644
--- a/cmd/zfs_object_agent/zettaobject/src/root_connection.rs
+++ b/cmd/zfs_object_agent/zettaobject/src/root_connection.rs
@@ -69,7 +69,6 @@ impl RootConnectionState {
         server.register_serial_handler("open pool", Box::new(Self::open_pool));
         server.register_serial_handler("resume complete", Box::new(Self::resume_complete));
         server.register_handler("begin txg", Box::new(Self::begin_txg));
-        server.register_handler("resume txg", Box::new(Self::resume_txg));
         server.register_handler("flush writes", Box::new(Self::flush_writes));
         server.register_handler("end txg", Box::new(Self::end_txg));
         server.register_handler("write block", Box::new(Self::write_block));
@@ -121,14 +120,14 @@ impl RootConnectionState {
         info!("got request: {:?}", nvl);
         Box::pin(async move {
             let guid = PoolGuid(nvl.lookup_uint64("GUID")?);
-            let resume = bool_value(&nvl, "resume")?;
             let object_access = Self::get_object_access(&nvl)?;
             let cache = self.cache.as_ref().cloned();
             let txg = nvl.lookup_uint64("TXG").ok().map(Txg);
+            let syncing_txg = nvl.lookup_uint64("syncing_txg").ok().map(Txg);
 
             let (pool, phys_opt, next_block) =
-                match Pool::open(object_access, guid, txg, cache, self.id, resume).await {
+                match Pool::open(object_access, guid, txg, cache, self.id, syncing_txg).await {
                     Err(PoolOpenError::Mmp(hostname)) => {
                         let mut response = NvList::new_unique_names();
                         response.insert("Type", "pool open failed").unwrap();
@@ -212,15 +211,6 @@ impl RootConnectionState {
         handler_return_ok(None)
     }
 
-    fn resume_txg(&mut self, nvl: NvList) -> HandlerReturn {
-        info!("got request: {:?}", nvl);
-        let txg = Txg(nvl.lookup_uint64("TXG")?);
-        let pool = self.pool.as_ref().ok_or_else(|| anyhow!("no pool open"))?;
-        pool.resume_txg(txg);
-
-        handler_return_ok(None)
-    }
-
     fn resume_complete(&mut self, nvl: NvList) -> SerialHandlerReturn {
         info!("got request: {:?}", nvl);
 
diff --git a/include/sys/vdev_object_store.h b/include/sys/vdev_object_store.h
index 4695afdf312d..0eea5ee2d4eb 100644
--- a/include/sys/vdev_object_store.h
+++ b/include/sys/vdev_object_store.h
@@ -33,7 +33,6 @@
 #define	AGENT_TYPE_WRITE_DONE		"write done"
 #define	AGENT_TYPE_FREE_BLOCK		"free block"
 #define	AGENT_TYPE_BEGIN_TXG		"begin txg"
-#define	AGENT_TYPE_RESUME_TXG		"resume txg"
 #define	AGENT_TYPE_RESUME_COMPLETE	"resume complete"
 #define	AGENT_TYPE_END_TXG		"end txg"
 #define	AGENT_TYPE_END_TXG_DONE		"end txg done"
@@ -68,7 +67,7 @@
 #define	AGENT_CAUSE			"cause"
 #define	AGENT_HOSTNAME			"hostname"
 #define	AGENT_READONLY			"readonly"
-#define	AGENT_RESUME			"resume"
+#define	AGENT_SYNCING_TXG		"syncing_txg"
 #define	AGENT_HEAL			"heal"
 #define	AGENT_FEATURE			"feature"
 #define	AGENT_FEATURES			"features"
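On the wire, the boolean AGENT_RESUME key is gone and AGENT_SYNCING_TXG
is an optional uint64: the key is simply absent when no txg is in
flight, as the lookup_uint64(...).ok().map(Txg) in root_connection.rs
above suggests. A hedged sketch of that parsing rule, modeling the
nvlist as a plain HashMap rather than the real nvpair types:

    use std::collections::HashMap;

    // The "open pool" request, modeled as a flat map of uint64 values.
    // A missing "syncing_txg" key now means "no txg in flight"; there is
    // no separate boolean and no follow-up "resume txg" message.
    fn parse_syncing_txg(nvl: &HashMap<&str, u64>) -> Option<u64> {
        nvl.get("syncing_txg").copied()
    }

    fn main() {
        // First open, or agent restart between txgs: key absent, so the
        // agent can serve "read block" requests right away.
        let clean: HashMap<&str, u64> = [("GUID", 42)].into();
        assert_eq!(parse_syncing_txg(&clean), None);

        // Agent restart mid-txg: the kernel includes the syncing txg and
        // the agent holds reads until "resume complete".
        let mid_txg: HashMap<&str, u64> = [("GUID", 42), ("syncing_txg", 101)].into();
        assert_eq!(parse_syncing_txg(&mid_txg), Some(101));
    }

Encoding "not in a txg" as an absent key keeps the two previously
distinct open paths (first open and idle restart) byte-identical, which
is what lets the agent handle them with one code path.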
diff --git a/module/os/linux/zfs/vdev_object_store.c b/module/os/linux/zfs/vdev_object_store.c
index 0a9ff146953b..197871968eb5 100644
--- a/module/os/linux/zfs/vdev_object_store.c
+++ b/module/os/linux/zfs/vdev_object_store.c
@@ -548,13 +548,23 @@ agent_open_pool(vdev_t *vd, vdev_object_store_t *vos, mode_t mode,
 	fnvlist_add_string(nv, AGENT_ENDPOINT, vos->vos_endpoint);
 	fnvlist_add_string(nv, AGENT_REGION, vos->vos_region);
 	fnvlist_add_string(nv, AGENT_BUCKET, vd->vdev_path);
-	fnvlist_add_boolean_value(nv, AGENT_RESUME, resume);
 	if (mode == O_RDONLY)
 		fnvlist_add_boolean(nv, AGENT_READONLY);
 	if (vd->vdev_spa->spa_load_max_txg != UINT64_MAX) {
 		fnvlist_add_uint64(nv, AGENT_TXG,
 		    vd->vdev_spa->spa_load_max_txg);
 	}
+
+	/*
+	 * When we're resuming from an agent restart and we're
+	 * in the middle of a txg, then we need to let the agent
+	 * know the txg value.
+	 */
+	if (resume && vos->vos_send_txg_selector <= VOS_TXG_END) {
+		fnvlist_add_uint64(nv, AGENT_SYNCING_TXG,
+		    spa_syncing_txg(vd->vdev_spa));
+	}
+
 	zfs_dbgmsg("agent_open_pool(guid=%llu bucket=%s)",
 	    (u_longlong_t)spa_guid(vd->vdev_spa), vd->vdev_path);
 
@@ -582,22 +592,6 @@ agent_begin_txg(vdev_object_store_t *vos, uint64_t txg)
 	fnvlist_free(nv);
 }
 
-static void
-agent_resume_txg(vdev_object_store_t *vos, uint64_t txg)
-{
-	ASSERT(MUTEX_HELD(&vos->vos_sock_lock));
-	zfs_object_store_wait(vos, VOS_SOCK_OPEN);
-
-	nvlist_t *nv = fnvlist_alloc();
-	fnvlist_add_string(nv, AGENT_TYPE, AGENT_TYPE_RESUME_TXG);
-	fnvlist_add_uint64(nv, AGENT_TXG, txg);
-
-	zfs_dbgmsg("agent_resume_txg(%llu)",
-	    (u_longlong_t)txg);
-	agent_request(vos, nv, FTAG);
-	fnvlist_free(nv);
-}
-
 static void
 agent_resume_complete(vdev_object_store_t *vos)
 {
@@ -752,10 +746,6 @@ agent_resume(void *arg)
 
 	mutex_enter(&vos->vos_sock_lock);
 
-	if (vos->vos_send_txg_selector <= VOS_TXG_END) {
-		agent_resume_txg(vos, spa_syncing_txg(spa));
-	}
-
 	vdev_queue_t *vq = &vd->vdev_queue;
 	mutex_enter(&vq->vq_lock);
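With this patch the kernel makes the in-a-txg decision once, at open
time: agent_open_pool() sends AGENT_SYNCING_TXG only when this is a
resume and vos_send_txg_selector says a txg is open, and agent_resume()
no longer sends a separate message. A rough Rust mirror of that
predicate follows; the selector variants and their ordering are
assumptions modeled on the VOS_TXG_END comparison above, since the real
definitions live in vdev_object_store code outside this patch:

    // Hypothetical stand-ins ordered like the kernel's VOS_TXG_* values,
    // where anything <= VOS_TXG_END means a txg is currently open.
    #[derive(PartialEq, PartialOrd)]
    enum TxgSelector {
        Begin, // "begin txg" sent for the current txg
        End,   // "end txg" sent but not yet acknowledged
        Done,  // no txg in flight
    }

    // Mirrors `if (resume && vos->vos_send_txg_selector <= VOS_TXG_END)`
    // in agent_open_pool(): only a reconnect that interrupted an open txg
    // reports a syncing txg; a first open or an idle reconnect does not.
    fn syncing_txg_arg(resume: bool, sel: TxgSelector, txg: u64) -> Option<u64> {
        if resume && sel <= TxgSelector::End {
            Some(txg)
        } else {
            None
        }
    }

    fn main() {
        assert_eq!(syncing_txg_arg(false, TxgSelector::Done, 101), None); // first open
        assert_eq!(syncing_txg_arg(true, TxgSelector::Done, 101), None); // idle restart
        assert_eq!(syncing_txg_arg(true, TxgSelector::End, 101), Some(101)); // mid-txg
    }

Moving this decision into the "open pool" message removes the window in
which the agent had to guess whether a txg was in flight, which is the
guess that previously left "read block" requests waiting forever.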