From 55dccdb28d1d3f52ab714741010be965a00c5f29 Mon Sep 17 00:00:00 2001 From: Alexander Bulaev Date: Wed, 29 Nov 2017 17:53:23 +0300 Subject: [PATCH] Update dependencies, bump version --- Cargo.toml | 13 ++++++------- README.md | 4 +++- src/lib.rs | 32 +++++++++++++++++++------------- 3 files changed, 28 insertions(+), 21 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0b50182..bcbb3e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "scoped-pool" -version = "1.0.0" +version = "1.1.0" authors = ["Jonathan Reem "] repository = "https://github.com/reem/rust-scoped-pool.git" description = "A flexible thread pool providing scoped threads." @@ -9,11 +9,10 @@ readme = "README.md" license = "MIT" [dependencies] -variance = "0.1" -crossbeam = "0.2" -scopeguard = "0.1" +variance = "0.1.3" +crossbeam-channel = "0.3.8" +scopeguard = "1.0.0" [dev-dependencies] -rand = "0.3" -itertools = "0.4" - +rand = "0.6.5" +itertools = "0.8.0" diff --git a/README.md b/README.md index 278d3f2..fb34fb9 100644 --- a/README.md +++ b/README.md @@ -65,9 +65,11 @@ with the rest of your dependencies: ```toml [dependencies] -scoped-pool = "1" +scoped-pool = "1.1" ``` +`scoped-pool` currently requires Rust 1.11. + ## Author [Jonathan Reem](https://medium.com/@jreem) is the primary author and maintainer of scoped-pool. diff --git a/src/lib.rs b/src/lib.rs index 778c96b..8eeac60 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,13 +7,13 @@ //! extern crate variance; -extern crate crossbeam; +extern crate crossbeam_channel; #[macro_use] extern crate scopeguard; use variance::InvariantLifetime as Id; -use crossbeam::sync::MsQueue; +use crossbeam_channel::{Sender, Receiver}; use std::{thread, mem}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -128,7 +128,7 @@ impl Pool { #[inline] pub fn shutdown(&self) { // Start the shutdown process. - self.inner.queue.push(PoolMessage::Quit); + let _ = self.inner.tx.send(PoolMessage::Quit); // Wait for it to complete. self.wait.join() @@ -165,18 +165,18 @@ impl Pool { let mut thread_sentinel = ThreadSentinel(Some(self.clone())); loop { - match self.inner.queue.pop() { + match self.inner.rx.recv().unwrap() { // On Quit, repropogate and quit. PoolMessage::Quit => { // Repropogate the Quit message to other threads. - self.inner.queue.push(PoolMessage::Quit); + let _ = self.inner.tx.send(PoolMessage::Quit); // Cancel the thread sentinel so we don't panic waiting // shutdown threads, and don't restart the thread. thread_sentinel.cancel(); // Terminate the thread. - break + break; }, // On Task, run the task then complete the WaitGroup. @@ -191,21 +191,27 @@ impl Pool { } struct PoolInner { - queue: MsQueue, + tx: Sender, + rx: Receiver, thread_config: ThreadConfig, - thread_counter: AtomicUsize + thread_counter: AtomicUsize, } impl PoolInner { fn with_thread_config(thread_config: ThreadConfig) -> Self { - PoolInner { thread_config: thread_config, ..Self::default() } + PoolInner { + thread_config, + ..Default::default() + } } } impl Default for PoolInner { fn default() -> Self { + let (tx, rx) = crossbeam_channel::unbounded(); PoolInner { - queue: MsQueue::new(), + tx, + rx, thread_config: ThreadConfig::default(), thread_counter: AtomicUsize::new(1) } @@ -279,7 +285,7 @@ impl<'scope> Scope<'scope> { #[inline] pub fn forever(pool: Pool) -> Scope<'static> { Scope { - pool: pool, + pool, wait: Arc::new(WaitGroup::new()), _scope: Id::default() } @@ -301,7 +307,7 @@ impl<'scope> Scope<'scope> { }; // Submit the task to be executed. - self.pool.inner.queue.push(PoolMessage::Task(task, self.wait.clone())); + let _ = self.pool.inner.tx.send(PoolMessage::Task(task, self.wait.clone())); } /// Add a job to this scope which itself will get access to the scope. @@ -322,7 +328,7 @@ impl<'scope> Scope<'scope> { pub fn zoom<'smaller, F, R>(&self, scheduler: F) -> R where F: FnOnce(&Scope<'smaller>) -> R, 'scope: 'smaller { - let scope = unsafe { self.refine::<'smaller>() }; + let scope = unsafe { self.refine() }; // Join the scope either on completion of the scheduler or panic. defer!(scope.join());