diff --git a/cmd/zfs_object_agent/Cargo.lock b/cmd/zfs_object_agent/Cargo.lock index 0bc0939cfb5c..5766650871ff 100644 --- a/cmd/zfs_object_agent/Cargo.lock +++ b/cmd/zfs_object_agent/Cargo.lock @@ -2458,6 +2458,7 @@ dependencies = [ "metered", "serde 1.0.127", "tokio", + "uuid", "zettaobject", ] diff --git a/cmd/zfs_object_agent/object_perf/Cargo.toml b/cmd/zfs_object_agent/object_perf/Cargo.toml index b6514eafe104..7659ac83b57f 100644 --- a/cmd/zfs_object_agent/object_perf/Cargo.toml +++ b/cmd/zfs_object_agent/object_perf/Cargo.toml @@ -12,5 +12,6 @@ log = "0.4" metered = "0.8" futures = "0.3.16" serde = { version = "1.0.125", features = ["derive"] } +uuid = {version = "0.8"} tokio = { version = "1.4", features = ["full"] } zettaobject = { path = "../zettaobject" } \ No newline at end of file diff --git a/cmd/zfs_object_agent/object_perf/src/main.rs b/cmd/zfs_object_agent/object_perf/src/main.rs index 14601219812d..3d1e8290ef76 100644 --- a/cmd/zfs_object_agent/object_perf/src/main.rs +++ b/cmd/zfs_object_agent/object_perf/src/main.rs @@ -2,10 +2,10 @@ use clap::AppSettings; use clap::Arg; use clap::SubCommand; use std::time::Duration; +use uuid::Uuid; use zettaobject::ObjectAccess; mod s3perf; -// XXX this and the arg parsing copied from client/src/main.rs should be made common const ENDPOINT: &str = "https://s3-us-west-2.amazonaws.com"; const REGION: &str = "us-west-2"; const BUCKET_NAME: &str = "cloudburst-data-2"; @@ -88,6 +88,7 @@ async fn main() { .default_value("/var/tmp/perflog"), ) .subcommand(SubCommand::with_name("write").about("write test")) + .subcommand(SubCommand::with_name("read").about("read test")) .get_matches(); zettaobject::init::setup_logging( @@ -100,13 +101,13 @@ async fn main() { let region_str = matches.value_of("region").unwrap(); let bucket_name = matches.value_of("bucket").unwrap(); let profile = matches.value_of("profile").unwrap(); - let objsize_bytes: i32 = matches + let objsize_bytes: u64 = matches .value_of("object-size") .unwrap() - .parse::() + .parse::() .unwrap() * 1024; - let qdepth: i32 = matches.value_of("qdepth").unwrap().parse().unwrap(); + let qdepth: u64 = matches.value_of("qdepth").unwrap().parse().unwrap(); let duration = Duration::from_secs(matches.value_of("runtime").unwrap().parse().unwrap()); println!( @@ -122,9 +123,16 @@ async fn main() { false, ); + let key_prefix = format!("zfs_object_perf/{}/", Uuid::new_v4()); + println!("Using prefix: '{}'", key_prefix); match matches.subcommand() { ("write", Some(_matches)) => { - s3perf::write_test(&object_access, objsize_bytes, qdepth, duration) + s3perf::write_test(&object_access, &key_prefix, objsize_bytes, qdepth, duration) + .await + .unwrap(); + } + ("read", Some(_matches)) => { + s3perf::read_test(&object_access, &key_prefix, objsize_bytes, qdepth, duration) .await .unwrap(); } diff --git a/cmd/zfs_object_agent/object_perf/src/s3perf.rs b/cmd/zfs_object_agent/object_perf/src/s3perf.rs index f28a2024176f..f29e920936ab 100644 --- a/cmd/zfs_object_agent/object_perf/src/s3perf.rs +++ b/cmd/zfs_object_agent/object_perf/src/s3perf.rs @@ -5,53 +5,17 @@ use metered::common::*; use metered::hdr_histogram::AtomicHdrHistogram; use metered::metered; use metered::time_source::StdInstantMicros; +use std::cmp::max; use std::convert::TryInto; use std::error::Error; +use std::string::String; use std::sync::Arc; use std::time::{Duration, Instant}; use zettaobject::ObjectAccess; -pub async fn write_test( - object_access: &ObjectAccess, - objsize: i32, - qdepth: i32, - duration: Duration, -) -> Result<(), Box> { - let perf = Perf::default(); - let my_perf = perf.clone(); - tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(1)); - loop { - interval.tick().await; - info!("metrics: {:#?}", my_perf.metrics); - } - }); - - let mut key_id: i32 = 0; - let start = Instant::now(); - let data = vec![0; objsize.try_into().unwrap()]; - stream::repeat_with(|| { - let my_object_access = object_access.clone(); - let my_data = data.clone(); - let my_perf = perf.clone(); - key_id += 1; - tokio::spawn(async move { - my_perf - .put( - &my_object_access, - &format!("perftest/key{}", key_id), - my_data, - ) - .await; - }) - }) - .take_while(|_| future::ready(start.elapsed() < duration)) - .buffer_unordered(qdepth.try_into().unwrap()) - .for_each(|_| future::ready(())) - .await; - - println!("metrics: {:#?}", perf.metrics); - Ok(()) +enum WriteTestBounds { + Time(Duration), + Objects(u64), } #[derive(Default, Clone)] @@ -68,4 +32,159 @@ impl Perf { async fn put(&self, object_access: &ObjectAccess, key: &str, data: Vec) { object_access.put_object(&key.to_string(), data).await; } + + #[measure(type = ResponseTime)] + #[measure(InFlight)] + #[measure(Throughput)] + #[measure(HitCount)] + async fn get(&self, object_access: &ObjectAccess, key: &str) { + object_access.get_object(&key.to_string()).await.unwrap(); + } + + fn log_metrics(&self, duration: Duration) { + let my_perf = self.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(duration); + loop { + interval.tick().await; + info!("{:#?}", my_perf.metrics); + } + }); + } + + async fn read_objects( + &self, + object_access: &ObjectAccess, + key_prefix: String, + qdepth: u64, + duration: Duration, + ) { + let num_objects = object_access.collect_objects(&key_prefix, None).await.len(); + let mut key_id = 0; + let start = Instant::now(); + stream::repeat_with(|| { + let my_perf = self.clone(); + let my_object_access = object_access.clone(); + let my_key_prefix = key_prefix.clone(); + key_id += 1; + tokio::spawn(async move { + my_perf + .get( + &my_object_access, + &format!("{}{}", my_key_prefix, key_id % num_objects + 1), + ) + .await; + }) + }) + .take_while(|_| future::ready(start.elapsed() < duration)) + .buffer_unordered(qdepth.try_into().unwrap()) + .for_each(|_| future::ready(())) + .await; + } + + async fn write_objects( + &self, + object_access: &ObjectAccess, + key_prefix: String, + objsize: u64, + qdepth: u64, + bounds: WriteTestBounds, + ) { + let data = vec![0; objsize.try_into().unwrap()]; + let mut key_id: u64 = 0; + let start = Instant::now(); + let put_lambda = || { + let my_data = data.clone(); + let my_perf = self.clone(); + let my_object_access = object_access.clone(); + let my_key_prefix = key_prefix.clone(); + key_id += 1; + tokio::spawn(async move { + my_perf + .put( + &my_object_access, + &format!("{}{}", my_key_prefix, key_id), + my_data, + ) + .await + }) + }; + let put_stream = stream::repeat_with(put_lambda); + + match bounds { + WriteTestBounds::Time(duration) => { + put_stream + .take_while(|_| future::ready(start.elapsed() < duration)) + .buffer_unordered(qdepth.try_into().unwrap()) + .for_each(|_| future::ready(())) + .await; + } + WriteTestBounds::Objects(num_objects) => { + put_stream + .take(num_objects.try_into().unwrap()) + .buffer_unordered(qdepth.try_into().unwrap()) + .for_each(|_| future::ready(())) + .await; + } + } + } +} + +pub async fn write_test( + object_access: &ObjectAccess, + key_prefix: &str, + objsize: u64, + qdepth: u64, + duration: Duration, +) -> Result<(), Box> { + let perf = Perf::default(); + let bounds = WriteTestBounds::Time(duration); + perf.log_metrics(Duration::from_secs(1)); + + perf.write_objects( + object_access, + key_prefix.to_string(), + objsize, + qdepth, + bounds, + ) + .await; + + println!("{:#?}", perf.metrics.put); + + let object_keys = object_access.collect_all_objects(key_prefix).await; + object_access.delete_objects(&object_keys).await; + + Ok(()) +} + +pub async fn read_test( + object_access: &ObjectAccess, + key_prefix: &str, + objsize: u64, + qdepth: u64, + duration: Duration, +) -> Result<(), Box> { + let perf = Perf::default(); + let bounds = WriteTestBounds::Objects(max(qdepth * 10, 200)); + perf.log_metrics(Duration::from_secs(1)); + + perf.write_objects( + object_access, + key_prefix.to_string(), + objsize, + qdepth, + bounds, + ) + .await; + + perf.read_objects(object_access, key_prefix.to_string(), qdepth, duration) + .await; + + println!("{:#?}", perf.metrics.get); + + let object_keys = object_access.collect_all_objects(key_prefix).await; + object_access.delete_objects(&object_keys).await; + + Ok(()) }