diff --git a/cmd/zfs_object_agent/Cargo.lock b/cmd/zfs_object_agent/Cargo.lock
index bbc0a37cbdab..03a2313d207d 100644
--- a/cmd/zfs_object_agent/Cargo.lock
+++ b/cmd/zfs_object_agent/Cargo.lock
@@ -4892,6 +4892,7 @@ dependencies = [
 name = "zfs_object_perf"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "clap 4.3.3",
  "futures",
  "git-version",
diff --git a/cmd/zfs_object_agent/object_perf/Cargo.toml b/cmd/zfs_object_agent/object_perf/Cargo.toml
index af0213f18056..d9d1cc8c526e 100644
--- a/cmd/zfs_object_agent/object_perf/Cargo.toml
+++ b/cmd/zfs_object_agent/object_perf/Cargo.toml
@@ -8,6 +8,7 @@ publish = false
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+anyhow = "1.0"
 clap = { version = "4.1.6", features = ["derive", "wrap_help"] }
 git-version = "0.3.5"
 log = "0.4"
diff --git a/cmd/zfs_object_agent/object_perf/src/main.rs b/cmd/zfs_object_agent/object_perf/src/main.rs
index 42fca53b2a14..ff32d4b1a74c 100644
--- a/cmd/zfs_object_agent/object_perf/src/main.rs
+++ b/cmd/zfs_object_agent/object_perf/src/main.rs
@@ -3,6 +3,7 @@
 #![allow(clippy::print_stdout)]
 
 use std::path::PathBuf;
+use std::sync::Arc;
 use std::time::Duration;
 
 use clap::ArgAction;
@@ -28,7 +29,7 @@ static GIT_VERSION: &str = git_version!(
     }
 );
 
-#[derive(Args)]
+#[derive(Args, Clone)]
 struct S3Args {
     /// S3 endpoint
     #[arg(short = 'e', long, default_value = ENDPOINT)]
@@ -45,9 +46,12 @@ struct S3Args {
     /// credentials profile
     #[arg(short = 'p', long)]
     profile: Option<String>,
+
+    #[arg(long)]
+    instance_profile: bool,
 }
 
-#[derive(Args)]
+#[derive(Args, Clone)]
 struct BlobArgs {
     /// Blob endpoint
     #[arg(short = 'e', long)]
@@ -62,7 +66,7 @@ struct BlobArgs {
     profile: Option<String>,
 }
 
-#[derive(Args)]
+#[derive(Args, Clone)]
 struct OciArgs {
     /// OCI bucket
     #[arg(short = 'b', long, default_value = BUCKET_NAME)]
@@ -119,6 +123,14 @@ enum Commands {
         s3_args: S3Args,
     },
 
+    /// Delete s3 test
+    DeleteS3 {
+        #[command(flatten)]
+        s3_args: S3Args,
+        #[arg(short = 'c', long, default_value = "2048")]
+        objcount: u64,
+    },
+
     /// Write blob test
     WriteBlob {
         #[command(flatten)]
@@ -131,6 +143,14 @@ enum Commands {
         blob_args: BlobArgs,
     },
 
+    /// Delete blob test
+    DeleteBlob {
+        #[command(flatten)]
+        blob_args: BlobArgs,
+        #[arg(short = 'c', long, default_value = "2048")]
+        objcount: u64,
+    },
+
     /// Write OCI test
     WriteOci {
         #[command(flatten)]
@@ -142,6 +162,60 @@ enum Commands {
         #[command(flatten)]
         oci_args: OciArgs,
     },
+
+    /// Delete OCI test
+    DeleteOci {
+        #[command(flatten)]
+        oci_args: OciArgs,
+        #[arg(short = 'c', long, default_value = "2048")]
+        objcount: u64,
+    },
+}
+
+async fn s3_object_access(s3_args: S3Args) -> Arc<ObjectAccess> {
+    ObjectAccess::new(
+        ObjectAccessProtocol::S3 {
+            endpoint: s3_args.endpoint,
+            region: s3_args.region,
+            credentials: match (s3_args.instance_profile, s3_args.profile) {
+                (true, _) => S3Credentials::InstanceProfile,
+                (false, Some(profile)) => S3Credentials::Profile(profile),
+                (false, None) => S3Credentials::Automatic,
+            },
+        },
+        s3_args.bucket,
+        false,
+    )
+    .await
+    .unwrap()
+}
+
+async fn blob_object_access(blob_args: BlobArgs) -> Arc<ObjectAccess> {
+    ObjectAccess::new(
+        ObjectAccessProtocol::Blob {
+            endpoint: blob_args.endpoint,
+            credentials: match blob_args.profile {
+                Some(profile) => BlobCredentials::Profile(profile),
+                None => BlobCredentials::Automatic,
+            },
+        },
+        blob_args.bucket,
+        false,
+    )
+    .await
+    .unwrap()
+}
+
+async fn oci_object_access(oci_args: OciArgs) -> Arc<ObjectAccess> {
+    ObjectAccess::new(
+        ObjectAccessProtocol::Oci {
+            credentials: zettaobject::object_access::OciCredentials::InstancePrincipal,
+        },
+        oci_args.bucket,
+        false,
+    )
+    .await
+    .unwrap()
 }
 
 #[tokio::main]
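Note on the credential plumbing in `s3_object_access()` above: the new `--instance_profile` flag takes precedence over `--profile`, and with neither flag set the selection falls back to the automatic credential chain. A minimal standalone sketch of that precedence; `Creds` here is a hypothetical stand-in for zettaobject's `S3Credentials`, not the real type:

```rust
// Sketch only: `Creds` is a hypothetical stand-in for S3Credentials.
#[derive(Debug, PartialEq)]
enum Creds {
    InstanceProfile,
    Profile(String),
    Automatic,
}

fn select(instance_profile: bool, profile: Option<String>) -> Creds {
    match (instance_profile, profile) {
        // --instance_profile wins even when --profile is also given.
        (true, _) => Creds::InstanceProfile,
        (false, Some(p)) => Creds::Profile(p),
        (false, None) => Creds::Automatic,
    }
}

fn main() {
    assert_eq!(select(true, Some("dev".into())), Creds::InstanceProfile);
    assert_eq!(select(false, Some("dev".into())), Creds::Profile("dev".into()));
    assert_eq!(select(false, None), Creds::Automatic);
}
```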
@@ -162,74 +236,20 @@ async fn main() {
     let key_prefix = format!("zfs_object_perf/{}/", Uuid::new_v4());
     println!("Using prefix: '{key_prefix}'");
 
-    match cli.command {
-        Commands::WriteS3 { s3_args } => {
-            let object_access = ObjectAccess::new(
-                ObjectAccessProtocol::S3 {
-                    endpoint: s3_args.endpoint,
-                    region: s3_args.region,
-                    credentials: match s3_args.profile {
-                        Some(profile) => S3Credentials::Profile(profile),
-                        None => S3Credentials::Automatic,
-                    },
-                },
-                s3_args.bucket,
-                false,
-            )
-            .await
-            .unwrap();
-
-            perf::write_test(
-                object_access,
-                key_prefix,
-                objsize_bytes,
-                cli.qdepth,
-                duration,
-            )
-            .await
-            .unwrap();
-        }
-        Commands::ReadS3 { s3_args } => {
-            let object_access = ObjectAccess::new(
-                ObjectAccessProtocol::S3 {
-                    endpoint: s3_args.endpoint,
-                    region: s3_args.region,
-                    credentials: match s3_args.profile {
-                        Some(profile) => S3Credentials::Profile(profile),
-                        None => S3Credentials::Automatic,
-                    },
-                },
-                s3_args.bucket,
-                false,
-            )
-            .await
-            .unwrap();
-
-            perf::read_test(
-                object_access,
-                key_prefix,
-                objsize_bytes,
-                cli.qdepth,
-                duration,
-            )
-            .await
-            .unwrap();
-        }
-        Commands::WriteBlob { blob_args } => {
-            let object_access = ObjectAccess::new(
-                ObjectAccessProtocol::Blob {
-                    endpoint: blob_args.endpoint,
-                    credentials: match blob_args.profile {
-                        Some(profile) => BlobCredentials::Profile(profile),
-                        None => BlobCredentials::Automatic,
-                    },
-                },
-                blob_args.bucket,
-                false,
-            )
-            .await
-            .unwrap();
-
+    let object_access = match &cli.command {
+        Commands::WriteS3 { s3_args }
+        | Commands::ReadS3 { s3_args }
+        | Commands::DeleteS3 { s3_args, .. } => s3_object_access(s3_args.clone()).await,
+        Commands::WriteBlob { blob_args }
+        | Commands::ReadBlob { blob_args }
+        | Commands::DeleteBlob { blob_args, .. } => blob_object_access(blob_args.clone()).await,
+        Commands::WriteOci { oci_args }
+        | Commands::ReadOci { oci_args }
+        | Commands::DeleteOci { oci_args, .. } => oci_object_access(oci_args.clone()).await,
+    };
+
+    match cli.command {
+        Commands::WriteS3 { .. } | Commands::WriteBlob { .. } | Commands::WriteOci { .. } => {
             perf::write_test(
                 object_access,
                 key_prefix,
@@ -240,21 +260,7 @@ async fn main() {
             .await
             .unwrap();
         }
-        Commands::ReadBlob { blob_args } => {
-            let object_access = ObjectAccess::new(
-                ObjectAccessProtocol::Blob {
-                    endpoint: blob_args.endpoint,
-                    credentials: match blob_args.profile {
-                        Some(profile) => BlobCredentials::Profile(profile),
-                        None => BlobCredentials::Automatic,
-                    },
-                },
-                blob_args.bucket,
-                false,
-            )
-            .await
-            .unwrap();
-
+        Commands::ReadS3 { .. } | Commands::ReadBlob { .. } | Commands::ReadOci { .. } => {
             perf::read_test(
                 object_access,
                 key_prefix,
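With the refactor above, `main()` builds `object_access` once by borrowing the command (`match &cli.command`) and cloning the protocol args into the shared helpers; the second match then only decides which test to run. A toy sketch of the or-pattern-with-rest technique this leans on (`Cmd` is illustrative, not the real `Commands` enum):

```rust
// Several variants share one match arm; `..` skips the fields (like
// objcount) that only some variants carry.
enum Cmd {
    WriteS3 { bucket: String },
    ReadS3 { bucket: String },
    DeleteS3 { bucket: String, objcount: u64 },
}

fn bucket_of(cmd: &Cmd) -> &str {
    match cmd {
        Cmd::WriteS3 { bucket }
        | Cmd::ReadS3 { bucket }
        | Cmd::DeleteS3 { bucket, .. } => bucket,
    }
}

fn main() {
    let cmd = Cmd::DeleteS3 { bucket: "perf-test".into(), objcount: 2048 };
    assert_eq!(bucket_of(&cmd), "perf-test");
}
```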
@@ -265,44 +271,15 @@ async fn main() {
             .await
             .unwrap();
         }
-        Commands::WriteOci { oci_args } => {
-            let object_access = ObjectAccess::new(
-                ObjectAccessProtocol::Oci {
-                    credentials: zettaobject::object_access::OciCredentials::InstancePrincipal,
-                },
-                oci_args.bucket,
-                false,
-            )
-            .await
-            .unwrap();
-
-            perf::write_test(
-                object_access,
-                key_prefix,
-                objsize_bytes,
-                cli.qdepth,
-                duration,
-            )
-            .await
-            .unwrap();
-        }
-        Commands::ReadOci { oci_args } => {
-            let object_access = ObjectAccess::new(
-                ObjectAccessProtocol::Oci {
-                    credentials: zettaobject::object_access::OciCredentials::InstancePrincipal,
-                },
-                oci_args.bucket,
-                false,
-            )
-            .await
-            .unwrap();
-
-            perf::read_test(
+        Commands::DeleteS3 { objcount, .. }
+        | Commands::DeleteBlob { objcount, .. }
+        | Commands::DeleteOci { objcount, .. } => {
+            perf::delete_test(
                 object_access,
                 key_prefix,
                 objsize_bytes,
                 cli.qdepth,
-                duration,
+                objcount,
             )
             .await
             .unwrap();
diff --git a/cmd/zfs_object_agent/object_perf/src/perf.rs b/cmd/zfs_object_agent/object_perf/src/perf.rs
index 5afeda50ecd7..641651a6f0f8 100644
--- a/cmd/zfs_object_agent/object_perf/src/perf.rs
+++ b/cmd/zfs_object_agent/object_perf/src/perf.rs
@@ -173,6 +173,33 @@ pub async fn write_test(
     Ok(())
 }
 
+pub async fn delete_test(
+    object_access: Arc<ObjectAccess>,
+    key_prefix: String,
+    objsize: u64,
+    qdepth: u64,
+    objcount: u64,
+) -> Result<(), Box<dyn Error>> {
+    let perf = Perf::default();
+    let bounds = WriteTestBounds::Objects(objcount);
+
+    perf.write_objects(
+        object_access.clone(),
+        key_prefix.clone(),
+        objsize,
+        qdepth,
+        bounds,
+    )
+    .await;
+
+    let start = Instant::now();
+    object_access
+        .delete_objects(object_access.list_objects(key_prefix, false))
+        .await;
+    println!("Deleted {objcount} objects in {:?}", start.elapsed());
+    Ok(())
+}
+
 pub async fn read_test(
     object_access: Arc<ObjectAccess>,
     key_prefix: String,
diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs b/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs
index 14015757efd9..62a7460f131d 100644
--- a/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs
+++ b/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs
@@ -48,7 +48,7 @@ use super::BlobCredentials;
 use super::BucketAccessTrait;
 use super::ObjectAccessProtocol;
 use super::RequestError;
-use super::OBJECT_DELETION_BATCH_SIZE;
+use super::OBJECT_DELETION_PARALLELISM;
 use crate::access_stats::ObjectAccessOpType;
 use crate::access_stats::ObjectAccessStats;
 use crate::object_access::ByteStream;
@@ -352,7 +352,7 @@ impl ObjectAccessTrait for BlobObjectAccess {
                 }
                 op.end(0);
             })
-            .buffer_unordered(*OBJECT_DELETION_BATCH_SIZE)
+            .buffer_unordered(*OBJECT_DELETION_PARALLELISM)
             .count()
             .await;
     }
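On the Blob side (and in the GCP fallback further down), each object is still deleted with its own request; the renamed tunable only caps how many are in flight at once. A self-contained sketch of that `buffer_unordered` pattern, with a sleep standing in for the real delete request:

```rust
use std::time::Duration;

use futures::stream::{self, StreamExt};

// Stand-in for one delete request; the real code issues the object-store
// call and records access stats around the await.
async fn delete_one(_key: String) {
    tokio::time::sleep(Duration::from_millis(1)).await;
}

#[tokio::main]
async fn main() {
    let parallelism = 500; // i.e. *OBJECT_DELETION_PARALLELISM
    stream::iter((0..2048).map(|i| format!("zfs_object_perf/obj-{i}")))
        .map(delete_one)
        .buffer_unordered(parallelism) // at most 500 deletes in flight
        .count()
        .await;
}
```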
diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs b/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs
index 84832510a736..e4c02bdfadaa 100644
--- a/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs
+++ b/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs
@@ -68,7 +68,8 @@ tunable! {
     // see that we get good performance with a batch size of 500. Since we open
     // this many socket connections, we avoid a larger batch size to not run
     // out of file descriptors when the ulimit is low.
-    pub static ref OBJECT_DELETION_BATCH_SIZE: usize = 500;
+    pub static ref OBJECT_DELETION_PARALLELISM: usize = 500;
+    pub static ref OBJECT_DELETION_BATCH_SIZE: usize = 5;
     static ref PER_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
 }
 
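With the rename the two knobs read as intended: `OBJECT_DELETION_PARALLELISM` caps in-flight delete requests, while the new `OBJECT_DELETION_BATCH_SIZE` is how many keys ride in a single S3 multi-delete. The S3 path below divides one by the other so the number of outstanding keys stays near the old value. A quick sketch of that arithmetic, using the default values from the hunk above:

```rust
use std::cmp::max;

// Defaults from the tunable! block above.
const OBJECT_DELETION_PARALLELISM: usize = 500;
const OBJECT_DELETION_BATCH_SIZE: usize = 5;

// Request-level fan-out handed to buffer_unordered() in the S3 path.
fn request_concurrency(batch_size: usize) -> usize {
    max(1, OBJECT_DELETION_PARALLELISM / batch_size)
}

fn main() {
    // Batched multi-delete: 100 concurrent requests x 5 keys = 500 keys in flight.
    assert_eq!(request_concurrency(OBJECT_DELETION_BATCH_SIZE), 100);
    // GCP fallback (batch_size = 1): 500 concurrent single-key deletes.
    assert_eq!(request_concurrency(1), 500);
}
```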
diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs b/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs
index 53241475ff0e..a91366a03676 100644
--- a/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs
+++ b/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs
@@ -1,3 +1,4 @@
+use std::cmp::max;
 use std::error::Error;
 use std::future::Future;
 use std::ops::Range;
@@ -21,7 +22,9 @@ use aws_http::retry::AwsResponseRetryClassifier;
 use aws_sdk_s3::config::Region;
 use aws_sdk_s3::error::ProvideErrorMetadata;
 use aws_sdk_s3::error::SdkError;
+use aws_sdk_s3::types::Delete;
 use aws_sdk_s3::types::ExpirationStatus;
+use aws_sdk_s3::types::ObjectIdentifier;
 use aws_sdk_s3::Client;
 use aws_smithy_async::rt::sleep::TokioSleep;
 use aws_smithy_http::operation::Response;
@@ -51,10 +54,11 @@ use super::RequestError;
 use super::S3Credentials;
 use super::LONG_OPERATION_DURATION;
 use super::OBJECT_DELETION_BATCH_SIZE;
-use super::PANIC_OPERATION_DURATION;
+use super::OBJECT_DELETION_PARALLELISM;
 use super::PER_REQUEST_TIMEOUT;
 use crate::access_stats::ObjectAccessOpType;
 use crate::access_stats::ObjectAccessStats;
+use crate::object_access::PANIC_OPERATION_DURATION;
 
 impl<E> From<SdkError<E, Response>> for RequestError
 where
@@ -408,19 +412,63 @@ impl ObjectAccessTrait for S3ObjectAccess {
         access_stats: &ObjectAccessStats,
         stream: &mut (dyn Stream<Item = String> + Send + Unpin),
     ) {
+        let batch_size = if self.endpoint.ends_with(".googleapis.com") {
+            1
+        } else {
+            *OBJECT_DELETION_BATCH_SIZE
+        };
         // GCP does not support S3's batched multi-delete API. So instead we issue
         // object deletion requests in parallel. This performs better than the multi-delete API.
         stream
-            .map(|key| async move {
-                let op = access_stats.begin(ObjectAccessOpType::ObjectDelete);
-                self.retry(&format!("delete object {key}"), |client| {
-                    client.delete_object().bucket(&self.bucket).key(&key).send()
-                })
-                .await
-                .unwrap();
-                op.end(0);
+            .chunks(batch_size)
+            .map(|chunk| async move {
+                match chunk.len() {
+                    1 => {
+                        let key = &chunk[0];
+                        let op = access_stats.begin(ObjectAccessOpType::ObjectDelete);
+                        self.retry(&format!("delete object {key}"), |client| {
+                            client.delete_object().bucket(&self.bucket).key(key).send()
+                        })
+                        .await
+                        .unwrap();
+                        op.end(0);
+                    }
+                    len => {
+                        let msg = format!("delete {} objects including {}", len, &chunk[0]);
+                        let op = access_stats.begin(ObjectAccessOpType::ObjectDelete);
+
+                        if let Some(errors) = self
+                            .retry(&msg, |client| {
+                                client
+                                    .delete_objects()
+                                    .bucket(&self.bucket)
+                                    .delete(
+                                        chunk
+                                            .iter()
+                                            .fold(Delete::builder().quiet(true), |builder, key| {
+                                                builder.objects(
+                                                    ObjectIdentifier::builder().key(key).build(),
+                                                )
+                                            })
+                                            .build(),
+                                    )
+                                    .send()
+                            })
+                            .await
+                            .unwrap()
+                            .errors()
+                        {
+                            errors.iter().for_each(|err| {
+                                info!("delete error: {:?}", err);
+                            });
+                            errors.get(0).unwrap();
+                        }
+
+                        op.end_multiple(0, chunk.len() as u64);
+                    }
+                }
             })
-            .buffer_unordered(*OBJECT_DELETION_BATCH_SIZE)
+            .buffer_unordered(max(1, *OBJECT_DELETION_PARALLELISM / batch_size))
             .count()
             .await;
     }
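The restructured stream above first groups keys with `chunks(batch_size)`, then treats one-element chunks as plain DeleteObject calls and larger chunks as multi-deletes. A minimal demonstration of `futures::StreamExt::chunks` on toy keys (not the real key stream):

```rust
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // chunks(n) buffers up to n stream items into a Vec, which is what
    // lets the S3 path hand whole batches to one multi-delete request.
    let keys = stream::iter((0..7).map(|i| format!("zfs/obj-{i}")));
    let batches: Vec<Vec<String>> = keys.chunks(3).collect().await;
    assert_eq!(batches.len(), 3); // sizes 3, 3, 1
    assert_eq!(batches[2], vec!["zfs/obj-6".to_string()]);
}
```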
diff --git a/cmd/zfs_object_agent/zettaobject/src/object_deleter.rs b/cmd/zfs_object_agent/zettaobject/src/object_deleter.rs
index 78ea7dc0ff26..fdacd57cf413 100644
--- a/cmd/zfs_object_agent/zettaobject/src/object_deleter.rs
+++ b/cmd/zfs_object_agent/zettaobject/src/object_deleter.rs
@@ -15,7 +15,7 @@ use zettacache::base_types::PoolGuid;
 use crate::base_types::VersionedObjectId;
 use crate::data_object::DataObject;
 use crate::object_access::ObjectAccess;
-use crate::object_access::OBJECT_DELETION_BATCH_SIZE;
+use crate::object_access::OBJECT_DELETION_PARALLELISM;
 
 pub struct ObjectDeleter {
     // objects to delete at the end of this txg
@@ -47,8 +47,8 @@ impl ObjectDeleter {
         while let Some(objects) = rx.recv().await {
             let begin = Instant::now();
             let len = objects.len();
-            // Do the chunking here so that we can transmit progress after each DeleteObjects call.
-            for chunk in objects.chunks(*OBJECT_DELETION_BATCH_SIZE) {
+            // Do the chunking here so that we can transmit progress after each chunk of deletes.
+            for chunk in objects.chunks(*OBJECT_DELETION_PARALLELISM) {
                 object_access
                     .delete_objects(stream::iter(
                         chunk.iter().map(|&vo| DataObject::key(guid, vo)),
diff --git a/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs b/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs
index 1b22a3410fd4..b2a9b7c0ec4a 100644
--- a/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs
+++ b/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs
@@ -25,7 +25,7 @@ use zettacache::base_types::*;
 
 use crate::object_access::ObjectAccess;
 use crate::object_access::ObjectAccessProtocol;
-use crate::object_access::OBJECT_DELETION_BATCH_SIZE;
+use crate::object_access::OBJECT_DELETION_PARALLELISM;
 use crate::pool::PoolPhys;
 
 lazy_static! {
@@ -303,7 +303,7 @@ fn delete_pool_objects(
     measure!("delete_pool_objects()").spawn(async move {
         let prefix = format!("zfs/{guid}/");
         let super_object = PoolPhys::key(guid);
-        let batch_size = *OBJECT_DELETION_BATCH_SIZE * *DESTROY_PROGRESS_FREQUENCY;
+        let batch_size = *OBJECT_DELETION_PARALLELISM * *DESTROY_PROGRESS_FREQUENCY;
         let mut count = 0;
 
         object_access
diff --git a/tests/zfs-tests/tests/functional/cli_root/zpool_destroy/zpool_resume_destroy_on_import_object_store.ksh b/tests/zfs-tests/tests/functional/cli_root/zpool_destroy/zpool_resume_destroy_on_import_object_store.ksh
index a1047ab0b7fa..0de200b9e0bd 100755
--- a/tests/zfs-tests/tests/functional/cli_root/zpool_destroy/zpool_resume_destroy_on_import_object_store.ksh
+++ b/tests/zfs-tests/tests/functional/cli_root/zpool_destroy/zpool_resume_destroy_on_import_object_store.ksh
@@ -34,14 +34,14 @@
 #
 # STRATEGY:
 # 1. Create an object store based pool
-# 2. Set object_deletion_batch_size to 10 (default is 1000), so that pool
+# 2. Set object_deletion_parallelism to 10 (default is 500), so that pool
 #    deletion in object store proceeds slowly and we are able to test resume
 #    operation
 # 3. Destroy the pool
 # 4. Verify that the pool is in DESTROYING state
 # 5. Stop the agent
 # 6. Delete the destroy pool cache file
-# 7. Set object_deletion_batch_size back to 1000, so that the deletion
+# 7. Set object_deletion_parallelism back to 1000, so that the deletion
 #    progresses fast
 # 8. Start the agent
 # 9. The destroy operation won't resume automatically because there is no
@@ -79,10 +79,10 @@ sudo dd if=/dev/zero of=/$TESTPOOL/foo bs=1M count=1024
 typeset exp_allocated=$((1 << 30))
 verify_active_object_store_pool $TESTPOOL $guid $exp_allocated
 
-# Set object_deletion_batch_size to 10 (default is 1000), so that pool
+# Set object_deletion_parallelism to 10 (default is 500), so that pool
 # deletion in object store proceeds slowly and we are able to test resume
 # operation.
-add_or_update_tunable object_deletion_batch_size 10
+add_or_update_tunable object_deletion_parallelism 10
 
 # Stop and start the zfs object agent to be able to pick up new
 # object deletion batch size tunable.
@@ -109,9 +109,9 @@ stop_zfs_object_agent
 # automatically
 log_must rm /etc/zfs/zpool_destroy.cache
 
-# Set object_deletion_batch_size back to 1000, so that the deletion progresses
+# Set object_deletion_parallelism back to 1000, so that the deletion progresses
 # fast
-add_or_update_tunable object_deletion_batch_size 1000
+add_or_update_tunable object_deletion_parallelism 1000
 
 # Start the agent
 start_zfs_object_agent