Skip to content

Commit

Permalink
DOSE-464 when the kernel crashes, the agent needs to remove "leaked" …
Browse files Browse the repository at this point in the history
…objects (openzfs#399)

When the kernel crashes in the middle of a txg, there may be some new objects that were created, but which are not part of the consistent state yet (i.e. the last uberblock does not reference them). When opening a pool, we need to look for and remove these "leaked" objects. The leaking is actually not a big deal, because it's not a lot of space and we're going to overwrite (most of) these objects soon anyway. However, if these "leaked" objects are present during agent recovery (i.e. agent dies and comes back during a txg), it can confuse the recovery process because we can't tell which "extra" objects are "leaked", useless objects, written by the crashed kernel, vs which are "recovered", useful objects that were written by this instance of the kernel. This leads to incorrect recovery.

The solution is to proactively clean up the "leaked" objects when opening the pool. This includes objects that contain DataObjectPhys and ObjectBasedLogChunk.
  • Loading branch information
ahrens authored Aug 16, 2021
1 parent 1a0c7e5 commit 2726212
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 47 deletions.
9 changes: 8 additions & 1 deletion cmd/zfs_object_agent/zettaobject/src/kernel_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,19 @@ impl KernelConnectionState {
info!("got request: {:?}", nvl);
Box::pin(async move {
let guid = PoolGuid(nvl.lookup_uint64("GUID")?);
let resume_data = nvl.as_ref().lookup("resume").unwrap().data();
let resume = if let NvData::BoolV(resume) = resume_data {
resume
} else {
return Err(anyhow!("data {:?} not expected type", resume_data));
};

let object_access = Self::get_object_access(nvl.as_ref())?;
let cache = self.cache.as_ref().cloned();
let txg = nvl.lookup_uint64("TXG").ok().map(Txg);

let (pool, phys_opt, next_block) =
match Pool::open(&object_access, guid, txg, cache, self.id).await {
match Pool::open(&object_access, guid, txg, cache, self.id, resume).await {
Err(PoolOpenError::MmpError(hostname)) => {
let mut response = NvList::new_unique_names();
response.insert("Type", "pool open failed").unwrap();
Expand Down
65 changes: 40 additions & 25 deletions cmd/zfs_object_agent/zettaobject/src/object_based_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::object_access::ObjectAccess;
use crate::pool::PoolSharedState;
use anyhow::{Context, Result};
use async_stream::stream;
use futures::future;
use futures::future::join_all;
use futures::future::{self, join};
use futures::stream::{FuturesOrdered, StreamExt};
use futures_core::Stream;
use lazy_static::lazy_static;
Expand Down Expand Up @@ -146,34 +146,49 @@ impl<T: ObjectBasedLogEntry> ObjectBasedLog<T> {

/// Recover after a system crash, where the kernel also crashed and we are discarding
/// any changes after the current txg.
pub async fn recover(&mut self) {
// XXX now that we are flushing async, there could be gaps in written
// but not needed chunkID's. Probably want to change keys to use padded numbers so that
// we can easily find any after the last chunk.

// Delete any chunks past the logical end of the log
/*
for c in self.num_chunks.. {
let key = &format!("{}/{:020}/{:020}", self.name, self.generation, c);
if self.pool.object_access.object_exists(&key).await {
self.pool.object_access.delete_object(&key).await;
} else {
break;
pub async fn cleanup(&mut self) {
// collect chunks past the end, in the current generation
let shared_state = self.shared_state.clone();
let last_generation_key = format!("{}/{:020}/", self.name, self.generation);
let start_after = if self.num_chunks == 0 {
None
} else {
Some(ObjectBasedLogChunk::<T>::key(
&self.name,
self.generation,
self.num_chunks - 1,
))
};
let current_generation_cleanup = async move {
let objects = shared_state
.object_access
.collect_objects(&last_generation_key, start_after)
.await;
for chunk in objects.chunks(900) {
info!(
"cleanup: deleting future chunks of current generation: {:?}",
chunk
);
shared_state.object_access.delete_objects(chunk).await;
}
}
};

// Delete the partially-complete generation (if present)
for c in 0.. {
let key = &format!("{}/{:020}/{:020}", self.name, self.generation + 1, c);
if self.pool.object_access.object_exists(key).await {
self.pool.object_access.delete_object(key).await;
} else {
break;
// collect chunks from the partially-complete future generation
let shared_state = self.shared_state.clone();
let next_generation_key = format!("{}/{:020}/", self.name, self.generation + 1);
let next_generation_cleanup = async move {
let objects = shared_state
.object_access
.collect_objects(&next_generation_key, None)
.await;
for chunk in objects.chunks(900) {
info!("cleanup: deleting chunks of future generation: {:?}", chunk);
shared_state.object_access.delete_objects(chunk).await;
}
}
*/
};

// XXX verify that there are no chunks/generations past what we deleted
// execute both cleanup's concurrently
join(current_generation_cleanup, next_generation_cleanup).await;

self.recovered = true;
}
Expand Down
126 changes: 108 additions & 18 deletions cmd/zfs_object_agent/zettaobject/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ use crate::object_block_map::ObjectBlockMap;
use crate::object_block_map::StorageObjectLogEntry;
use anyhow::Error;
use anyhow::{Context, Result};
use core::future::Future;
use futures::future;
use futures::future::join3;
use futures::future::Either;
use futures::future::Future;
use futures::stream::*;
use lazy_static::lazy_static;
use log::*;
Expand Down Expand Up @@ -514,6 +516,80 @@ impl PoolState {
let mut guard = self.syncing_state.lock().unwrap();
f(guard.as_mut().unwrap())
}

async fn cleanup_uberblock_objects(&self, last_txg: Txg) {
let shared_state = &self.shared_state;
let txg_key = format!("zfs/{}/txg/", shared_state.guid);
let start_after = Some(UberblockPhys::key(shared_state.guid, last_txg));
let objects = shared_state
.object_access
.collect_objects(&txg_key, start_after)
.await;
for chunk in objects.chunks(900) {
info!("cleanup: deleting future uberblocks: {:?}", chunk);
shared_state.object_access.delete_objects(chunk).await;
}
}

/// Remove any log objects that are invalid (i.e. written as part of an
/// in-progress txg before the kernel or agent crashed)
async fn cleanup_log_objects(&self) {
let mut syncing_state = self.syncing_state.lock().unwrap().take().unwrap();

let begin = Instant::now();
let stream = FuturesUnordered::new();
for log in syncing_state.pending_frees_log.iter_mut() {
stream.push(log.cleanup());
}
join3(
syncing_state.storage_object_log.cleanup(),
syncing_state.object_size_log.cleanup(),
stream.for_each(|_| future::ready(())),
)
.await;
assert!(self.syncing_state.lock().unwrap().is_none());
*self.syncing_state.lock().unwrap() = Some(syncing_state);

info!(
"cleanup: found and deleted log objects in {}ms",
begin.elapsed().as_millis()
);
}

/// Remove any data objects that are invalid (i.e. written as part of an
/// in-progress txg before the kernel crashed)
async fn cleanup_data_objects(&self) {
let shared_state = self.shared_state.clone();

let begin = Instant::now();
let last_obj = self.object_block_map.last_object();
let list_stream = FuturesUnordered::new();
for prefix in DataObjectPhys::prefixes(shared_state.guid) {
let shared_state = shared_state.clone();
list_stream.push(async move {
shared_state
.object_access
.collect_objects(&prefix, Some(format!("{}{}", prefix, last_obj)))
.await
});
}

let objects = list_stream
.fold(Vec::new(), |mut vec, mut x| async move {
vec.append(&mut x);
vec
})
.await;
for chunk in objects.chunks(900) {
shared_state.object_access.delete_objects(chunk).await;
}

info!(
"cleanup: found and deleted {} data objects in {}ms",
objects.len(),
begin.elapsed().as_millis()
);
}
}

impl Pool {
Expand Down Expand Up @@ -588,28 +664,21 @@ impl Pool {
pending_unordered_writes: HashMap::new(),
stats: phys.stats,
reclaim_done: None,
rewriting_objects: HashMap::new(),
objects_to_delete: Vec::new(),
pending_flushes: BTreeSet::new(),
rewriting_objects: Default::default(),
objects_to_delete: Default::default(),
pending_flushes: Default::default(),
})),
zettacache: cache,
object_block_map,
_heartbeat_guard: heartbeat_guard,
}),
};

let mut syncing_state = {
let syncing_state = {
let mut guard = pool.state.syncing_state.lock().unwrap();
guard.take().unwrap()
};

syncing_state.storage_object_log.recover().await;
syncing_state.object_size_log.recover().await;
for log in syncing_state.pending_frees_log.iter_mut() {
// XXX can these be done in parallel?
log.recover().await;
}

assert_eq!(
pool.state.object_block_map.len() as u64,
syncing_state.stats.objects_count
Expand All @@ -629,6 +698,7 @@ impl Pool {
txg: Option<Txg>,
cache: Option<ZettaCache>,
id: Uuid,
resume: bool,
) -> Result<(Pool, Option<UberblockPhys>, BlockId), PoolOpenError> {
let phys = PoolPhys::get(object_access, guid).await?;
if phys.last_txg.0 == 0 {
Expand Down Expand Up @@ -701,7 +771,28 @@ impl Pool {
)
.await;

pool.claim(id).await.map(|_| (pool, ub, next_block))
pool.claim(id).await?;

if !object_access.readonly() {
let last_txg = pool
.state
.with_syncing_state(|syncing_state| syncing_state.last_txg);
let state = pool.state.clone();
// Note: cleanup_log_objects() takes the syncing_state, so the
// other concurrently-executed cleanups can not access the
// syncing state. That's why we need to pass in the last_txg.
join3(
pool.state.clone().cleanup_log_objects(),
pool.state.clone().cleanup_uberblock_objects(last_txg),
if resume {
Either::Left(future::ready(()))
} else {
Either::Right(state.cleanup_data_objects())
},
)
.await;
}
Ok((pool, ub, next_block))
}
}

Expand Down Expand Up @@ -875,12 +966,14 @@ impl Pool {
txg,
);

debug!("resume: moving last writes to pending_object");
debug!("resume: moving last writes to pending_object and flushing");
Self::write_unordered_to_pending_object(
state,
syncing_state,
Some(*MAX_BYTES_PER_OBJECT),
);
Self::initiate_flush_object_impl(state, syncing_state);

info!("resume: completed");
})
}
Expand All @@ -905,10 +998,7 @@ impl Pool {
pub async fn end_txg(&self, uberblock: Vec<u8>, config: Vec<u8>) -> PoolStatsPhys {
let state = &self.state;

let mut syncing_state = {
let mut guard = state.syncing_state.lock().unwrap();
guard.take().unwrap()
};
let mut syncing_state = state.syncing_state.lock().unwrap().take().unwrap();

// should have already been flushed; no pending writes
assert!(syncing_state.pending_unordered_writes.is_empty());
Expand Down
9 changes: 6 additions & 3 deletions module/os/linux/zfs/vdev_object_store.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
#define AGENT_CAUSE "cause"
#define AGENT_HOSTNAME "hostname"
#define AGENT_READONLY "readonly"
#define AGENT_RESUME "resume"

/*
* By default, the logical/physical ashift for object store vdevs is set to
Expand Down Expand Up @@ -503,7 +504,8 @@ agent_create_pool(vdev_t *vd, vdev_object_store_t *vos)
}

static uint64_t
agent_open_pool(vdev_t *vd, vdev_object_store_t *vos, mode_t mode)
agent_open_pool(vdev_t *vd, vdev_object_store_t *vos, mode_t mode,
boolean_t resume)
{
/*
* We need to ensure that we only issue a request when the
Expand All @@ -523,6 +525,7 @@ agent_open_pool(vdev_t *vd, vdev_object_store_t *vos, mode_t mode)
fnvlist_add_string(nv, AGENT_ENDPOINT, vos->vos_endpoint);
fnvlist_add_string(nv, AGENT_REGION, vos->vos_region);
fnvlist_add_string(nv, AGENT_BUCKET, vd->vdev_path);
fnvlist_add_boolean_value(nv, AGENT_RESUME, resume);
if (mode == O_RDONLY)
fnvlist_add_boolean(nv, AGENT_READONLY);
if (vd->vdev_spa->spa_load_max_txg != UINT64_MAX) {
Expand Down Expand Up @@ -674,7 +677,7 @@ agent_resume(void *arg)
agent_create_pool(vd, vos);
}
VERIFY0(agent_open_pool(vd, vos,
vdev_object_store_open_mode(spa_mode(vd->vdev_spa))));
vdev_object_store_open_mode(spa_mode(vd->vdev_spa)), B_TRUE));

if ((ret = agent_resume_state_check(vd)) != 0) {
zfs_dbgmsg("agent resume failed, uberblock changed");
Expand Down Expand Up @@ -1262,7 +1265,7 @@ vdev_object_store_open(vdev_t *vd, uint64_t *psize, uint64_t *max_psize,
agent_create_pool(vd, vos);
}
error = agent_open_pool(vd, vos,
vdev_object_store_open_mode(spa_mode(vd->vdev_spa)));
vdev_object_store_open_mode(spa_mode(vd->vdev_spa)), B_FALSE);
if (error != 0) {
ASSERT3U(vd->vdev_spa->spa_load_state, !=, SPA_LOAD_CREATE);
return (error);
Expand Down

0 comments on commit 2726212

Please sign in to comment.