Avoid memory leaks
This commit replaces the `threadpool` crate with a handcrafted
solution based on scoped threads. This leaves `valgrind` much
happier than before. We also lose some dependency baggage.

A few tests are added to verify that the requested number of
threads is actually spawned and that the workers start pulling tasks.
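
The core of the replacement (shown in full in src/pool.rs below) is small enough to sketch here: worker threads share one mutex-guarded task iterator and drain it inside std::thread::scope, which joins every worker before returning. A simplified, self-contained version of that pattern:

use std::{sync::Mutex, thread};

// Simplified sketch of the pool added in src/pool.rs: workers share one
// mutex-guarded iterator of boxed tasks and pull from it until it runs dry.
fn run_tasks(tasks: Vec<Box<dyn FnOnce() + Send>>, num_threads: usize) {
    let queue = Mutex::new(tasks.into_iter());
    let next = || queue.lock().unwrap().next();

    // `thread::scope` joins every worker before returning, so no thread
    // (or its bookkeeping) outlives this function.
    thread::scope(|scope| {
        for _ in 0..num_threads {
            scope.spawn(|| {
                while let Some(task) = next() {
                    task();
                }
            });
        }
    });
}

Because the scope joins all workers before returning, nothing outlives the call, which is presumably what keeps valgrind quieter than a pool of long-lived detached threads.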
Felix-El committed Aug 18, 2024
1 parent af6e427 commit 81db7a9
Showing 4 changed files with 125 additions and 22 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,10 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]
- Replace the dependency on the `threadpool` crate with a custom solution built
  only on the standard library, using scoped threads
  -> fixes memory leaks observed when running under valgrind
- Bump MSRV to 1.63, as required for scoped threads

## [0.7.3] - 2024-05-10
- Default to single-threaded tests for WebAssembly (thanks @alexcrichton) in [#41](https://github.com/LukasKalbertodt/libtest-mimic/pull/41)
3 changes: 1 addition & 2 deletions Cargo.toml
@@ -3,7 +3,7 @@ name = "libtest-mimic"
version = "0.7.3"
authors = ["Lukas Kalbertodt <[email protected]>"]
edition = "2021"
rust-version = "1.60"
rust-version = "1.63"

description = """
Write your own test harness that looks and behaves like the built-in test \
@@ -20,7 +20,6 @@ exclude = [".github"]

[dependencies]
clap = { version = "4.0.8", features = ["derive"] }
threadpool = "1.8.1"
termcolor = "1.0.5"
escape8259 = "0.5.2"

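
For context on the `rust-version` bump above: `std::thread::scope`, which the new pool is built on, was stabilized in Rust 1.63. A minimal illustration of the API, showing why scoped threads can borrow local data without `'static` bounds:

use std::thread;

fn main() {
    let names = vec!["a", "b", "c"];

    // `thread::scope` (stable since Rust 1.63) joins every spawned thread
    // before returning, so the threads may borrow `names` without `'static`
    // bounds or reference counting.
    thread::scope(|scope| {
        for name in &names {
            scope.spawn(move || println!("running {name}"));
        }
    });

    // `names` is still usable here and no detached threads remain.
    println!("{} tasks dispatched", names.len());
}

The same guarantee is what lets the new pool share a borrowed, mutex-guarded task iterator across its workers.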
52 changes: 32 additions & 20 deletions src/lib.rs
@@ -71,18 +71,22 @@

#![forbid(unsafe_code)]

use std::{borrow::Cow, fmt, process::{self, ExitCode}, sync::mpsc, time::Instant};
use std::{
    borrow::Cow,
    fmt,
    process::{self, ExitCode},
    sync::mpsc,
    time::Instant,
};

mod args;
mod pool;
mod printer;

use printer::Printer;
use threadpool::ThreadPool;

pub use crate::args::{Arguments, ColorSetting, FormatSetting};



/// A single test or benchmark.
///
/// The original `libtest` often calls benchmarks "tests", which is a bit
@@ -143,8 +147,9 @@ impl Trial {
                Err(failed) => Outcome::Failed(failed),
                Ok(_) if test_mode => Outcome::Passed,
                Ok(Some(measurement)) => Outcome::Measured(measurement),
                Ok(None)
                    => Outcome::Failed("bench runner returned `Ok(None)` in bench mode".into()),
                Ok(None) => {
                    Outcome::Failed("bench runner returned `Ok(None)` in bench mode".into())
                }
            }),
            info: TestInfo {
                name: name.into(),
@@ -284,13 +289,11 @@ impl Failed {
impl<M: std::fmt::Display> From<M> for Failed {
    fn from(msg: M) -> Self {
        Self {
            msg: Some(msg.to_string())
            msg: Some(msg.to_string()),
        }
    }
}



/// The outcome of performing a test/benchmark.
#[derive(Debug, Clone)]
enum Outcome {
@@ -473,15 +476,22 @@ pub fn run(args: &Arguments, mut tests: Vec<Trial>) -> Conclusion {
            Outcome::Failed(failed) => {
                failed_tests.push((test, failed.msg));
                conclusion.num_failed += 1;
            },
            }
            Outcome::Ignored => conclusion.num_ignored += 1,
            Outcome::Measured(_) => conclusion.num_measured += 1,
        }
    };

    // Execute all tests.
    let test_mode = !args.bench;
    if platform_defaults_to_one_thread() || args.test_threads == Some(1) {

    let num_threads = platform_defaults_to_one_thread()
        .then_some(1)
        .or(args.test_threads)
        .or_else(|| std::thread::available_parallelism().ok().map(Into::into))
        .unwrap_or(1);

    if num_threads == 1 {
        // Run test sequentially in main thread
        for test in tests {
            // Print `test foo ...`, run the test, then print the outcome in
@@ -496,28 +506,29 @@ pub fn run(args: &Arguments, mut tests: Vec<Trial>) -> Conclusion {
        }
    } else {
        // Run test in thread pool.
        let pool = match args.test_threads {
            Some(num_threads) => ThreadPool::new(num_threads),
            None => ThreadPool::default()
        };
        let num_tests = tests.len();
        let (sender, receiver) = mpsc::channel();

        let num_tests = tests.len();
        for test in tests {
        let mut tasks: Vec<pool::BoxedTask> = Default::default();

        for test in tests.into_iter() {
            if args.is_ignored(&test) {
                sender.send((Outcome::Ignored, test.info)).unwrap();
            } else {
                let sender = sender.clone();
                pool.execute(move || {

                tasks.push(Box::new(move || {
                    // It's fine to ignore the result of sending. If the
                    // receiver has hung up, everything will wind down soon
                    // anyway.
                    let outcome = run_single(test.runner, test_mode);
                    let _ = sender.send((outcome, test.info));
                });
                }));
            }
        }

        pool::run_on_scoped_pool(tasks, num_threads);

        for (outcome, test_info) in receiver.iter().take(num_tests) {
            // In multithreaded mode, we do only print the start of the line
            // after the test ran, as otherwise it would lead to terribly
@@ -552,7 +563,8 @@ fn run_single(runner: Box<dyn FnOnce(bool) -> Outcome + Send>, test_mode: bool)
        // The `panic` information is just an `Any` object representing the
        // value the panic was invoked with. For most panics (which use
        // `panic!` like `println!`), this is either `&str` or `String`.
        let payload = e.downcast_ref::<String>()
        let payload = e
            .downcast_ref::<String>()
            .map(|s| s.as_str())
            .or(e.downcast_ref::<&str>().map(|s| *s));

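
One detail of the src/lib.rs changes that is easy to miss in the diff: the thread count is now resolved up front through a fallback chain instead of being decided inside the pool. A standalone restatement of that logic (`pick_num_threads` and `force_single_thread` are hypothetical names used only for this sketch; the real code inlines the chain in `run` and calls `platform_defaults_to_one_thread()`):

fn pick_num_threads(force_single_thread: bool, requested: Option<usize>) -> usize {
    // 1. Platforms that default to a single thread (e.g. WebAssembly) force 1.
    // 2. Otherwise honor an explicit `--test-threads N`.
    // 3. Otherwise use the machine's available parallelism.
    // 4. If even that cannot be determined, fall back to 1.
    force_single_thread
        .then_some(1)
        .or(requested)
        .or_else(|| std::thread::available_parallelism().ok().map(Into::into))
        .unwrap_or(1)
}

When the chain resolves to 1, tests still run sequentially on the main thread as before; the scoped pool is only used for two or more threads, which is why `run_on_scoped_pool` can afford to panic on smaller values.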
88 changes: 88 additions & 0 deletions src/pool.rs
@@ -0,0 +1,88 @@
use std::{sync, thread};

pub(crate) type Task = dyn FnOnce() + Send;
pub(crate) type BoxedTask = Box<Task>;

pub(crate) fn run_on_scoped_pool(
    tasks: impl IntoIterator<Item = BoxedTask, IntoIter: Send>,
    num_threads: usize,
) {
    if num_threads < 2 {
        panic!("run_on_scoped_pool should not be called with num_threads less than 2");
    }

    let sync_iter = sync::Mutex::new(tasks.into_iter());
    let next_task = || sync_iter.lock().unwrap().next();

    thread::scope(|scope| {
        for _ in 0..num_threads {
            scope.spawn(|| {
                while let Some(task) = next_task() {
                    task();
                }
            });
        }
    });
}

#[cfg(test)]
mod tests {
    use std::{
        collections::HashSet,
        sync::{Arc, Mutex},
        thread::ThreadId,
    };

    use super::{run_on_scoped_pool, BoxedTask};

    #[derive(Default, Clone)]
    struct TaskGenerator {
        threads: Arc<Mutex<HashSet<ThreadId>>>,
    }
    impl TaskGenerator {
        fn num_unique_threads(&self) -> usize {
            self.threads.lock().unwrap().len()
        }
    }
    impl Iterator for TaskGenerator {
        type Item = BoxedTask;
        fn next(&mut self) -> Option<Self::Item> {
            self.threads
                .lock()
                .unwrap()
                .insert(std::thread::current().id());

            // Tell the calling thread that there are no tasks left for it
            // and that it may exit now.
            None
        }
    }

    #[test]
    #[should_panic]
    fn check_num_threads_0() {
        let tasks: TaskGenerator = Default::default();
        run_on_scoped_pool(tasks.clone(), 0);
    }

    #[test]
    #[should_panic]
    fn check_num_threads_1() {
        let tasks: TaskGenerator = Default::default();
        run_on_scoped_pool(tasks.clone(), 1);
    }

    #[test]
    fn check_num_threads_2() {
        let tasks: TaskGenerator = Default::default();
        run_on_scoped_pool(tasks.clone(), 2);
        assert_eq!(tasks.num_unique_threads(), 2);
    }

    #[test]
    fn check_num_threads_13() {
        let tasks: TaskGenerator = Default::default();
        run_on_scoped_pool(tasks.clone(), 13);
        assert_eq!(tasks.num_unique_threads(), 13);
    }
}
