DOSE-725 zio read never returns when agent is restarted (openzfs#508)
If the agent restarts when there are outstanding "read block" requests,
but we are not in the middle of a txg, then it hangs.  The problem is
that the agent incorrectly thinks that we are in the middle of a txg,
and waits for a "resume done" message before allowing "read block"
requests to be processed.

The solution is for the kernel to tell the agent whether it's in the
middle of a txg as part of the "open pool" message.  Note that the old
"resume" argument (indicating if we are opening the pool for the first
time vs after agent restart) is not needed; these are handled
identically if we are not in the middle of a txg.

Since the resume_txg state is now communicated as part of the "open pool"
request, the "resume txg" request is no longer needed.
ahrens authored Oct 14, 2021
1 parent 3257254 commit 51cd367
Showing 4 changed files with 25 additions and 58 deletions.
34 changes: 11 additions & 23 deletions cmd/zfs_object_agent/zettaobject/src/pool.rs
@@ -801,12 +801,16 @@ impl Pool {
cache: Option<ZettaCache>,
heartbeat_guard: Option<HeartbeatGuard>,
readonly: bool,
resuming: bool,
syncing_txg: Option<Txg>,
) -> Result<(Pool, UberblockPhys, BlockId), PoolOpenError> {
let phys = UberblockPhys::get(&object_access, pool_phys.guid, txg).await?;

features::check_features(phys.features.iter().map(|(f, _)| f), readonly)?;

if let Some(resume_txg) = syncing_txg {
assert_gt!(resume_txg, phys.txg);
}

let shared_state = Arc::new(PoolSharedState {
object_access: object_access.clone(),
guid: pool_phys.guid,
@@ -835,13 +839,14 @@
),
});
}
let (tx, rx) = watch::channel(resuming);

let (tx, rx) = watch::channel(syncing_txg.is_some());
let pool = Pool {
state: Arc::new(PoolState {
shared_state: shared_state.clone(),
syncing_state: std::sync::Mutex::new(Some(PoolSyncingState {
last_txg: phys.txg,
syncing_txg: None,
syncing_txg,
storage_object_log,
reclaim_info: ReclaimInfo {
indirect_table: ReclaimIndirectTable {
@@ -880,8 +885,6 @@

let next_block = syncing_state.next_block();

//println!("opened {:#?}", pool);

*pool.state.syncing_state.lock().unwrap() = Some(syncing_state);
Ok((pool, phys, next_block))
}
@@ -892,7 +895,7 @@ impl Pool {
txg: Option<Txg>,
cache: Option<ZettaCache>,
id: Uuid,
resuming: bool,
syncing_txg: Option<Txg>,
) -> Result<(Pool, Option<UberblockPhys>, BlockId), PoolOpenError> {
let phys = PoolPhys::get(&object_access, guid).await?;
if phys.last_txg.0 == 0 {
@@ -984,7 +987,7 @@ impl Pool {
None
},
object_access.readonly(),
resuming,
syncing_txg,
)
.await?;

@@ -1019,7 +1022,7 @@ impl Pool {
join3(
pool.state.clone().cleanup_log_objects(),
pool.state.clone().cleanup_uberblock_objects(last_txg),
if resuming {
if syncing_txg.is_some() {
Either::Left(future::ready(()))
} else {
Either::Right(state.cleanup_data_objects())
@@ -1031,21 +1034,6 @@
}
}

pub fn resume_txg(&self, txg: Txg) {
// The syncing_state is only held while a txg is open (begun). It's not
// allowed to call begin_txg() while a txg is already open, so the lock
// must not be held.
// XXX change this to return an error to the client
self.state.with_syncing_state(|syncing_state| {
assert!(syncing_state.syncing_txg.is_none());
assert_gt!(txg, syncing_state.last_txg);
syncing_state.syncing_txg = Some(txg);

// Resuming state is indicated by pending_object = NotPending
assert!(!syncing_state.pending_object.is_pending());
})
}

async fn get_recovered_objects(
state: &Arc<PoolState>,
shared_state: &Arc<PoolSharedState>,
14 changes: 2 additions & 12 deletions cmd/zfs_object_agent/zettaobject/src/root_connection.rs
@@ -69,7 +69,6 @@ impl RootConnectionState {
server.register_serial_handler("open pool", Box::new(Self::open_pool));
server.register_serial_handler("resume complete", Box::new(Self::resume_complete));
server.register_handler("begin txg", Box::new(Self::begin_txg));
server.register_handler("resume txg", Box::new(Self::resume_txg));
server.register_handler("flush writes", Box::new(Self::flush_writes));
server.register_handler("end txg", Box::new(Self::end_txg));
server.register_handler("write block", Box::new(Self::write_block));
@@ -121,14 +120,14 @@ impl RootConnectionState {
info!("got request: {:?}", nvl);
Box::pin(async move {
let guid = PoolGuid(nvl.lookup_uint64("GUID")?);
let resume = bool_value(&nvl, "resume")?;

let object_access = Self::get_object_access(&nvl)?;
let cache = self.cache.as_ref().cloned();
let txg = nvl.lookup_uint64("TXG").ok().map(Txg);
let syncing_txg = nvl.lookup_uint64("syncing_txg").ok().map(Txg);

let (pool, phys_opt, next_block) =
match Pool::open(object_access, guid, txg, cache, self.id, resume).await {
match Pool::open(object_access, guid, txg, cache, self.id, syncing_txg).await {
Err(PoolOpenError::Mmp(hostname)) => {
let mut response = NvList::new_unique_names();
response.insert("Type", "pool open failed").unwrap();
@@ -212,15 +211,6 @@ impl RootConnectionState {
handler_return_ok(None)
}

fn resume_txg(&mut self, nvl: NvList) -> HandlerReturn {
info!("got request: {:?}", nvl);
let txg = Txg(nvl.lookup_uint64("TXG")?);
let pool = self.pool.as_ref().ok_or_else(|| anyhow!("no pool open"))?;
pool.resume_txg(txg);

handler_return_ok(None)
}

fn resume_complete(&mut self, nvl: NvList) -> SerialHandlerReturn {
info!("got request: {:?}", nvl);

3 changes: 1 addition & 2 deletions include/sys/vdev_object_store.h
@@ -33,7 +33,6 @@
#define AGENT_TYPE_WRITE_DONE "write done"
#define AGENT_TYPE_FREE_BLOCK "free block"
#define AGENT_TYPE_BEGIN_TXG "begin txg"
#define AGENT_TYPE_RESUME_TXG "resume txg"
#define AGENT_TYPE_RESUME_COMPLETE "resume complete"
#define AGENT_TYPE_END_TXG "end txg"
#define AGENT_TYPE_END_TXG_DONE "end txg done"
@@ -68,7 +67,7 @@
#define AGENT_CAUSE "cause"
#define AGENT_HOSTNAME "hostname"
#define AGENT_READONLY "readonly"
#define AGENT_RESUME "resume"
#define AGENT_SYNCING_TXG "syncing_txg"
#define AGENT_HEAL "heal"
#define AGENT_FEATURE "feature"
#define AGENT_FEATURES "features"
32 changes: 11 additions & 21 deletions module/os/linux/zfs/vdev_object_store.c
@@ -548,13 +548,23 @@ agent_open_pool(vdev_t *vd, vdev_object_store_t *vos, mode_t mode,
fnvlist_add_string(nv, AGENT_ENDPOINT, vos->vos_endpoint);
fnvlist_add_string(nv, AGENT_REGION, vos->vos_region);
fnvlist_add_string(nv, AGENT_BUCKET, vd->vdev_path);
fnvlist_add_boolean_value(nv, AGENT_RESUME, resume);
if (mode == O_RDONLY)
fnvlist_add_boolean(nv, AGENT_READONLY);
if (vd->vdev_spa->spa_load_max_txg != UINT64_MAX) {
fnvlist_add_uint64(nv, AGENT_TXG,
vd->vdev_spa->spa_load_max_txg);
}

/*
* When we're resuming from an agent restart and we're
* in the middle of a txg, then we need to let the agent
* know the txg value.
*/
if (resume && vos->vos_send_txg_selector <= VOS_TXG_END) {
fnvlist_add_uint64(nv, AGENT_SYNCING_TXG,
spa_syncing_txg(vd->vdev_spa));
}

zfs_dbgmsg("agent_open_pool(guid=%llu bucket=%s)",
(u_longlong_t)spa_guid(vd->vdev_spa),
vd->vdev_path);
@@ -582,22 +592,6 @@ agent_begin_txg(vdev_object_store_t *vos, uint64_t txg)
fnvlist_free(nv);
}

static void
agent_resume_txg(vdev_object_store_t *vos, uint64_t txg)
{
ASSERT(MUTEX_HELD(&vos->vos_sock_lock));
zfs_object_store_wait(vos, VOS_SOCK_OPEN);

nvlist_t *nv = fnvlist_alloc();
fnvlist_add_string(nv, AGENT_TYPE, AGENT_TYPE_RESUME_TXG);
fnvlist_add_uint64(nv, AGENT_TXG, txg);

zfs_dbgmsg("agent_resume_txg(%llu)",
(u_longlong_t)txg);
agent_request(vos, nv, FTAG);
fnvlist_free(nv);
}

static void
agent_resume_complete(vdev_object_store_t *vos)
{
@@ -752,10 +746,6 @@ agent_resume(void *arg)

mutex_enter(&vos->vos_sock_lock);

if (vos->vos_send_txg_selector <= VOS_TXG_END) {
agent_resume_txg(vos, spa_syncing_txg(spa));
}

vdev_queue_t *vq = &vd->vdev_queue;

mutex_enter(&vq->vq_lock);
