Skip to content

Commit

Permalink
DLPX-84629 DeleteObjects storm causes rate limit errors (openzfs#1068)
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Dagnelie <[email protected]>
  • Loading branch information
pcd1193182 authored Aug 8, 2023
1 parent 92b62c8 commit 9cc0aee
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 143 deletions.
1 change: 1 addition & 0 deletions cmd/zfs_object_agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/zfs_object_agent/object_perf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0"
clap = { version = "4.1.6", features = ["derive", "wrap_help"] }
git-version = "0.3.5"
log = "0.4"
Expand Down
215 changes: 96 additions & 119 deletions cmd/zfs_object_agent/object_perf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#![allow(clippy::print_stdout)]

use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use clap::ArgAction;
Expand All @@ -28,7 +29,7 @@ static GIT_VERSION: &str = git_version!(
}
);

#[derive(Args)]
#[derive(Args, Clone)]
struct S3Args {
/// S3 endpoint
#[arg(short = 'e', long, default_value = ENDPOINT)]
Expand All @@ -45,9 +46,12 @@ struct S3Args {
/// credentials profile
#[arg(short = 'p', long)]
profile: Option<String>,

#[arg(long)]
instance_profile: bool,
}

#[derive(Args)]
#[derive(Args, Clone)]
struct BlobArgs {
/// Blob endpoint
#[arg(short = 'e', long)]
Expand All @@ -62,7 +66,7 @@ struct BlobArgs {
profile: Option<String>,
}

#[derive(Args)]
#[derive(Args, Clone)]
struct OciArgs {
/// OCI bucket
#[arg(short = 'b', long, default_value = BUCKET_NAME)]
Expand Down Expand Up @@ -119,6 +123,14 @@ enum Commands {
s3_args: S3Args,
},

/// Delete s3 test
DeleteS3 {
#[command(flatten)]
s3_args: S3Args,
#[arg(short = 'c', long, default_value = "2048")]
objcount: u64,
},

/// Write blob test
WriteBlob {
#[command(flatten)]
Expand All @@ -131,6 +143,14 @@ enum Commands {
blob_args: BlobArgs,
},

/// Delete s3 test
DeleteBlob {
#[command(flatten)]
blob_args: BlobArgs,
#[arg(short = 'c', long, default_value = "2048")]
objcount: u64,
},

/// Write OCI test
WriteOci {
#[command(flatten)]
Expand All @@ -142,6 +162,60 @@ enum Commands {
#[command(flatten)]
oci_args: OciArgs,
},

/// Delete s3 test
DeleteOci {
#[command(flatten)]
oci_args: OciArgs,
#[arg(short = 'c', long, default_value = "2048")]
objcount: u64,
},
}

async fn s3_object_access(s3_args: S3Args) -> Arc<ObjectAccess> {
ObjectAccess::new(
ObjectAccessProtocol::S3 {
endpoint: s3_args.endpoint,
region: s3_args.region,
credentials: match (s3_args.instance_profile, s3_args.profile) {
(true, _) => S3Credentials::InstanceProfile,
(false, Some(profile)) => S3Credentials::Profile(profile),
(false, None) => S3Credentials::Automatic,
},
},
s3_args.bucket,
false,
)
.await
.unwrap()
}

async fn blob_object_access(blob_args: BlobArgs) -> Arc<ObjectAccess> {
ObjectAccess::new(
ObjectAccessProtocol::Blob {
endpoint: blob_args.endpoint,
credentials: match blob_args.profile {
Some(profile) => BlobCredentials::Profile(profile),
None => BlobCredentials::Automatic,
},
},
blob_args.bucket,
false,
)
.await
.unwrap()
}

async fn oci_object_access(oci_args: OciArgs) -> Arc<ObjectAccess> {
ObjectAccess::new(
ObjectAccessProtocol::Oci {
credentials: zettaobject::object_access::OciCredentials::InstancePrincipal,
},
oci_args.bucket,
false,
)
.await
.unwrap()
}

#[tokio::main]
Expand All @@ -162,74 +236,20 @@ async fn main() {

let key_prefix = format!("zfs_object_perf/{}/", Uuid::new_v4());
println!("Using prefix: '{key_prefix}'");
match cli.command {
Commands::WriteS3 { s3_args } => {
let object_access = ObjectAccess::new(
ObjectAccessProtocol::S3 {
endpoint: s3_args.endpoint,
region: s3_args.region,
credentials: match s3_args.profile {
Some(profile) => S3Credentials::Profile(profile),
None => S3Credentials::Automatic,
},
},
s3_args.bucket,
false,
)
.await
.unwrap();

perf::write_test(
object_access,
key_prefix,
objsize_bytes,
cli.qdepth,
duration,
)
.await
.unwrap();
}
Commands::ReadS3 { s3_args } => {
let object_access = ObjectAccess::new(
ObjectAccessProtocol::S3 {
endpoint: s3_args.endpoint,
region: s3_args.region,
credentials: match s3_args.profile {
Some(profile) => S3Credentials::Profile(profile),
None => S3Credentials::Automatic,
},
},
s3_args.bucket,
false,
)
.await
.unwrap();

perf::read_test(
object_access,
key_prefix,
objsize_bytes,
cli.qdepth,
duration,
)
.await
.unwrap();
}
Commands::WriteBlob { blob_args } => {
let object_access = ObjectAccess::new(
ObjectAccessProtocol::Blob {
endpoint: blob_args.endpoint,
credentials: match blob_args.profile {
Some(profile) => BlobCredentials::Profile(profile),
None => BlobCredentials::Automatic,
},
},
blob_args.bucket,
false,
)
.await
.unwrap();

let object_access = match &cli.command {
Commands::WriteS3 { s3_args }
| Commands::ReadS3 { s3_args }
| Commands::DeleteS3 { s3_args, .. } => s3_object_access(s3_args.clone()).await,
Commands::WriteBlob { blob_args }
| Commands::ReadBlob { blob_args }
| Commands::DeleteBlob { blob_args, .. } => blob_object_access(blob_args.clone()).await,
Commands::WriteOci { oci_args }
| Commands::ReadOci { oci_args }
| Commands::DeleteOci { oci_args, .. } => oci_object_access(oci_args.clone()).await,
};
match cli.command {
Commands::WriteS3 { .. } | Commands::WriteBlob { .. } | Commands::WriteOci { .. } => {
perf::write_test(
object_access,
key_prefix,
Expand All @@ -240,21 +260,7 @@ async fn main() {
.await
.unwrap();
}
Commands::ReadBlob { blob_args } => {
let object_access = ObjectAccess::new(
ObjectAccessProtocol::Blob {
endpoint: blob_args.endpoint,
credentials: match blob_args.profile {
Some(profile) => BlobCredentials::Profile(profile),
None => BlobCredentials::Automatic,
},
},
blob_args.bucket,
false,
)
.await
.unwrap();

Commands::ReadS3 { .. } | Commands::ReadBlob { .. } | Commands::ReadOci { .. } => {
perf::read_test(
object_access,
key_prefix,
Expand All @@ -265,44 +271,15 @@ async fn main() {
.await
.unwrap();
}
Commands::WriteOci { oci_args } => {
let object_access = ObjectAccess::new(
ObjectAccessProtocol::Oci {
credentials: zettaobject::object_access::OciCredentials::InstancePrincipal,
},
oci_args.bucket,
false,
)
.await
.unwrap();

perf::write_test(
object_access,
key_prefix,
objsize_bytes,
cli.qdepth,
duration,
)
.await
.unwrap();
}
Commands::ReadOci { oci_args } => {
let object_access = ObjectAccess::new(
ObjectAccessProtocol::Oci {
credentials: zettaobject::object_access::OciCredentials::InstancePrincipal,
},
oci_args.bucket,
false,
)
.await
.unwrap();

perf::read_test(
Commands::DeleteS3 { objcount, .. }
| Commands::DeleteBlob { objcount, .. }
| Commands::DeleteOci { objcount, .. } => {
perf::delete_test(
object_access,
key_prefix,
objsize_bytes,
cli.qdepth,
duration,
objcount,
)
.await
.unwrap();
Expand Down
27 changes: 27 additions & 0 deletions cmd/zfs_object_agent/object_perf/src/perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,33 @@ pub async fn write_test(
Ok(())
}

pub async fn delete_test(
object_access: Arc<ObjectAccess>,
key_prefix: String,
objsize: u64,
qdepth: u64,
objcount: u64,
) -> Result<(), Box<dyn Error>> {
let perf = Perf::default();
let bounds = WriteTestBounds::Objects(objcount);

perf.write_objects(
object_access.clone(),
key_prefix.clone(),
objsize,
qdepth,
bounds,
)
.await;

let start = Instant::now();
object_access
.delete_objects(object_access.list_objects(key_prefix, false))
.await;
println!("Deleted {objcount} objects in {:?}", start.elapsed());
Ok(())
}

pub async fn read_test(
object_access: Arc<ObjectAccess>,
key_prefix: String,
Expand Down
4 changes: 2 additions & 2 deletions cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use super::BlobCredentials;
use super::BucketAccessTrait;
use super::ObjectAccessProtocol;
use super::RequestError;
use super::OBJECT_DELETION_BATCH_SIZE;
use super::OBJECT_DELETION_PARALLELISM;
use crate::access_stats::ObjectAccessOpType;
use crate::access_stats::ObjectAccessStats;
use crate::object_access::ByteStream;
Expand Down Expand Up @@ -352,7 +352,7 @@ impl ObjectAccessTrait for BlobObjectAccess {
}
op.end(0);
})
.buffer_unordered(*OBJECT_DELETION_BATCH_SIZE)
.buffer_unordered(*OBJECT_DELETION_PARALLELISM)
.count()
.await;
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ tunable! {
// see that we get good performance with a batch size of 500. Since we open
// these many socket connections, we avoid a larger batch size to not run
// out of file desciptors when the ulimit is low.
pub static ref OBJECT_DELETION_BATCH_SIZE: usize = 500;
pub static ref OBJECT_DELETION_PARALLELISM: usize = 500;
pub static ref OBJECT_DELETION_BATCH_SIZE: usize = 5;
static ref PER_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
}

Expand Down
Loading

0 comments on commit 9cc0aee

Please sign in to comment.