Skip to content

Commit

Permalink
Read test and unique paths for test output (openzfs#436)
Browse files Browse the repository at this point in the history
* New read tests
* Objects now on unique per invocation path
* Generic read/write functions that optionally record performance
* Destroy created objects at end of test
  • Loading branch information
jwk404 authored Sep 15, 2021
1 parent 7638a84 commit 78124ce
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 46 deletions.
1 change: 1 addition & 0 deletions cmd/zfs_object_agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/zfs_object_agent/object_perf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ log = "0.4"
metered = "0.8"
futures = "0.3.16"
serde = { version = "1.0.125", features = ["derive"] }
uuid = {version = "0.8"}
tokio = { version = "1.4", features = ["full"] }
zettaobject = { path = "../zettaobject" }
18 changes: 13 additions & 5 deletions cmd/zfs_object_agent/object_perf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use clap::AppSettings;
use clap::Arg;
use clap::SubCommand;
use std::time::Duration;
use uuid::Uuid;
use zettaobject::ObjectAccess;
mod s3perf;

// XXX this and the arg parsing copied from client/src/main.rs should be made common
const ENDPOINT: &str = "https://s3-us-west-2.amazonaws.com";
const REGION: &str = "us-west-2";
const BUCKET_NAME: &str = "cloudburst-data-2";
Expand Down Expand Up @@ -88,6 +88,7 @@ async fn main() {
.default_value("/var/tmp/perflog"),
)
.subcommand(SubCommand::with_name("write").about("write test"))
.subcommand(SubCommand::with_name("read").about("read test"))
.get_matches();

zettaobject::init::setup_logging(
Expand All @@ -100,13 +101,13 @@ async fn main() {
let region_str = matches.value_of("region").unwrap();
let bucket_name = matches.value_of("bucket").unwrap();
let profile = matches.value_of("profile").unwrap();
let objsize_bytes: i32 = matches
let objsize_bytes: u64 = matches
.value_of("object-size")
.unwrap()
.parse::<i32>()
.parse::<u64>()
.unwrap()
* 1024;
let qdepth: i32 = matches.value_of("qdepth").unwrap().parse().unwrap();
let qdepth: u64 = matches.value_of("qdepth").unwrap().parse().unwrap();
let duration = Duration::from_secs(matches.value_of("runtime").unwrap().parse().unwrap());

println!(
Expand All @@ -122,9 +123,16 @@ async fn main() {
false,
);

let key_prefix = format!("zfs_object_perf/{}/", Uuid::new_v4());
println!("Using prefix: '{}'", key_prefix);
match matches.subcommand() {
("write", Some(_matches)) => {
s3perf::write_test(&object_access, objsize_bytes, qdepth, duration)
s3perf::write_test(&object_access, &key_prefix, objsize_bytes, qdepth, duration)
.await
.unwrap();
}
("read", Some(_matches)) => {
s3perf::read_test(&object_access, &key_prefix, objsize_bytes, qdepth, duration)
.await
.unwrap();
}
Expand Down
201 changes: 160 additions & 41 deletions cmd/zfs_object_agent/object_perf/src/s3perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,53 +5,17 @@ use metered::common::*;
use metered::hdr_histogram::AtomicHdrHistogram;
use metered::metered;
use metered::time_source::StdInstantMicros;
use std::cmp::max;
use std::convert::TryInto;
use std::error::Error;
use std::string::String;
use std::sync::Arc;
use std::time::{Duration, Instant};
use zettaobject::ObjectAccess;

pub async fn write_test(
object_access: &ObjectAccess,
objsize: i32,
qdepth: i32,
duration: Duration,
) -> Result<(), Box<dyn Error>> {
let perf = Perf::default();
let my_perf = perf.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
interval.tick().await;
info!("metrics: {:#?}", my_perf.metrics);
}
});

let mut key_id: i32 = 0;
let start = Instant::now();
let data = vec![0; objsize.try_into().unwrap()];
stream::repeat_with(|| {
let my_object_access = object_access.clone();
let my_data = data.clone();
let my_perf = perf.clone();
key_id += 1;
tokio::spawn(async move {
my_perf
.put(
&my_object_access,
&format!("perftest/key{}", key_id),
my_data,
)
.await;
})
})
.take_while(|_| future::ready(start.elapsed() < duration))
.buffer_unordered(qdepth.try_into().unwrap())
.for_each(|_| future::ready(()))
.await;

println!("metrics: {:#?}", perf.metrics);
Ok(())
enum WriteTestBounds {
Time(Duration),
Objects(u64),
}

#[derive(Default, Clone)]
Expand All @@ -68,4 +32,159 @@ impl Perf {
async fn put(&self, object_access: &ObjectAccess, key: &str, data: Vec<u8>) {
object_access.put_object(&key.to_string(), data).await;
}

#[measure(type = ResponseTime<AtomicHdrHistogram, StdInstantMicros>)]
#[measure(InFlight)]
#[measure(Throughput)]
#[measure(HitCount)]
async fn get(&self, object_access: &ObjectAccess, key: &str) {
object_access.get_object(&key.to_string()).await.unwrap();
}

fn log_metrics(&self, duration: Duration) {
let my_perf = self.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(duration);
loop {
interval.tick().await;
info!("{:#?}", my_perf.metrics);
}
});
}

async fn read_objects(
&self,
object_access: &ObjectAccess,
key_prefix: String,
qdepth: u64,
duration: Duration,
) {
let num_objects = object_access.collect_objects(&key_prefix, None).await.len();
let mut key_id = 0;
let start = Instant::now();
stream::repeat_with(|| {
let my_perf = self.clone();
let my_object_access = object_access.clone();
let my_key_prefix = key_prefix.clone();
key_id += 1;
tokio::spawn(async move {
my_perf
.get(
&my_object_access,
&format!("{}{}", my_key_prefix, key_id % num_objects + 1),
)
.await;
})
})
.take_while(|_| future::ready(start.elapsed() < duration))
.buffer_unordered(qdepth.try_into().unwrap())
.for_each(|_| future::ready(()))
.await;
}

async fn write_objects(
&self,
object_access: &ObjectAccess,
key_prefix: String,
objsize: u64,
qdepth: u64,
bounds: WriteTestBounds,
) {
let data = vec![0; objsize.try_into().unwrap()];
let mut key_id: u64 = 0;
let start = Instant::now();
let put_lambda = || {
let my_data = data.clone();
let my_perf = self.clone();
let my_object_access = object_access.clone();
let my_key_prefix = key_prefix.clone();
key_id += 1;
tokio::spawn(async move {
my_perf
.put(
&my_object_access,
&format!("{}{}", my_key_prefix, key_id),
my_data,
)
.await
})
};
let put_stream = stream::repeat_with(put_lambda);

match bounds {
WriteTestBounds::Time(duration) => {
put_stream
.take_while(|_| future::ready(start.elapsed() < duration))
.buffer_unordered(qdepth.try_into().unwrap())
.for_each(|_| future::ready(()))
.await;
}
WriteTestBounds::Objects(num_objects) => {
put_stream
.take(num_objects.try_into().unwrap())
.buffer_unordered(qdepth.try_into().unwrap())
.for_each(|_| future::ready(()))
.await;
}
}
}
}

pub async fn write_test(
object_access: &ObjectAccess,
key_prefix: &str,
objsize: u64,
qdepth: u64,
duration: Duration,
) -> Result<(), Box<dyn Error>> {
let perf = Perf::default();
let bounds = WriteTestBounds::Time(duration);
perf.log_metrics(Duration::from_secs(1));

perf.write_objects(
object_access,
key_prefix.to_string(),
objsize,
qdepth,
bounds,
)
.await;

println!("{:#?}", perf.metrics.put);

let object_keys = object_access.collect_all_objects(key_prefix).await;
object_access.delete_objects(&object_keys).await;

Ok(())
}

pub async fn read_test(
object_access: &ObjectAccess,
key_prefix: &str,
objsize: u64,
qdepth: u64,
duration: Duration,
) -> Result<(), Box<dyn Error>> {
let perf = Perf::default();
let bounds = WriteTestBounds::Objects(max(qdepth * 10, 200));
perf.log_metrics(Duration::from_secs(1));

perf.write_objects(
object_access,
key_prefix.to_string(),
objsize,
qdepth,
bounds,
)
.await;

perf.read_objects(object_access, key_prefix.to_string(), qdepth, duration)
.await;

println!("{:#?}", perf.metrics.get);

let object_keys = object_access.collect_all_objects(key_prefix).await;
object_access.delete_objects(&object_keys).await;

Ok(())
}

0 comments on commit 78124ce

Please sign in to comment.