From 27262128b910a1b41f2d118009ce2e5fb25ee216 Mon Sep 17 00:00:00 2001 From: Matthew Ahrens Date: Mon, 16 Aug 2021 12:36:31 -0700 Subject: [PATCH] DOSE-464 when the kernel crashes, the agent needs to remove "leaked" objects (#399) When the kernel crashes in the middle of a txg, there may be some new objects that were created, but which are not part of the consistent state yet (i.e. the last uberblock does not reference them). When opening a pool, we need to look for and remove these "leaked" objects. The leaking is actually not a big deal, because it's not a lot of space and we're going to overwrite (most of) these objects soon anyway. However, if these "leaked" objects are present during agent recovery (i.e. agent dies and comes back during a txg), it can confuse the recovery process because we can't tell which "extra" objects are "leaked", useless objects, written by the crashed kernel, vs which are "recovered", useful objects that were written by this instance of the kernel. This leads to incorrect recovery. The solution is to proactively clean up the "leaked" objects when opening the pool. This includes objects that contain DataObjectPhys and ObjectBasedLogChunk. --- .../zettaobject/src/kernel_connection.rs | 9 +- .../zettaobject/src/object_based_log.rs | 65 +++++---- cmd/zfs_object_agent/zettaobject/src/pool.rs | 126 +++++++++++++++--- module/os/linux/zfs/vdev_object_store.c | 9 +- 4 files changed, 162 insertions(+), 47 deletions(-) diff --git a/cmd/zfs_object_agent/zettaobject/src/kernel_connection.rs b/cmd/zfs_object_agent/zettaobject/src/kernel_connection.rs index 86afff4afcb6..5f827f0e449d 100644 --- a/cmd/zfs_object_agent/zettaobject/src/kernel_connection.rs +++ b/cmd/zfs_object_agent/zettaobject/src/kernel_connection.rs @@ -106,12 +106,19 @@ impl KernelConnectionState { info!("got request: {:?}", nvl); Box::pin(async move { let guid = PoolGuid(nvl.lookup_uint64("GUID")?); + let resume_data = nvl.as_ref().lookup("resume").unwrap().data(); + let resume = if let NvData::BoolV(resume) = resume_data { + resume + } else { + return Err(anyhow!("data {:?} not expected type", resume_data)); + }; + let object_access = Self::get_object_access(nvl.as_ref())?; let cache = self.cache.as_ref().cloned(); let txg = nvl.lookup_uint64("TXG").ok().map(Txg); let (pool, phys_opt, next_block) = - match Pool::open(&object_access, guid, txg, cache, self.id).await { + match Pool::open(&object_access, guid, txg, cache, self.id, resume).await { Err(PoolOpenError::MmpError(hostname)) => { let mut response = NvList::new_unique_names(); response.insert("Type", "pool open failed").unwrap(); diff --git a/cmd/zfs_object_agent/zettaobject/src/object_based_log.rs b/cmd/zfs_object_agent/zettaobject/src/object_based_log.rs index 9780d078a88b..23b70d58373b 100644 --- a/cmd/zfs_object_agent/zettaobject/src/object_based_log.rs +++ b/cmd/zfs_object_agent/zettaobject/src/object_based_log.rs @@ -3,8 +3,8 @@ use crate::object_access::ObjectAccess; use crate::pool::PoolSharedState; use anyhow::{Context, Result}; use async_stream::stream; -use futures::future; use futures::future::join_all; +use futures::future::{self, join}; use futures::stream::{FuturesOrdered, StreamExt}; use futures_core::Stream; use lazy_static::lazy_static; @@ -146,34 +146,49 @@ impl ObjectBasedLog { /// Recover after a system crash, where the kernel also crashed and we are discarding /// any changes after the current txg. - pub async fn recover(&mut self) { - // XXX now that we are flushing async, there could be gaps in written - // but not needed chunkID's. Probably want to change keys to use padded numbers so that - // we can easily find any after the last chunk. - - // Delete any chunks past the logical end of the log - /* - for c in self.num_chunks.. { - let key = &format!("{}/{:020}/{:020}", self.name, self.generation, c); - if self.pool.object_access.object_exists(&key).await { - self.pool.object_access.delete_object(&key).await; - } else { - break; + pub async fn cleanup(&mut self) { + // collect chunks past the end, in the current generation + let shared_state = self.shared_state.clone(); + let last_generation_key = format!("{}/{:020}/", self.name, self.generation); + let start_after = if self.num_chunks == 0 { + None + } else { + Some(ObjectBasedLogChunk::::key( + &self.name, + self.generation, + self.num_chunks - 1, + )) + }; + let current_generation_cleanup = async move { + let objects = shared_state + .object_access + .collect_objects(&last_generation_key, start_after) + .await; + for chunk in objects.chunks(900) { + info!( + "cleanup: deleting future chunks of current generation: {:?}", + chunk + ); + shared_state.object_access.delete_objects(chunk).await; } - } + }; - // Delete the partially-complete generation (if present) - for c in 0.. { - let key = &format!("{}/{:020}/{:020}", self.name, self.generation + 1, c); - if self.pool.object_access.object_exists(key).await { - self.pool.object_access.delete_object(key).await; - } else { - break; + // collect chunks from the partially-complete future generation + let shared_state = self.shared_state.clone(); + let next_generation_key = format!("{}/{:020}/", self.name, self.generation + 1); + let next_generation_cleanup = async move { + let objects = shared_state + .object_access + .collect_objects(&next_generation_key, None) + .await; + for chunk in objects.chunks(900) { + info!("cleanup: deleting chunks of future generation: {:?}", chunk); + shared_state.object_access.delete_objects(chunk).await; } - } - */ + }; - // XXX verify that there are no chunks/generations past what we deleted + // execute both cleanup's concurrently + join(current_generation_cleanup, next_generation_cleanup).await; self.recovered = true; } diff --git a/cmd/zfs_object_agent/zettaobject/src/pool.rs b/cmd/zfs_object_agent/zettaobject/src/pool.rs index e1d29aa002f1..d3394d703a8b 100644 --- a/cmd/zfs_object_agent/zettaobject/src/pool.rs +++ b/cmd/zfs_object_agent/zettaobject/src/pool.rs @@ -11,8 +11,10 @@ use crate::object_block_map::ObjectBlockMap; use crate::object_block_map::StorageObjectLogEntry; use anyhow::Error; use anyhow::{Context, Result}; -use core::future::Future; use futures::future; +use futures::future::join3; +use futures::future::Either; +use futures::future::Future; use futures::stream::*; use lazy_static::lazy_static; use log::*; @@ -514,6 +516,80 @@ impl PoolState { let mut guard = self.syncing_state.lock().unwrap(); f(guard.as_mut().unwrap()) } + + async fn cleanup_uberblock_objects(&self, last_txg: Txg) { + let shared_state = &self.shared_state; + let txg_key = format!("zfs/{}/txg/", shared_state.guid); + let start_after = Some(UberblockPhys::key(shared_state.guid, last_txg)); + let objects = shared_state + .object_access + .collect_objects(&txg_key, start_after) + .await; + for chunk in objects.chunks(900) { + info!("cleanup: deleting future uberblocks: {:?}", chunk); + shared_state.object_access.delete_objects(chunk).await; + } + } + + /// Remove any log objects that are invalid (i.e. written as part of an + /// in-progress txg before the kernel or agent crashed) + async fn cleanup_log_objects(&self) { + let mut syncing_state = self.syncing_state.lock().unwrap().take().unwrap(); + + let begin = Instant::now(); + let stream = FuturesUnordered::new(); + for log in syncing_state.pending_frees_log.iter_mut() { + stream.push(log.cleanup()); + } + join3( + syncing_state.storage_object_log.cleanup(), + syncing_state.object_size_log.cleanup(), + stream.for_each(|_| future::ready(())), + ) + .await; + assert!(self.syncing_state.lock().unwrap().is_none()); + *self.syncing_state.lock().unwrap() = Some(syncing_state); + + info!( + "cleanup: found and deleted log objects in {}ms", + begin.elapsed().as_millis() + ); + } + + /// Remove any data objects that are invalid (i.e. written as part of an + /// in-progress txg before the kernel crashed) + async fn cleanup_data_objects(&self) { + let shared_state = self.shared_state.clone(); + + let begin = Instant::now(); + let last_obj = self.object_block_map.last_object(); + let list_stream = FuturesUnordered::new(); + for prefix in DataObjectPhys::prefixes(shared_state.guid) { + let shared_state = shared_state.clone(); + list_stream.push(async move { + shared_state + .object_access + .collect_objects(&prefix, Some(format!("{}{}", prefix, last_obj))) + .await + }); + } + + let objects = list_stream + .fold(Vec::new(), |mut vec, mut x| async move { + vec.append(&mut x); + vec + }) + .await; + for chunk in objects.chunks(900) { + shared_state.object_access.delete_objects(chunk).await; + } + + info!( + "cleanup: found and deleted {} data objects in {}ms", + objects.len(), + begin.elapsed().as_millis() + ); + } } impl Pool { @@ -588,9 +664,9 @@ impl Pool { pending_unordered_writes: HashMap::new(), stats: phys.stats, reclaim_done: None, - rewriting_objects: HashMap::new(), - objects_to_delete: Vec::new(), - pending_flushes: BTreeSet::new(), + rewriting_objects: Default::default(), + objects_to_delete: Default::default(), + pending_flushes: Default::default(), })), zettacache: cache, object_block_map, @@ -598,18 +674,11 @@ impl Pool { }), }; - let mut syncing_state = { + let syncing_state = { let mut guard = pool.state.syncing_state.lock().unwrap(); guard.take().unwrap() }; - syncing_state.storage_object_log.recover().await; - syncing_state.object_size_log.recover().await; - for log in syncing_state.pending_frees_log.iter_mut() { - // XXX can these be done in parallel? - log.recover().await; - } - assert_eq!( pool.state.object_block_map.len() as u64, syncing_state.stats.objects_count @@ -629,6 +698,7 @@ impl Pool { txg: Option, cache: Option, id: Uuid, + resume: bool, ) -> Result<(Pool, Option, BlockId), PoolOpenError> { let phys = PoolPhys::get(object_access, guid).await?; if phys.last_txg.0 == 0 { @@ -701,7 +771,28 @@ impl Pool { ) .await; - pool.claim(id).await.map(|_| (pool, ub, next_block)) + pool.claim(id).await?; + + if !object_access.readonly() { + let last_txg = pool + .state + .with_syncing_state(|syncing_state| syncing_state.last_txg); + let state = pool.state.clone(); + // Note: cleanup_log_objects() takes the syncing_state, so the + // other concurrently-executed cleanups can not access the + // syncing state. That's why we need to pass in the last_txg. + join3( + pool.state.clone().cleanup_log_objects(), + pool.state.clone().cleanup_uberblock_objects(last_txg), + if resume { + Either::Left(future::ready(())) + } else { + Either::Right(state.cleanup_data_objects()) + }, + ) + .await; + } + Ok((pool, ub, next_block)) } } @@ -875,12 +966,14 @@ impl Pool { txg, ); - debug!("resume: moving last writes to pending_object"); + debug!("resume: moving last writes to pending_object and flushing"); Self::write_unordered_to_pending_object( state, syncing_state, Some(*MAX_BYTES_PER_OBJECT), ); + Self::initiate_flush_object_impl(state, syncing_state); + info!("resume: completed"); }) } @@ -905,10 +998,7 @@ impl Pool { pub async fn end_txg(&self, uberblock: Vec, config: Vec) -> PoolStatsPhys { let state = &self.state; - let mut syncing_state = { - let mut guard = state.syncing_state.lock().unwrap(); - guard.take().unwrap() - }; + let mut syncing_state = state.syncing_state.lock().unwrap().take().unwrap(); // should have already been flushed; no pending writes assert!(syncing_state.pending_unordered_writes.is_empty()); diff --git a/module/os/linux/zfs/vdev_object_store.c b/module/os/linux/zfs/vdev_object_store.c index 3e5aa97fc8ca..cb7d794f30c3 100644 --- a/module/os/linux/zfs/vdev_object_store.c +++ b/module/os/linux/zfs/vdev_object_store.c @@ -67,6 +67,7 @@ #define AGENT_CAUSE "cause" #define AGENT_HOSTNAME "hostname" #define AGENT_READONLY "readonly" +#define AGENT_RESUME "resume" /* * By default, the logical/physical ashift for object store vdevs is set to @@ -503,7 +504,8 @@ agent_create_pool(vdev_t *vd, vdev_object_store_t *vos) } static uint64_t -agent_open_pool(vdev_t *vd, vdev_object_store_t *vos, mode_t mode) +agent_open_pool(vdev_t *vd, vdev_object_store_t *vos, mode_t mode, + boolean_t resume) { /* * We need to ensure that we only issue a request when the @@ -523,6 +525,7 @@ agent_open_pool(vdev_t *vd, vdev_object_store_t *vos, mode_t mode) fnvlist_add_string(nv, AGENT_ENDPOINT, vos->vos_endpoint); fnvlist_add_string(nv, AGENT_REGION, vos->vos_region); fnvlist_add_string(nv, AGENT_BUCKET, vd->vdev_path); + fnvlist_add_boolean_value(nv, AGENT_RESUME, resume); if (mode == O_RDONLY) fnvlist_add_boolean(nv, AGENT_READONLY); if (vd->vdev_spa->spa_load_max_txg != UINT64_MAX) { @@ -674,7 +677,7 @@ agent_resume(void *arg) agent_create_pool(vd, vos); } VERIFY0(agent_open_pool(vd, vos, - vdev_object_store_open_mode(spa_mode(vd->vdev_spa)))); + vdev_object_store_open_mode(spa_mode(vd->vdev_spa)), B_TRUE)); if ((ret = agent_resume_state_check(vd)) != 0) { zfs_dbgmsg("agent resume failed, uberblock changed"); @@ -1262,7 +1265,7 @@ vdev_object_store_open(vdev_t *vd, uint64_t *psize, uint64_t *max_psize, agent_create_pool(vd, vos); } error = agent_open_pool(vd, vos, - vdev_object_store_open_mode(spa_mode(vd->vdev_spa))); + vdev_object_store_open_mode(spa_mode(vd->vdev_spa)), B_FALSE); if (error != 0) { ASSERT3U(vd->vdev_spa->spa_load_state, !=, SPA_LOAD_CREATE); return (error);