DLPX-81772 Redesign to handle lack of starts_with in list_objects for Blob (openzfs#468)

Signed-off-by: Paul Dagnelie <[email protected]>
pcd1193182 authored Jun 24, 2022
1 parent aa55e0d commit 963074d
Showing 16 changed files with 312 additions and 266 deletions.
29 changes: 29 additions & 0 deletions cmd/zdb/zdb.c
@@ -7794,6 +7794,35 @@ dump_agent_metadata(spa_t *spa)
printf("Uberblock (object agent):\n");
print_zoa_nvlist(uberblock_phys);
fnvlist_free(uberblock_phys);

nvlist_t *leaked_objects = NULL;
VERIFY0(libzoa_find_leaks(zoa_handle, &leaked_objects));
uint_t leaks_len, missing_len;
uint64_t *leaks = fnvlist_lookup_uint64_array(leaked_objects, "leaked",
&leaks_len);
uint64_t *missing = fnvlist_lookup_uint64_array(leaked_objects,
"missing", &missing_len);

if (leaks_len > 0) {
printf("Leaked object store objects detected: %u, [%llu",
leaks_len, leaks[0]);
for (uint_t i = 1; i < leaks_len; i++) {
printf(", %llu", leaks[i]);
if (i % 4 == 0)
printf("\n\t");
}
printf("]\n");
}
if (missing_len > 0) {
printf("Missing object store objects detected: %u, [%llu",
missing_len, missing[0]);
for (uint_t i = 1; i < missing_len; i++) {
printf(", %llu", missing[i]);
if (i % 4 == 0)
printf("\n\t");
}
printf("]\n");
}
}
#endif

6 changes: 3 additions & 3 deletions cmd/zfs_object_agent/client/src/main.rs
@@ -396,7 +396,7 @@ async fn do_list_pools(
// Lookup all objects in the pool.
if list_all_objects {
object_access
.list_objects(pool_key, None, false)
.list_objects(pool_key, false)
.for_each(|object| async move { println!(" {}", object) })
.await;
}
@@ -410,7 +410,7 @@ async fn do_destroy_old_pools(
) -> Result<(), Box<dyn Error>> {
for pool_keys in find_old_pools(object_access, min_age).await {
object_access
.delete_objects(object_access.list_objects(pool_keys, None, false))
.delete_objects(object_access.list_objects(pool_keys, false))
.await;
}
Ok(())
@@ -490,7 +490,7 @@ async fn do_blob(cli_params: CliParams, count: NonZeroU32) -> Result<(), Box<dyn
println!(
"List blobs {:?}",
object_access
.list_objects("".to_string(), None, true)
.list_objects("".to_string(), true)
.collect::<Vec<String>>()
.await
);
6 changes: 3 additions & 3 deletions cmd/zfs_object_agent/object_perf/src/perf.rs
@@ -72,7 +72,7 @@ impl Perf {
duration: Duration,
) {
let num_objects = object_access
.list_objects(key_prefix.clone(), None, true)
.list_objects(key_prefix.clone(), true)
.fold(0, |count, _key| async move { count + 1 })
.await;
let mut key_id = 0;
Expand Down Expand Up @@ -167,7 +167,7 @@ pub async fn write_test(
println!("{:#?}", perf.metrics.put);

object_access
.delete_objects(object_access.list_objects(key_prefix, None, false))
.delete_objects(object_access.list_objects(key_prefix, false))
.await;

Ok(())
@@ -199,7 +199,7 @@ pub async fn read_test(
println!("{:#?}", perf.metrics.get);

object_access
.delete_objects(object_access.list_objects(key_prefix, None, false))
.delete_objects(object_access.list_objects(key_prefix, false))
.await;

Ok(())
4 changes: 4 additions & 0 deletions cmd/zfs_object_agent/util/src/unordered.rs
@@ -77,4 +77,8 @@
pub fn is_empty(&self) -> bool {
self.pending.is_empty()
}

pub fn last(&self) -> K {
self.first + self.pending.len()
}
}
10 changes: 10 additions & 0 deletions cmd/zfs_object_agent/zettaobject/src/base_types.rs
@@ -22,6 +22,10 @@ impl Txg {
Some(Txg(self.0 - rhs))
}
}

pub fn from_key(key: &str) -> Self {
Txg(key.rsplit_once('/').unwrap().1.parse().unwrap())
}
}

#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
@@ -43,4 +47,10 @@ impl ObjectId {
pub fn prefix(self) -> u64 {
self.0 % NUM_DATA_PREFIXES
}

/// This function parses a key into an object id. It works for any key
/// where the last path component is the object id.
pub fn from_key(key: &str) -> Self {
ObjectId(key.rsplit_once('/').unwrap().1.parse().unwrap())
}
}
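
The two from_key helpers above share one convention: the numeric id (or txg) is the last path component of the key. A standalone sketch of that parsing convention, using a made-up key layout purely for illustration:

fn id_from_key(key: &str) -> u64 {
    // Take everything after the last '/' and parse it as a number,
    // mirroring the rsplit_once-based parsing in from_key above.
    key.rsplit_once('/')
        .expect("key contains no '/'")
        .1
        .parse()
        .expect("last path component is not numeric")
}

fn main() {
    // Hypothetical key; only the trailing "1234" matters to the parser.
    assert_eq!(id_from_key("zfs/some-guid/data/7/1234"), 1234);
}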
39 changes: 39 additions & 0 deletions cmd/zfs_object_agent/zettaobject/src/data_object.rs
@@ -12,8 +12,11 @@ use anyhow::Context;
use anyhow::Result;
use bytes::Bytes;
use bytesize::ByteSize;
use futures::future;
use futures::stream;
use futures::FutureExt;
use futures::Stream;
use futures::StreamExt;
use log::*;
use more_asserts::*;
use rusoto_core::ByteStream;
@@ -39,6 +42,10 @@ tunable! {
static ref DATA_OBJ_RANGED_GET: bool = false;
static ref DATA_OBJ_TRY_HEADER_SIZE: ByteSize = ByteSize::kib(16);
static ref OBJECT_CACHE_SIZE: usize = 100;

// Number of block IDs to scan in parallel for recovery phase when the agent crashes in the
// middle of a TXG.
static ref RECOVERY_SCAN_COUNT: usize = 500;
}

lazy_static_ptr! {
@@ -498,6 +505,38 @@ impl DataObject {
pub fn is_empty(&self) -> bool {
self.blocks.is_empty()
}

pub async fn next_uncached(
object_access: &ObjectAccess,
guid: PoolGuid,
start_from: ObjectId,
end_with: ObjectId,
) -> Option<Self> {
stream::iter(
(0..)
.map(|i| ObjectId::new(start_from.as_min_block() + i))
.take_while(|&object| object <= end_with)
.map(|object| async move {
Self::get_uncached(object_access, guid, object, ObjectAccessOpType::ReadsGet)
.await
}),
)
.buffered(*RECOVERY_SCAN_COUNT)
.filter_map(|result| future::ready(result.ok()))
.next()
.await
}

pub fn list_all(
object_access: &ObjectAccess,
guid: PoolGuid,
) -> impl Stream<Item = ObjectId> + '_ {
stream::select_all(Self::prefixes(guid).map(|prefix| {
object_access
.list_objects(prefix, false)
.map(|str| ObjectId::from_key(&str))
}))
}
}

impl Display for DataObject {
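
The next_uncached helper above replaces a start_after-based listing with direct probing: it issues up to RECOVERY_SCAN_COUNT speculative GETs in object order and stops at the first object that exists. A minimal sketch of that buffered-probe pattern with the futures crate; the existence check here is a stand-in for a real object-store GET:

use futures::future;
use futures::stream::{self, StreamExt};

// Probe ids start..=end with up to `parallelism` requests in flight,
// returning the first id (in order) whose probe succeeds.
async fn first_present(start: u64, end: u64, parallelism: usize) -> Option<u64> {
    stream::iter((start..=end).map(|id| async move {
        // Stand-in for a GET: pretend ids divisible by 7 exist.
        if id % 7 == 0 { Ok(id) } else { Err(()) }
    }))
    .buffered(parallelism) // plays the role of RECOVERY_SCAN_COUNT
    .filter_map(|result: Result<u64, ()>| future::ready(result.ok()))
    .next()
    .await
}

fn main() {
    assert_eq!(futures::executor::block_on(first_present(50, 60, 8)), Some(56));
}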
25 changes: 25 additions & 0 deletions cmd/zfs_object_agent/zettaobject/src/debug.rs
@@ -1,5 +1,7 @@
use std::collections::BTreeSet;
use std::sync::Arc;

use futures::StreamExt;
use log::debug;
use log::error;
use nix::errno::Errno;
@@ -13,6 +15,7 @@ use zettacache::CacheOpenMode;
use zettacache::ZettaCache;

use crate::base_types::Txg;
use crate::data_object::DataObject;
use crate::object_access::ObjectAccess;
use crate::object_access::ObjectAccessProtocol;
use crate::pool;
@@ -125,4 +128,26 @@ impl DebugHandle {
};
self.runtime.block_on(future)
}

pub fn find_leaks(&self) -> Result<NvList, Errno> {
let object_access = self.object_access.as_ref().unwrap();
let pool = self.pool.as_ref().unwrap();
let found = self.runtime.block_on(
DataObject::list_all(object_access, pool.state.shared_state.guid)
.collect::<BTreeSet<_>>(),
);
let map_set = pool.state.object_block_set();
let leaked = (&found - &map_set)
.into_iter()
.map(|id| id.as_min_block().0)
.collect::<Vec<_>>();
let missing = (&map_set - &found)
.into_iter()
.map(|id| id.as_min_block().0)
.collect::<Vec<_>>();
let mut nvl = NvList::new_unique_names();
nvl.insert("leaked", leaked.as_slice()).unwrap();
nvl.insert("missing", missing.as_slice()).unwrap();
Ok(nvl)
}
}
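
find_leaks reduces leak detection to two set differences over BTreeSets: objects found in the store but absent from the pool's object-block map are leaked, and objects in the map but not found are missing. A self-contained illustration with made-up ids:

use std::collections::BTreeSet;

fn main() {
    // Object ids actually present in the object store (as listed).
    let found: BTreeSet<u64> = [1, 2, 3, 5, 8].into_iter().collect();
    // Object ids the pool's object-block map expects to exist.
    let expected: BTreeSet<u64> = [1, 2, 3, 4, 5].into_iter().collect();

    // In storage but unreferenced: leaked.
    let leaked: Vec<u64> = (&found - &expected).into_iter().collect();
    // Referenced but absent from storage: missing.
    let missing: Vec<u64> = (&expected - &found).into_iter().collect();

    assert_eq!(leaked, vec![8]);
    assert_eq!(missing, vec![4]);
}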
13 changes: 6 additions & 7 deletions cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs
@@ -610,7 +610,8 @@ impl ObjectAccessTrait for BlobObjectAccess {
use_delimiter: bool,
list_prefixes: bool,
) -> Pin<Box<dyn Stream<Item = Result<String>> + Send + '_>> {
let msg = format!("list {} (after {:?})", prefix, start_after);
assert!(start_after.is_none());
let msg = format!("list {}", prefix);
let list_prefix = prefix;
let mut next_marker: Option<NextMarker> = None;

@@ -645,22 +646,16 @@
})
.await?;

// XXX The performance of this is likely to be quite bad. We need a better solution. DOSE-1215
let initial = start_after.clone().unwrap_or_default();
if list_prefixes {
if let Some(prefixes) = output.blobs.blob_prefix {
for blob_prefix in prefixes {
if initial < blob_prefix.name {
yield blob_prefix.name;
}
}
}
} else {
for blob in output.blobs.blobs {
if initial < blob.name {
yield blob.name;
}
}
}
next_marker = output.next_marker.clone();
if (next_marker.is_none()) {
}
})
}

fn supports_list_after(&self) -> bool {
false
}
}

// Creation of a BlobObjectAccess object with invalid credentials can cause a crash as the azure sdk
43 changes: 34 additions & 9 deletions cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs
@@ -500,29 +500,44 @@ impl ObjectAccess {
pub fn list_objects(
&self,
prefix: String,
start_after: Option<String>,
use_delimiter: bool,
) -> impl Stream<Item = String> + Send + '_ {
self.as_trait()
.list(prefix, start_after, use_delimiter, false)
.list(prefix, None, use_delimiter, false)
.map(|result| result.unwrap())
}

pub fn try_list_objects(
&self,
prefix: String,
start_after: Option<String>,
use_delimiter: bool,
) -> impl Stream<Item = Result<String>> + Send + '_ {
self.as_trait()
.list(prefix, start_after, use_delimiter, false)
self.as_trait().list(prefix, None, use_delimiter, false)
}
pub async fn collect_objects(

pub fn try_list_after(
&self,
prefix: String,
start_after: Option<String>,
) -> Vec<String> {
self.list_objects(prefix, start_after, true).collect().await
use_delimiter: bool,
start_after: String,
) -> Option<impl Stream<Item = String> + '_> {
if self.as_trait().supports_list_after() {
Some(
self.as_trait()
.list(prefix, Some(start_after), use_delimiter, false)
.map(|result| result.unwrap()),
)
} else {
None
}
}

pub fn supports_list_after(&self) -> bool {
self.as_trait().supports_list_after()
}

pub async fn collect_objects(&self, prefix: String) -> Vec<String> {
self.list_objects(prefix, true).collect().await
}

pub fn list_prefixes(&self, prefix: String) -> impl Stream<Item = String> + '_ {
@@ -589,6 +604,14 @@ pub trait BucketAccessTrait: Send + Sync {

#[async_trait]
pub trait ObjectAccessTrait: Send + Sync {
/// start_after, when provided, indicates that the list should start only after a particular
/// object. However, for performance reasons, some backends may not implement
/// this functionality. On those backends, an empty stream may be returned. As a
/// result, this parameter should only be used in code paths that fail gracefully,
/// providing best-effort functionality.
///
/// To determine whether start_after is supported for a given backend, use
/// supports_list_after().
fn list(
&self,
prefix: String,
@@ -620,6 +643,8 @@ pub trait ObjectAccessTrait: Send + Sync {
async fn delete_objects(&self, stream: &mut (dyn Stream<Item = String> + Send + Unpin));

fn collect_stats(&self) -> HashMap<String, StatMapValue>;

fn supports_list_after(&self) -> bool;
}

#[derive(Debug)]
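
The doc comment above makes start_after an optional capability: callers must check supports_list_after() and degrade gracefully when it returns false. A sketch of that caller-side pattern, using a stand-in trait rather than the agent's actual ObjectAccessTrait:

// Stand-in trait for illustration; not the agent's ObjectAccessTrait.
trait Lister {
    fn supports_list_after(&self) -> bool;
    fn list(&self, prefix: &str, start_after: Option<&str>) -> Vec<String>;
}

// Models a backend without server-side start_after (e.g. Blob).
struct FullListOnly;

impl Lister for FullListOnly {
    fn supports_list_after(&self) -> bool {
        false
    }
    fn list(&self, prefix: &str, _start_after: Option<&str>) -> Vec<String> {
        (0..3).map(|i| format!("{prefix}/{i}")).collect()
    }
}

// Resume after `cursor` when the backend can do it server-side; otherwise
// fall back to a full listing and let the caller tolerate the extra keys
// (best-effort, as the doc comment above describes).
fn list_from(lister: &dyn Lister, prefix: &str, cursor: &str) -> Vec<String> {
    if lister.supports_list_after() {
        lister.list(prefix, Some(cursor))
    } else {
        lister.list(prefix, None)
    }
}

fn main() {
    println!("{:?}", list_from(&FullListOnly, "pool/data", "pool/data/1"));
}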
4 changes: 4 additions & 0 deletions cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs
@@ -518,4 +518,8 @@ impl ObjectAccessTrait for S3ObjectAccess {
}
})
}

fn supports_list_after(&self) -> bool {
true
}
}