diff --git a/cmd/zdb/zdb.c b/cmd/zdb/zdb.c index 34783f3d1e7d..e716c737eec4 100644 --- a/cmd/zdb/zdb.c +++ b/cmd/zdb/zdb.c @@ -7794,6 +7794,35 @@ dump_agent_metadata(spa_t *spa) printf("Uberblock (object agent):\n"); print_zoa_nvlist(uberblock_phys); fnvlist_free(uberblock_phys); + + nvlist_t *leaked_objects = NULL; + VERIFY0(libzoa_find_leaks(zoa_handle, &leaked_objects)); + uint_t leaks_len, missing_len; + uint64_t *leaks = fnvlist_lookup_uint64_array(leaked_objects, "leaked", + &leaks_len); + uint64_t *missing = fnvlist_lookup_uint64_array(leaked_objects, + "missing", &missing_len); + + if (leaks_len > 0) { + printf("Leaked object store objects detected: %u, [%llu", + leaks_len, leaks[0]); + for (uint_t i = 1; i < leaks_len; i++) { + printf(", %llu", leaks[i]); + if (i % 4 == 0) + printf("\n\t"); + } + printf("]\n"); + } + if (missing_len > 0) { + printf("Missing object store objects detected: %u, [%llu", + missing_len, missing[0]); + for (uint_t i = 1; i < missing_len; i++) { + printf(", %llu", missing[i]); + if (i % 4 == 0) + printf("\n\t"); + } + printf("]\n"); + } } #endif diff --git a/cmd/zfs_object_agent/client/src/main.rs b/cmd/zfs_object_agent/client/src/main.rs index a1bd32f37da6..714f55373d36 100644 --- a/cmd/zfs_object_agent/client/src/main.rs +++ b/cmd/zfs_object_agent/client/src/main.rs @@ -396,7 +396,7 @@ async fn do_list_pools( // Lookup all objects in the pool. 
if list_all_objects { object_access - .list_objects(pool_key, None, false) + .list_objects(pool_key, false) .for_each(|object| async move { println!(" {}", object) }) .await; } @@ -410,7 +410,7 @@ async fn do_destroy_old_pools( ) -> Result<(), Box> { for pool_keys in find_old_pools(object_access, min_age).await { object_access - .delete_objects(object_access.list_objects(pool_keys, None, false)) + .delete_objects(object_access.list_objects(pool_keys, false)) .await; } Ok(()) @@ -490,7 +490,7 @@ async fn do_blob(cli_params: CliParams, count: NonZeroU32) -> Result<(), Box>() .await ); diff --git a/cmd/zfs_object_agent/object_perf/src/perf.rs b/cmd/zfs_object_agent/object_perf/src/perf.rs index 5a5b50286026..f1bb3bd6c5f8 100644 --- a/cmd/zfs_object_agent/object_perf/src/perf.rs +++ b/cmd/zfs_object_agent/object_perf/src/perf.rs @@ -72,7 +72,7 @@ impl Perf { duration: Duration, ) { let num_objects = object_access - .list_objects(key_prefix.clone(), None, true) + .list_objects(key_prefix.clone(), true) .fold(0, |count, _key| async move { count + 1 }) .await; let mut key_id = 0; @@ -167,7 +167,7 @@ pub async fn write_test( println!("{:#?}", perf.metrics.put); object_access - .delete_objects(object_access.list_objects(key_prefix, None, false)) + .delete_objects(object_access.list_objects(key_prefix, false)) .await; Ok(()) @@ -199,7 +199,7 @@ pub async fn read_test( println!("{:#?}", perf.metrics.get); object_access - .delete_objects(object_access.list_objects(key_prefix, None, false)) + .delete_objects(object_access.list_objects(key_prefix, false)) .await; Ok(()) diff --git a/cmd/zfs_object_agent/util/src/unordered.rs b/cmd/zfs_object_agent/util/src/unordered.rs index fd254c2864f4..13e1496baea6 100644 --- a/cmd/zfs_object_agent/util/src/unordered.rs +++ b/cmd/zfs_object_agent/util/src/unordered.rs @@ -77,4 +77,8 @@ where pub fn is_empty(&self) -> bool { self.pending.is_empty() } + + pub fn last(&self) -> K { + self.first + self.pending.len() + } } diff --git 
a/cmd/zfs_object_agent/zettaobject/src/base_types.rs b/cmd/zfs_object_agent/zettaobject/src/base_types.rs index 409dbbcc6629..f4848c78d810 100644 --- a/cmd/zfs_object_agent/zettaobject/src/base_types.rs +++ b/cmd/zfs_object_agent/zettaobject/src/base_types.rs @@ -22,6 +22,10 @@ impl Txg { Some(Txg(self.0 - rhs)) } } + + pub fn from_key(key: &str) -> Self { + Txg(key.rsplit_once('/').unwrap().1.parse().unwrap()) + } } #[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)] @@ -43,4 +47,10 @@ impl ObjectId { pub fn prefix(self) -> u64 { self.0 % NUM_DATA_PREFIXES } + + /// This function parses a key into an object id. It works for any key + /// where the last path component is the object id. + pub fn from_key(key: &str) -> Self { + ObjectId(key.rsplit_once('/').unwrap().1.parse().unwrap()) + } } diff --git a/cmd/zfs_object_agent/zettaobject/src/data_object.rs b/cmd/zfs_object_agent/zettaobject/src/data_object.rs index 884ff66806dd..2ef3b329fc44 100644 --- a/cmd/zfs_object_agent/zettaobject/src/data_object.rs +++ b/cmd/zfs_object_agent/zettaobject/src/data_object.rs @@ -12,8 +12,11 @@ use anyhow::Context; use anyhow::Result; use bytes::Bytes; use bytesize::ByteSize; +use futures::future; use futures::stream; use futures::FutureExt; +use futures::Stream; +use futures::StreamExt; use log::*; use more_asserts::*; use rusoto_core::ByteStream; @@ -39,6 +42,10 @@ tunable! { static ref DATA_OBJ_RANGED_GET: bool = false; static ref DATA_OBJ_TRY_HEADER_SIZE: ByteSize = ByteSize::kib(16); static ref OBJECT_CACHE_SIZE: usize = 100; + + // Number of block IDs to scan in parallel for recovery phase when the agent crashes in the + // middle of a TXG. + static ref RECOVERY_SCAN_COUNT: usize = 500; } lazy_static_ptr! 
{ @@ -498,6 +505,38 @@ impl DataObject { pub fn is_empty(&self) -> bool { self.blocks.is_empty() } + + pub async fn next_uncached( + object_access: &ObjectAccess, + guid: PoolGuid, + start_from: ObjectId, + end_with: ObjectId, + ) -> Option<DataObject> { + stream::iter( + (0..) + .map(|i| ObjectId::new(start_from.as_min_block() + i)) + .take_while(|&object| object <= end_with) + .map(|object| async move { + Self::get_uncached(object_access, guid, object, ObjectAccessOpType::ReadsGet) + .await + }), + ) + .buffered(*RECOVERY_SCAN_COUNT) + .filter_map(|result| future::ready(result.ok())) + .next() + .await + } + + pub fn list_all( + object_access: &ObjectAccess, + guid: PoolGuid, + ) -> impl Stream<Item = ObjectId> + '_ { + stream::select_all(Self::prefixes(guid).map(|prefix| { + object_access + .list_objects(prefix, false) + .map(|str| ObjectId::from_key(&str)) + })) + } } impl Display for DataObject { diff --git a/cmd/zfs_object_agent/zettaobject/src/debug.rs b/cmd/zfs_object_agent/zettaobject/src/debug.rs index f1fd1b96b135..cb04df8db953 100644 --- a/cmd/zfs_object_agent/zettaobject/src/debug.rs +++ b/cmd/zfs_object_agent/zettaobject/src/debug.rs @@ -1,5 +1,7 @@ +use std::collections::BTreeSet; use std::sync::Arc; +use futures::StreamExt; use log::debug; use log::error; use nix::errno::Errno; @@ -13,6 +15,7 @@ use zettacache::CacheOpenMode; use zettacache::ZettaCache; use crate::base_types::Txg; +use crate::data_object::DataObject; use crate::object_access::ObjectAccess; use crate::object_access::ObjectAccessProtocol; use crate::pool; @@ -125,4 +128,26 @@ impl DebugHandle { }; self.runtime.block_on(future) } + + pub fn find_leaks(&self) -> Result<NvList> { + let object_access = self.object_access.as_ref().unwrap(); + let pool = self.pool.as_ref().unwrap(); + let found = self.runtime.block_on( + DataObject::list_all(object_access, pool.state.shared_state.guid) + .collect::<BTreeSet<_>>(), + ); + let map_set = pool.state.object_block_set(); + let leaked = (&found - &map_set) + .into_iter() + .map(|id| 
id.as_min_block().0) + .collect::<Vec<u64>>(); + let missing = (&map_set - &found) + .into_iter() + .map(|id| id.as_min_block().0) + .collect::<Vec<u64>>(); + let mut nvl = NvList::new_unique_names(); + nvl.insert("leaked", leaked.as_slice()).unwrap(); + nvl.insert("missing", missing.as_slice()).unwrap(); + Ok(nvl) + } } diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs b/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs index 2754cb6f5a5d..a05cad4b5b31 100644 --- a/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs +++ b/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs @@ -610,7 +610,8 @@ impl ObjectAccessTrait for BlobObjectAccess { use_delimiter: bool, list_prefixes: bool, ) -> Pin<Box<dyn Stream<Item = Result<String>> + Send + '_>> { - let msg = format!("list {} (after {:?})", prefix, start_after); + assert!(start_after.is_none()); + let msg = format!("list {}", prefix); let list_prefix = prefix; let mut next_marker: Option<String> = None; @@ -645,22 +646,16 @@ impl ObjectAccessTrait for BlobObjectAccess { }) .await?; - // XXX The performance of this is likely to be quite bad. We need a better solution. 
DOSE-1215 - let initial = start_after.clone().unwrap_or_default(); if list_prefixes { if let Some(prefixes) = output.blobs.blob_prefix { for blob_prefix in prefixes { - if initial < blob_prefix.name { yield blob_prefix.name; - } } } } else { for blob in output.blobs.blobs { - if initial < blob.name { yield blob.name; - } } } next_marker = output.next_marker.clone(); if (next_marker.is_none()) { @@ -669,6 +664,10 @@ } }) } + + fn supports_list_after(&self) -> bool { + false + } } // Creation of a BlobObjectAccess object with invalid credentials can cause a crash as the azure sdk diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs b/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs index 4b27d99a7102..e06b56afb898 100644 --- a/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs +++ b/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs @@ -500,29 +500,44 @@ impl ObjectAccess { pub fn list_objects( &self, prefix: String, - start_after: Option<String>, use_delimiter: bool, ) -> impl Stream<Item = String> + Send + '_ { self.as_trait() - .list(prefix, start_after, use_delimiter, false) + .list(prefix, None, use_delimiter, false) .map(|result| result.unwrap()) } pub fn try_list_objects( &self, prefix: String, - start_after: Option<String>, use_delimiter: bool, ) -> impl Stream<Item = Result<String>> + Send + '_ { - self.as_trait() - .list(prefix, start_after, use_delimiter, false) + self.as_trait().list(prefix, None, use_delimiter, false) } - pub async fn collect_objects( + + pub fn try_list_after( &self, prefix: String, - start_after: Option<String>, - ) -> Vec<String> { - self.list_objects(prefix, start_after, true).collect().await + use_delimiter: bool, + start_after: String, + ) -> Option<impl Stream<Item = String> + '_> { + if self.as_trait().supports_list_after() { + Some( + self.as_trait() + .list(prefix, Some(start_after), use_delimiter, false) + .map(|result| result.unwrap()), + ) + } else { + None + } + } + + pub fn supports_list_after(&self) -> bool { + 
self.as_trait().supports_list_after() + } + + pub async fn collect_objects(&self, prefix: String) -> Vec<String> { + self.list_objects(prefix, true).collect().await } pub fn list_prefixes(&self, prefix: String) -> impl Stream<Item = String> + '_ { @@ -589,6 +604,14 @@ pub trait BucketAccessTrait: Send + Sync { #[async_trait] pub trait ObjectAccessTrait: Send + Sync { + /// start_after indicates whether the list should start only after a particular + /// object. However, for performance reasons, some backends may not implement + /// this functionality. On those backends, an empty stream may be returned. As a + /// result, this parameter should only be used in code paths that fail gracefully, + /// providing best-effort functionality. + /// + /// To determine whether start_after is supported for a given backend, use + /// supports_list_after(). fn list( &self, prefix: String, @@ -620,6 +643,8 @@ async fn delete_objects(&self, stream: &mut (dyn Stream<Item = String> + Send + Unpin)); fn collect_stats(&self) -> HashMap; + + fn supports_list_after(&self) -> bool; } #[derive(Debug)] diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs b/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs index a922a658ce7a..1c8877af210c 100644 --- a/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs +++ b/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs @@ -518,4 +518,8 @@ impl ObjectAccessTrait for S3ObjectAccess { } }) } + + fn supports_list_after(&self) -> bool { + true + } } diff --git a/cmd/zfs_object_agent/zettaobject/src/object_based_log.rs b/cmd/zfs_object_agent/zettaobject/src/object_based_log.rs index e20671cbb3e6..4abca05945d1 100644 --- a/cmd/zfs_object_agent/zettaobject/src/object_based_log.rs +++ b/cmd/zfs_object_agent/zettaobject/src/object_based_log.rs @@ -5,7 +5,6 @@ use std::time::Instant; use anyhow::Context; use anyhow::Result; use futures::future; -use futures::future::join; use futures::future::join_all; use futures::stream; 
use futures::stream::StreamExt; @@ -68,9 +67,7 @@ impl ObjectBasedLogPhys { object_access .delete_objects( generations - .flat_map(|generation| { - Box::pin(object_access.list_objects(generation, None, false)) - }) + .flat_map(|generation| Box::pin(object_access.list_objects(generation, false))) .inspect(|key| trace!("cleanup: old generation chunk {}", key)), ) .await; @@ -148,7 +145,6 @@ pub struct ObjectBasedLog { pub num_chunks: u64, pub num_entries: u64, pending_entries: Vec, - recovered: bool, pending_flushes: Vec>, } @@ -165,7 +161,6 @@ impl ObjectBasedLog { num_flushed_chunks: 0, num_chunks: 0, num_entries: 0, - recovered: true, pending_entries: Vec::new(), pending_flushes: Vec::new(), } @@ -182,7 +177,6 @@ impl ObjectBasedLog { num_flushed_chunks: phys.num_chunks, num_chunks: phys.num_chunks, num_entries: phys.num_entries, - recovered: false, pending_entries: Vec::new(), pending_flushes: Vec::new(), } @@ -195,66 +189,6 @@ impl ObjectBasedLog { } */ - /// Return this log's parent prefix (e.g. zfs/15238822373695050151/PendingFreesLog) - pub fn parent_prefix(&self) -> String { - self.name.rsplitn(2, '/').last().unwrap().to_string() - } - - /// Recover after a system crash, where the kernel also crashed and we are discarding - /// any changes after the current txg. 
- pub async fn cleanup(&mut self) { - // collect chunks past the end, in the current generation - let shared_state = self.shared_state.clone(); - let last_generation_key = format!("{}/{:020}/", self.name, self.generation); - let start_after = if self.num_chunks == 0 { - None - } else { - Some(ObjectBasedLogChunk::::key( - &self.name, - self.generation, - self.num_chunks - 1, - )) - }; - let current_generation_cleanup = async move { - shared_state - .object_access - .delete_objects( - shared_state - .object_access - .list_objects(last_generation_key, start_after, true) - .inspect(|key| { - info!( - "cleanup: deleting future chunk of current generation: {}", - key - ) - }), - ) - .await; - }; - - // collect chunks from the partially-complete future generation - let shared_state = self.shared_state.clone(); - let next_generation_key = format!("{}/{:020}/", self.name, self.generation + 1); - let next_generation_cleanup = async move { - shared_state - .object_access - .delete_objects( - shared_state - .object_access - .list_objects(next_generation_key, None, true) - .inspect(|key| { - info!("cleanup: deleting chunk of future generation: {}", key) - }), - ) - .await; - }; - - // execute both cleanup's concurrently - join(current_generation_cleanup, next_generation_cleanup).await; - - self.recovered = true; - } - pub fn to_phys(&self) -> ObjectBasedLogPhys { ObjectBasedLogPhys { generation: self.generation, @@ -266,7 +200,6 @@ impl ObjectBasedLog { } pub fn append(&mut self, txg: Txg, entry: T) { - assert!(self.recovered); // XXX assert that txg is the same as the txg for the other pending entries? self.pending_entries.push(entry); // XXX should be based on chunk size (bytes)? Or maybe should just be unlimited. 
@@ -276,8 +209,6 @@ impl ObjectBasedLog { } pub fn initiate_flush(&mut self, txg: Txg) { - assert!(self.recovered); - let chunk = ObjectBasedLogChunk { guid: self.shared_state.guid, txg, diff --git a/cmd/zfs_object_agent/zettaobject/src/object_block_map.rs b/cmd/zfs_object_agent/zettaobject/src/object_block_map.rs index 717f3e679512..b62464810863 100644 --- a/cmd/zfs_object_agent/zettaobject/src/object_block_map.rs +++ b/cmd/zfs_object_agent/zettaobject/src/object_block_map.rs @@ -153,4 +153,8 @@ impl ObjectBlockMap { f(object); } } + + pub fn raw_set(&self) -> BTreeSet { + self.state.read().unwrap().map.clone() + } } diff --git a/cmd/zfs_object_agent/zettaobject/src/pool.rs b/cmd/zfs_object_agent/zettaobject/src/pool.rs index 27ff19194ff8..c88fcef564e2 100644 --- a/cmd/zfs_object_agent/zettaobject/src/pool.rs +++ b/cmd/zfs_object_agent/zettaobject/src/pool.rs @@ -23,11 +23,8 @@ use bytesize::ByteSize; use derivative::Derivative; use futures::future; use futures::future::join3; -use futures::future::join5; -use futures::future::Either; use futures::future::Future; use futures::stream; -use futures::stream::select_all::select_all; use futures::stream::*; use futures::FutureExt; use lazy_static::lazy_static; @@ -397,33 +394,18 @@ impl UberblockPhys { .await; } - async fn delete_many(object_access: &ObjectAccess, guid: PoolGuid, txgs: Vec) { + async fn cleanup_older_uberblocks(object_access: &ObjectAccess, ub: UberblockPhys) { object_access - .delete_objects(stream::iter( - txgs.into_iter().map(|txg| Self::key(guid, txg)), - )) + .delete_objects( + object_access + .list_objects(format!("zfs/{}/txg/", ub.guid), true) + .map(|prefix| Txg::from_key(prefix.as_str())) + .filter(|&txg| future::ready(txg < ub.txg)) + .inspect(|txg| debug!("deleting old uberblock {txg:?}")) + .map(|txg| Self::key(ub.guid, txg)), + ) .await; } - - async fn cleanup_older_uberblocks(object_access: &ObjectAccess, ub: UberblockPhys) { - let mut txgs: Vec = object_access - 
.collect_objects(format!("zfs/{}/txg/", ub.guid), None) - .await - .iter() - .map(|prefix| { - Txg(prefix.rsplit('/').collect::>()[0] - .parse::() - .unwrap()) - }) - .collect(); - - txgs.retain(|txg| txg < &ub.txg); - if txgs.is_empty() { - return; - } - debug!("Deleting old uberblocks: {:?}", txgs); - Self::delete_many(object_access, ub.guid, txgs).await; - } } /* @@ -735,83 +717,6 @@ impl PoolState { f(guard.as_mut().unwrap()) } - async fn cleanup_uberblock_objects(&self, last_txg: Txg) { - let shared_state = &self.shared_state; - let txg_key = format!("zfs/{}/txg/", shared_state.guid); - let start_after = Some(UberblockPhys::key(shared_state.guid, last_txg)); - shared_state - .object_access - .delete_objects( - shared_state - .object_access - .list_objects(txg_key, start_after, true) - .inspect(|key| info!("cleanup: deleting future uberblock: {}", key)), - ) - .await; - } - - /// Remove log objects from log at prefix starting at next_id - async fn cleanup_orphaned_logs(&self, prefix: String, next_id: ReclaimLogId) { - let shared_state = &self.shared_state.clone(); - let start_after = Some(format!("{}/{}", prefix, next_id)); - shared_state - .object_access - .delete_objects( - shared_state - .object_access - .list_objects(prefix, start_after, false) - .inspect(|key| info!("cleanup: deleting orphaned log object: {}", key)), - ) - .await; - } - - /// Remove any log objects that are invalid (i.e. 
created as part of an - /// in-progress txg before the kernel or agent crashed) - async fn cleanup_log_objects(&self) { - let mut syncing_state = self.syncing_state.lock().unwrap().take().unwrap(); - let next_log_id = ReclaimLogId( - syncing_state - .reclaim_info - .reclaim_logs - .len() - .try_into() - .unwrap(), - ); - - let begin = Instant::now(); - - // Cleanup any orphaned pending_frees and object_size logs (from next_log_id and greater) - // This occurs if we crash after splitting a log but didn't complete syncing the txg - let pending_frees_log_prefix = syncing_state.reclaim_info.reclaim_logs[0] - .pending_frees_log - .parent_prefix(); - let object_size_log_prefix = syncing_state.reclaim_info.reclaim_logs[0] - .object_size_log - .parent_prefix(); - - let frees_log_stream = FuturesUnordered::new(); - let size_log_stream = FuturesUnordered::new(); - for log in syncing_state.reclaim_info.reclaim_logs.iter_mut() { - frees_log_stream.push(log.pending_frees_log.cleanup()); - size_log_stream.push(log.object_size_log.cleanup()); - } - join5( - syncing_state.storage_object_log.cleanup(), - frees_log_stream.count(), - size_log_stream.count(), - self.cleanup_orphaned_logs(pending_frees_log_prefix, next_log_id), - self.cleanup_orphaned_logs(object_size_log_prefix, next_log_id), - ) - .await; - assert!(self.syncing_state.lock().unwrap().is_none()); - *self.syncing_state.lock().unwrap() = Some(syncing_state); - - info!( - "cleanup: found and deleted log objects in {}ms", - begin.elapsed().as_millis() - ); - } - /// Remove any data objects that were created as part of an in-progress txg /// before the kernel crashed. 
async fn cleanup_data_objects(&self) { @@ -823,8 +728,12 @@ impl PoolState { oa.delete_objects( select_all(DataObject::prefixes(shared_state.guid).map(|prefix| { - let start_after = Some(format!("{}{}", prefix, last_obj)); - oa.list_objects(prefix, start_after, true).boxed() + let start_after = format!("{}{}", prefix, last_obj); + match oa.try_list_after(prefix, true, start_after) { + Some(stream) => stream.left_stream(), + None => stream::empty().right_stream(), + } + .boxed() })) .inspect(|_| count += 1), ) @@ -836,6 +745,10 @@ impl PoolState { begin.elapsed().as_millis() ); } + + pub fn object_block_set(&self) -> BTreeSet { + self.object_block_map.raw_set() + } } impl Pool { @@ -1100,7 +1013,6 @@ impl Pool { let last_txg = pool .state .with_syncing_state(|syncing_state| syncing_state.last_txg); - let state = pool.state.clone(); if last_txg != phys.last_txg { // We opened an older TXG. Before cleaning up (deleting) // future TXG's, update the super object to the old TXG, so @@ -1109,20 +1021,9 @@ impl Pool { let new_phys = PoolPhys { last_txg, ..phys }; new_phys.put(&object_access).await; } - - // Note: cleanup_log_objects() take()'s the syncing_state, so - // the other concurrently-executed cleanups can not access the - // syncing state. That's why we need to pass in the last_txg. 
- join3( - pool.state.clone().cleanup_log_objects(), - pool.state.clone().cleanup_uberblock_objects(last_txg), - if syncing_txg.is_some() { - Either::Left(future::ready(())) - } else { - Either::Right(state.cleanup_data_objects()) - }, - ) - .await; + if syncing_txg.is_none() { + pool.state.cleanup_data_objects().await; + } } Ok((pool, Some(ub), next_block)) } @@ -1131,70 +1032,74 @@ impl Pool { async fn get_recovered_objects( state: &Arc, shared_state: &Arc, - txg: Txg, + final_write: BlockId, ) -> BTreeMap { - let begin = Instant::now(); - let last_obj = state.object_block_map.last_object(); - let list_stream = FuturesUnordered::new(); - for prefix in DataObject::prefixes(shared_state.guid) { - let shared_state = shared_state.clone(); - list_stream.push(async move { - let start_after = Some(format!("{}{}", prefix, last_obj)); - shared_state - .object_access - .collect_objects(prefix, start_after) - .await - }); + if shared_state.object_access.supports_list_after() { + let recovered = recover_list(state, shared_state).await; + assert!(recovered + .iter() + .next_back() + .map(|(k, _)| k.as_min_block() <= final_write) + .unwrap_or(true)); + return recovered; + } + + let mut recovered = BTreeMap::new(); + let (last_object, _) = DataObject::get( + &shared_state.object_access, + shared_state.guid, + state.object_block_map.last_object(), + ) + .await + .unwrap(); + + let mut next_id = ObjectId::new(last_object.header.next_block); + loop { + while let Ok(object) = DataObject::get_uncached( + &shared_state.object_access, + shared_state.guid, + next_id, + ObjectAccessOpType::ReadsGet, + ) + .await + { + next_id = ObjectId::new(object.header.next_block); + recovered.insert(object.header.object, object); + } + if next_id.as_min_block() >= final_write { + break; + } + if let Some(object) = DataObject::next_uncached( + &shared_state.object_access, + shared_state.guid, + next_id, + ObjectId::new(final_write), + ) + .await + { + next_id = 
ObjectId::new(object.header.next_block); + recovered.insert(object.header.object, object); + } else { + break; + } } - let recovered = list_stream - .flat_map(|vec| { - let sub_stream = FuturesUnordered::new(); - for key in vec { - let shared_state = shared_state.clone(); - sub_stream.push(future::ready(async move { - DataObject::get_from_key( - &shared_state.object_access, - key, - ObjectAccessOpType::ReadsGet, - ) - .await - })); - } - sub_stream - }) - .buffer_unordered(50) - .fold(BTreeMap::new(), |mut map, data_res| async move { - let data = data_res.unwrap(); - debug!( - "resume: found {:?}, next {:?}", - data.header.object, data.header.next_block - ); - assert_eq!(data.header.guid, shared_state.guid); - assert_eq!(data.header.min_txg, txg); - assert_eq!(data.header.max_txg, txg); - map.insert(data.header.object, data); - map - }) - .await; - info!( - "resume: listed and read {} objects in {}ms", - recovered.len(), - begin.elapsed().as_millis() - ); recovered } pub async fn resume_complete(&self) { let state = &self.state; - let txg = self.state.with_syncing_state(|syncing_state| { + let (txg, final_write) = self.state.with_syncing_state(|syncing_state| { // verify that we're in resuming state assert!(!syncing_state.pending_object.is_pending()); - syncing_state.syncing_txg.unwrap() + ( + syncing_state.syncing_txg.unwrap(), + syncing_state.pending_unordered_writes.last(), + ) }); let shared_state = &state.shared_state; - let recovered_objects = Self::get_recovered_objects(state, shared_state, txg).await; + let recovered_objects = Self::get_recovered_objects(state, shared_state, final_write).await; self.state.with_syncing_state(|syncing_state| { let mut recovered_objects_iter = recovered_objects.into_iter().peekable(); @@ -1939,6 +1844,63 @@ impl Pool { } } +async fn recover_list( + state: &Arc, + shared_state: &Arc, +) -> BTreeMap { + assert!(shared_state.object_access.supports_list_after()); + + let begin = Instant::now(); + let last_obj = 
state.object_block_map.last_object(); + let list_stream = FuturesUnordered::new(); + for prefix in DataObject::prefixes(shared_state.guid) { + let shared_state = shared_state.clone(); + list_stream.push(async move { + let start_after = format!("{}{}", prefix, last_obj); + shared_state + .object_access + .try_list_after(prefix, true, start_after) + .unwrap() + .collect::>() + .await + }); + } + let recovered = list_stream + .flat_map(|vec| { + let sub_stream = FuturesUnordered::new(); + for key in vec { + let shared_state = shared_state.clone(); + sub_stream.push(future::ready(async move { + DataObject::get_from_key( + &shared_state.object_access, + key, + ObjectAccessOpType::ReadsGet, + ) + .await + })); + } + sub_stream + }) + .buffer_unordered(50) + .fold(BTreeMap::new(), |mut map, data_res| async move { + let data = data_res.unwrap(); + debug!( + "resume: found {:?}, next {:?}", + data.header.object, data.header.next_block + ); + assert_eq!(data.header.guid, shared_state.guid); + map.insert(data.header.object, data); + map + }) + .await; + info!( + "resume: listed and read {} objects in {}ms", + recovered.len(), + begin.elapsed().as_millis() + ); + recovered +} + async fn handle_final_owner( object_access: &Arc, guid: PoolGuid, diff --git a/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs b/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs index ecb75186968d..592aa42016af 100644 --- a/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs +++ b/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs @@ -303,7 +303,7 @@ fn delete_pool_objects( object_access .delete_objects( object_access - .list_objects(prefix, None, false) + .list_objects(prefix, false) // Skip the super object as we use it to track the progress made by this task. 
.filter(|o| futures::future::ready(super_object.ne(o))) .inspect(|_| { diff --git a/cmd/zfs_object_agent/zettaobject/src/test_connectivity.rs b/cmd/zfs_object_agent/zettaobject/src/test_connectivity.rs index d7e27adc119e..b186961a9de9 100644 --- a/cmd/zfs_object_agent/zettaobject/src/test_connectivity.rs +++ b/cmd/zfs_object_agent/zettaobject/src/test_connectivity.rs @@ -83,7 +83,7 @@ async fn do_test_connectivity(object_access: &ObjectAccess) -> Result<(), String } let objects = object_access - .try_list_objects(prefix, None, false) + .try_list_objects(prefix, false) .try_collect::>() .await; diff --git a/cmd/zfs_object_agent/zoa/src/lib.rs b/cmd/zfs_object_agent/zoa/src/lib.rs index cd9ec94b5dc5..594890517477 100644 --- a/cmd/zfs_object_agent/zoa/src/lib.rs +++ b/cmd/zfs_object_agent/zoa/src/lib.rs @@ -119,3 +119,17 @@ pub unsafe extern "C" fn libzoa_get_uberblock_phys( let res = handle.get_uberblock_phys(PoolGuid(guid), Txg(txg)); set_out_nvl(out, res) } + +/// # Safety +/// In order to use this function safely: +/// * out must be a valid pointer to a not-necessarily valid pointer to an nvlist_t. +/// * handle must be a pointer that was previously returned by libzoa_init(). +#[no_mangle] +pub unsafe extern "C" fn libzoa_find_leaks( + raw_handle: *mut zoa_handle_t, + out: *mut *mut nvpair_sys::nvlist_t, +) -> i32 { + let handle = raw_handle.cast::().as_mut().unwrap(); + let res = handle.find_leaks(); + set_out_nvl(out, res) +}