Skip to content

Commit

Permalink
address feedback and some cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Prendes <[email protected]>
  • Loading branch information
jprendes committed Nov 19, 2024
1 parent 6077c8c commit 80b360a
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 66 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 38 additions & 47 deletions crates/stress-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ use std::sync::Arc;

use anyhow::Result;
use clap::{Parser, ValueEnum};
use humantime::{format_duration, parse_duration};
use nix::sys::prctl::set_child_subreaper;
use tokio::sync::{Notify, Semaphore};
use tokio::task::JoinSet;
use tokio::time::Duration;
use utils::{reap_children, TryFutureEx};
use tokio::time::{Duration, Instant};
use utils::{reap_children, DropIf as _, TryFutureEx as _};

#[derive(ValueEnum, Clone, Copy)]
#[derive(ValueEnum, Clone, Copy, PartialEq)]
enum Step {
Create,
Start,
Expand Down Expand Up @@ -47,7 +48,7 @@ struct Cli {
/// Number of tasks to run
count: usize,

#[clap(short, long, value_parser = humantime::parse_duration, default_value = "2s")]
#[clap(short, long, value_parser = parse_duration, default_value = "2s")]
/// Runtime timeout [0 = no timeout]
timeout: Duration,
}
Expand Down Expand Up @@ -79,6 +80,15 @@ async fn main_impl() -> Result<()> {
let shim = c8d.start_shim(shim).await?;
let shim = Arc::new(shim);

let mut tasks = vec![];

// First create the tasks bundles, as this is not
// work done by the shim itself
for _ in 0..count {
let task = shim.task().await?;
tasks.push(task);
}

let permits = if parallel == 0 {
count as _
} else {
Expand All @@ -87,49 +97,25 @@ async fn main_impl() -> Result<()> {
let semaphore = Arc::new(Semaphore::new(permits));
let mut tracker: JoinSet<Result<()>> = JoinSet::new();

let serial_steps = match serial_steps {
Step::Create => "c",
Step::Start => "cs",
Step::Wait => "csw",
Step::Delete => "cswd",
};

for _ in 0..count {
let shim = shim.clone();
let start = Instant::now();
for task in tasks {
let semaphore = semaphore.clone();

tracker.spawn(async move {
let task = shim.task().await?;
// take a permit as this section might have to run serially
let mut permit = Some(semaphore.acquire().await?);

{
// take a permit as this saction might have to run serially
let _permit = semaphore.acquire().await?;
if serial_steps.contains('c') {
task.create(container_output).await?;
}
if serial_steps.contains('s') {
task.start().await?;
}
if serial_steps.contains('w') {
task.wait().await?;
}
if serial_steps.contains('d') {
task.delete().await?;
}
}
task.create(container_output).await?;
permit.drop_if(serial_steps == Step::Create);

if !serial_steps.contains('c') {
task.create(container_output).await?;
}
if !serial_steps.contains('s') {
task.start().await?;
}
if !serial_steps.contains('w') {
task.wait().await?;
}
if !serial_steps.contains('d') {
task.delete().await?;
}
task.start().await?;
permit.drop_if(serial_steps == Step::Start);

task.wait().await?;
permit.drop_if(serial_steps == Step::Wait);

task.delete().await?;
permit.drop_if(serial_steps == Step::Delete);

Ok(())
});
Expand Down Expand Up @@ -160,17 +146,22 @@ async fn main_impl() -> Result<()> {
}
}
}
let _ = shim.shutdown().await;
c8d.shutdown().await
Ok(())
}
.with_watchdog(timeout, ping.clone())
.or_ctrl_c()
.await;

let elapsed = start.elapsed();

let color = if success == count { 32 } else { 31 };

println!();
println!("{success} tasks succeeded");
println!("{failed} tasks failed");
println!("{} tasks hanged", count - success - failed);
println!(
"\x1b[{color}m{success} tasks succeeded, {failed} tasks failed, {} tasks didn't finish\x1b[0m",
count - success - failed
);
println!("elapsed time: {}", format_duration(elapsed));

Ok(())
}
11 changes: 3 additions & 8 deletions crates/stress-test/src/mocks/containerd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl Events for EventsService {

pub struct Containerd {
dir: TempDir,
server: ServerHandle,
_server: ServerHandle,
verbose: bool,
}

Expand All @@ -27,24 +27,19 @@ impl Containerd {
let dir = tempdir()?;
let socket = dir.path().join("containerd.sock.ttrpc");

let server = Server::new()
let _server = Server::new()
.register(service!(EventsService: Events))
.bind(format!("unix://{}", socket.display()))
.await?;

Ok(Self {
dir,
server,
_server,
verbose,
})
}

pub async fn start_shim(&self, shim: impl AsRef<Path>) -> Result<Shim> {
Shim::new(&self.dir, self.verbose, shim).await
}

pub async fn shutdown(self) -> Result<()> {
self.server.terminate();
Ok(self.server.await?)
}
}
8 changes: 0 additions & 8 deletions crates/stress-test/src/mocks/shim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use tokio::process::Command;
use trapeze::Client;

use super::Task;
use crate::protos::containerd::task::v2::Task as _;
use crate::utils::hash;

pub struct Shim {
Expand Down Expand Up @@ -70,11 +69,4 @@ impl Shim {
pub async fn task(&self) -> Result<Task> {
Task::new(&self.dir, self.client.clone()).await
}

pub async fn shutdown(&self) -> Result<()> {
info!("Shutting down");

self.client.shutdown(Default::default()).await?;
Ok(())
}
}
20 changes: 18 additions & 2 deletions crates/stress-test/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ pub async fn reap_children() -> Result<()> {

pub trait TryFutureEx<E> {
fn or_ctrl_c(self) -> impl Future<Output = Result<E>> + Send;
fn with_watchdog(self, t: Duration, ping: Arc<Notify>) -> impl Future<Output = Result<E>> + Send;
fn with_watchdog(
self,
t: Duration,
ping: Arc<Notify>,
) -> impl Future<Output = Result<E>> + Send;
}

impl<E: Default, T: Future<Output = Result<E>> + Send> TryFutureEx<E> for T {
Expand All @@ -68,7 +72,7 @@ impl<E: Default, T: Future<Output = Result<E>> + Send> TryFutureEx<E> for T {

let fut = self;
tokio::pin!(fut);

loop {
select! {
val = &mut fut => { return val; },
Expand All @@ -78,3 +82,15 @@ impl<E: Default, T: Future<Output = Result<E>> + Send> TryFutureEx<E> for T {
}
}
}

pub trait DropIf {
fn drop_if(&mut self, cond: bool);
}

impl<T> DropIf for Option<T> {
fn drop_if(&mut self, cond: bool) {
if cond {
self.take();
}
}
}

0 comments on commit 80b360a

Please sign in to comment.