Skip to content

Commit

Permalink
Avoid memory leaks
Browse files Browse the repository at this point in the history
This commit replaces the `threadpool` crate with a handcrafted
solution based on scoped threads. This leaves `valgrind` much
happier than before. We also lose some dependency baggage.
  • Loading branch information
Felix-El committed Aug 16, 2024
1 parent af6e427 commit bcb4721
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 21 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]
- Replace dependency on threadpool crate with a custom solution built on the
standard library only, and only using scoped threads
-> fixes memory leaks observed when running under valgrind
- up MSRV to 1.63 for scoped threads

## [0.7.3] - 2024-05-10
- Default to single-threaded tests for WebAssembly (thanks @alexcrichton) in [#41](https://github.com/LukasKalbertodt/libtest-mimic/pull/41)
Expand Down
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libtest-mimic"
version = "0.7.3"
authors = ["Lukas Kalbertodt <[email protected]>"]
edition = "2021"
rust-version = "1.60"
rust-version = "1.63"

description = """
Write your own test harness that looks and behaves like the built-in test \
Expand All @@ -20,7 +20,6 @@ exclude = [".github"]

[dependencies]
clap = { version = "4.0.8", features = ["derive"] }
threadpool = "1.8.1"
termcolor = "1.0.5"
escape8259 = "0.5.2"

Expand Down
43 changes: 24 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,22 @@

#![forbid(unsafe_code)]

use std::{borrow::Cow, fmt, process::{self, ExitCode}, sync::mpsc, time::Instant};
use std::{
borrow::Cow,
fmt,
process::{self, ExitCode},
sync::mpsc,
time::Instant,
};

mod args;
mod pool;
mod printer;

use printer::Printer;
use threadpool::ThreadPool;

pub use crate::args::{Arguments, ColorSetting, FormatSetting};



/// A single test or benchmark.
///
/// The original `libtest` often calls benchmarks "tests", which is a bit
Expand Down Expand Up @@ -143,8 +147,9 @@ impl Trial {
Err(failed) => Outcome::Failed(failed),
Ok(_) if test_mode => Outcome::Passed,
Ok(Some(measurement)) => Outcome::Measured(measurement),
Ok(None)
=> Outcome::Failed("bench runner returned `Ok(None)` in bench mode".into()),
Ok(None) => {
Outcome::Failed("bench runner returned `Ok(None)` in bench mode".into())
}
}),
info: TestInfo {
name: name.into(),
Expand Down Expand Up @@ -284,13 +289,11 @@ impl Failed {
impl<M: std::fmt::Display> From<M> for Failed {
fn from(msg: M) -> Self {
Self {
msg: Some(msg.to_string())
msg: Some(msg.to_string()),
}
}
}



/// The outcome of performing a test/benchmark.
#[derive(Debug, Clone)]
enum Outcome {
Expand Down Expand Up @@ -473,7 +476,7 @@ pub fn run(args: &Arguments, mut tests: Vec<Trial>) -> Conclusion {
Outcome::Failed(failed) => {
failed_tests.push((test, failed.msg));
conclusion.num_failed += 1;
},
}
Outcome::Ignored => conclusion.num_ignored += 1,
Outcome::Measured(_) => conclusion.num_measured += 1,
}
Expand All @@ -496,28 +499,29 @@ pub fn run(args: &Arguments, mut tests: Vec<Trial>) -> Conclusion {
}
} else {
// Run test in thread pool.
let pool = match args.test_threads {
Some(num_threads) => ThreadPool::new(num_threads),
None => ThreadPool::default()
};
let num_tests = tests.len();
let (sender, receiver) = mpsc::channel();

let num_tests = tests.len();
for test in tests {
let mut tasks: Vec<pool::BoxedTask> = Default::default();

for test in tests.into_iter() {
if args.is_ignored(&test) {
sender.send((Outcome::Ignored, test.info)).unwrap();
} else {
let sender = sender.clone();
pool.execute(move || {

tasks.push(Box::new(move || {
// It's fine to ignore the result of sending. If the
// receiver has hung up, everything will wind down soon
// anyway.
let outcome = run_single(test.runner, test_mode);
let _ = sender.send((outcome, test.info));
});
}));
}
}

pool::ScopedPool::new(tasks).run(args.test_threads);

for (outcome, test_info) in receiver.iter().take(num_tests) {
// In multithreaded mode, we do only print the start of the line
// after the test ran, as otherwise it would lead to terribly
Expand Down Expand Up @@ -552,7 +556,8 @@ fn run_single(runner: Box<dyn FnOnce(bool) -> Outcome + Send>, test_mode: bool)
// The `panic` information is just an `Any` object representing the
// value the panic was invoked with. For most panics (which use
// `panic!` like `println!`), this is either `&str` or `String`.
let payload = e.downcast_ref::<String>()
let payload = e
.downcast_ref::<String>()
.map(|s| s.as_str())
.or(e.downcast_ref::<&str>().map(|s| *s));

Expand Down
35 changes: 35 additions & 0 deletions src/pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::{sync, thread};

pub(crate) type Task = dyn FnOnce() + Send;
pub(crate) type BoxedTask = Box<Task>;

pub(crate) struct ScopedPool<I>(I);

impl<I> ScopedPool<I>
where
I: IntoIterator<Item = BoxedTask>,
I::IntoIter: Send,
{
pub(crate) fn new(tasks: I) -> Self {
Self(tasks)
}

pub(crate) fn run(self, test_threads: Option<usize>) {
let num_threads = test_threads
.or_else(|| thread::available_parallelism().ok().map(Into::into))
.unwrap_or(1);
let iter = sync::Mutex::new(self.0.into_iter());

thread::scope(|scope| {
let work = || {
let mut guard = iter.lock().unwrap();
for task in guard.by_ref() {
task();
}
};
for _ in 0..num_threads {
scope.spawn(work);
}
});
}
}

0 comments on commit bcb4721

Please sign in to comment.