threadpool: update crossbeam dependencies (#874)
Stjepan Glavina authored and carllerche committed Jan 30, 2019
1 parent 11e2af6 commit e1a07ce
Showing 9 changed files with 43 additions and 85 deletions.
7 changes: 3 additions & 4 deletions tokio-threadpool/Cargo.toml
@@ -22,10 +22,9 @@ categories = ["concurrency", "asynchronous"]
[dependencies]
tokio-executor = { version = "0.1.2", path = "../tokio-executor" }
futures = "0.1.19"
crossbeam = "0.6.0"
crossbeam-channel = "0.3.3"
crossbeam-deque = "0.6.1"
crossbeam-utils = "0.6.2"
crossbeam-deque = "0.7.0"
crossbeam-queue = "0.1.0"
crossbeam-utils = "0.6.4"
num_cpus = "1.2"
rand = "0.6"
slab = "0.4.1"
4 changes: 2 additions & 2 deletions tokio-threadpool/src/builder.rs
@@ -3,7 +3,6 @@ use config::{Config, MAX_WORKERS};
use park::{BoxPark, BoxedPark, DefaultPark};
use shutdown::ShutdownTrigger;
use pool::{Pool, MAX_BACKUP};
use task::Queue;
use thread_pool::ThreadPool;
use worker::{self, Worker, WorkerId};

@@ -13,6 +12,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::cmp::max;

use crossbeam_deque::Injector;
use num_cpus;
use tokio_executor::Enter;
use tokio_executor::park::Park;
@@ -414,7 +414,7 @@ impl Builder {
workers.into()
};

let queue = Arc::new(Queue::new());
let queue = Arc::new(Injector::new());

// Create a trigger that will clean up resources on shutdown.
//
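For context on the change above: the custom task::Queue is replaced by crossbeam_deque::Injector, the shared injector queue introduced in crossbeam-deque 0.7. A minimal standalone sketch of that API, using a String payload purely for illustration (the pool itself stores Arc<Task>):

use crossbeam_deque::{Injector, Steal};

fn main() {
    // Shared injector queue; in the pool this is wrapped in an Arc and
    // handed to every worker.
    let queue: Injector<String> = Injector::new();
    queue.push("task".to_string());

    // `steal` returns a `Steal` enum instead of an `Option`, so callers
    // must handle spurious `Retry` results explicitly.
    loop {
        match queue.steal() {
            Steal::Success(task) => {
                println!("popped {}", task);
                break;
            }
            Steal::Empty => break,
            Steal::Retry => {} // lost a race with another thread; try again
        }
    }
}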
5 changes: 2 additions & 3 deletions tokio-threadpool/src/lib.rs
@@ -79,9 +79,8 @@

extern crate tokio_executor;

extern crate crossbeam;
extern crate crossbeam_channel;
extern crate crossbeam_deque as deque;
extern crate crossbeam_deque;
extern crate crossbeam_queue;
extern crate crossbeam_utils;
#[macro_use]
extern crate futures;
7 changes: 4 additions & 3 deletions tokio-threadpool/src/pool/mod.rs
@@ -15,7 +15,7 @@ use self::backup_stack::BackupStack;

use config::Config;
use shutdown::ShutdownTrigger;
use task::{Blocking, Queue, Task};
use task::{Blocking, Task};
use worker::{self, Worker, WorkerId};

use futures::Poll;
@@ -27,6 +27,7 @@ use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Weak};
use std::thread;

use crossbeam_deque::Injector;
use crossbeam_utils::CachePadded;
use rand;

@@ -57,7 +58,7 @@ pub(crate) struct Pool {
//
// Spawned tasks are pushed into this queue. Although worker threads have their own dedicated
// task queues, they periodically steal tasks from this global queue, too.
pub queue: Arc<Queue>,
pub queue: Arc<Injector<Arc<Task>>>,

// Completes the shutdown process when the `ThreadPool` and all `Worker`s get dropped.
//
@@ -90,7 +91,7 @@ impl Pool {
trigger: Weak<ShutdownTrigger>,
max_blocking: usize,
config: Config,
queue: Arc<Queue>,
queue: Arc<Injector<Arc<Task>>>,
) -> Pool {
let pool_size = workers.len();
let total_size = max_blocking + pool_size;
12 changes: 8 additions & 4 deletions tokio-threadpool/src/shutdown.rs
@@ -1,6 +1,7 @@
use task::Queue;
use task::Task;
use worker;

use crossbeam_deque::Injector;
use futures::{Future, Poll, Async};
use futures::task::AtomicTask;

@@ -62,14 +63,17 @@ impl Future for Shutdown
pub(crate) struct ShutdownTrigger {
inner: Arc<Mutex<Inner>>,
workers: Arc<[worker::Entry]>,
queue: Arc<Queue>,
queue: Arc<Injector<Arc<Task>>>,
}

unsafe impl Send for ShutdownTrigger {}
unsafe impl Sync for ShutdownTrigger {}

impl ShutdownTrigger {
pub(crate) fn new(workers: Arc<[worker::Entry]>, queue: Arc<Queue>) -> ShutdownTrigger {
pub(crate) fn new(
workers: Arc<[worker::Entry]>,
queue: Arc<Injector<Arc<Task>>>,
) -> ShutdownTrigger {
ShutdownTrigger {
inner: Arc::new(Mutex::new(Inner {
task: AtomicTask::new(),
@@ -84,7 +88,7 @@ impl ShutdownTrigger {
impl Drop for ShutdownTrigger {
fn drop(&mut self) {
// Drain the global task queue.
while self.queue.pop().is_some() {}
while !self.queue.steal().is_empty() {}

// Drop the remaining incomplete tasks and parkers associated with workers.
for worker in self.workers.iter() {
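The drop handler above drains the injector by calling steal() until it reports Empty; Steal::is_empty() is true only for the Empty variant, so a Retry result keeps the loop spinning. A standalone sketch of the same pattern, with a generic payload for illustration:

use crossbeam_deque::Injector;

// Drain an injector on shutdown, dropping every stolen item. The loop only
// stops once `steal()` reports `Empty`; `Retry` results keep it going.
fn drain<T>(queue: &Injector<T>) {
    while !queue.steal().is_empty() {}
}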
2 changes: 0 additions & 2 deletions tokio-threadpool/src/task/mod.rs
@@ -1,10 +1,8 @@
mod blocking;
mod blocking_state;
mod queue;
mod state;

pub(crate) use self::blocking::{Blocking, CanBlock};
pub(crate) use self::queue::Queue;
use self::blocking_state::BlockingState;
use self::state::State;

34 changes: 0 additions & 34 deletions tokio-threadpool/src/task/queue.rs

This file was deleted.

30 changes: 11 additions & 19 deletions tokio-threadpool/src/worker/entry.rs
@@ -9,9 +9,9 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::Ordering::{Acquire, AcqRel, Relaxed, Release};
use std::time::Duration;

use crossbeam::queue::SegQueue;
use crossbeam_deque::{Steal, Stealer, Worker};
use crossbeam_queue::SegQueue;
use crossbeam_utils::CachePadded;
use deque;
use slab::Slab;

// TODO: None of the fields should be public
@@ -29,10 +29,10 @@ pub(crate) struct WorkerEntry {
next_sleeper: UnsafeCell<usize>,

// Worker half of deque
worker: deque::Worker<Arc<Task>>,
pub worker: Worker<Arc<Task>>,

// Stealer half of deque
stealer: deque::Stealer<Arc<Task>>,
stealer: Stealer<Arc<Task>>,

// Thread parker
park: UnsafeCell<Option<BoxPark>>,
@@ -53,7 +53,8 @@ pub(crate) struct WorkerEntry {

impl WorkerEntry {
pub fn new(park: BoxPark, unpark: BoxUnpark) -> Self {
let (w, s) = deque::fifo();
let w = Worker::new_fifo();
let s = w.stealer();

WorkerEntry {
state: CachePadded::new(AtomicUsize::new(State::default().into())),
@@ -187,7 +188,7 @@ impl WorkerEntry {
/// This **must** only be called by the thread that owns the worker entry.
/// This function is not `Sync`.
#[inline]
pub fn pop_task(&self) -> deque::Pop<Arc<Task>> {
pub fn pop_task(&self) -> Option<Arc<Task>> {
self.worker.pop()
}

@@ -199,23 +200,15 @@
/// At the same time, this method steals some additional tasks and moves
/// them into `dest` in order to balance the work distribution among
/// workers.
pub fn steal_tasks(&self, dest: &Self) -> deque::Steal<Arc<Task>> {
self.stealer.steal_many(&dest.worker)
pub fn steal_tasks(&self, dest: &Self) -> Steal<Arc<Task>> {
self.stealer.steal_batch_and_pop(&dest.worker)
}

/// Drain (and drop) all tasks that are queued for work.
///
/// This is called when the pool is shutting down.
pub fn drain_tasks(&self) {
use deque::Pop::*;

loop {
match self.worker.pop() {
Data(_) => {}
Empty => break,
Retry => {}
}
}
while self.worker.pop().is_some() {}
}

/// Parks the worker thread.
@@ -284,7 +277,6 @@ impl WorkerEntry {
}
running_tasks.clear();

// Drop the parker.
unsafe {
*self.park.get() = None;
*self.unpark.get() = None;
@@ -297,7 +289,7 @@ impl WorkerEntry {
if self.needs_drain.compare_and_swap(true, false, Acquire) {
let running_tasks = unsafe { &mut *self.running_tasks.get() };

while let Some(task) = self.remotely_completed_tasks.try_pop() {
while let Ok(task) = self.remotely_completed_tasks.pop() {
running_tasks.remove(task.reg_index.get());
}
}
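In crossbeam-deque 0.7 the worker/stealer pair is built from the Worker side rather than via the old deque::fifo() constructor, and steal_many is renamed steal_batch_and_pop. A rough sketch of the new API with a u32 payload (the pool uses Arc<Task>):

use crossbeam_deque::{Steal, Worker};

fn main() {
    // Each worker owns a FIFO deque and hands out Stealer handles.
    let victim: Worker<u32> = Worker::new_fifo();
    let stealer = victim.stealer();
    for i in 0..8 {
        victim.push(i);
    }

    // A thief moves a batch of tasks into its own deque and pops one to run
    // immediately (the 0.7 spelling of the old `steal_many`).
    let thief: Worker<u32> = Worker::new_fifo();
    match stealer.steal_batch_and_pop(&thief) {
        Steal::Success(task) => println!("stole task {}", task),
        Steal::Empty => println!("victim was empty"),
        Steal::Retry => println!("lost a race; try again later"),
    }
}

The remotely_completed_tasks change in the same hunk reflects crossbeam-queue 0.1, where SegQueue::pop returns a Result (an error when the queue is empty) instead of the old try_pop() -> Option.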
27 changes: 13 additions & 14 deletions tokio-threadpool/src/worker/mod.rs
@@ -386,24 +386,21 @@ impl Worker {
///
/// Returns `true` if work was found.
fn try_run_owned_task(&self, notify: &Arc<Notifier>) -> bool {
use deque::Pop;

// Poll the internal queue for a task to run
match self.entry().pop_task() {
Pop::Data(task) => {
Some(task) => {
self.run_task(task, notify);
true
}
Pop::Empty => false,
Pop::Retry => true,
None => false,
}
}

/// Tries to steal a task from another worker.
///
/// Returns `true` if work was found
fn try_steal_task(&self, notify: &Arc<Notifier>) -> bool {
use deque::Steal;
use crossbeam_deque::Steal;

debug_assert!(!self.is_blocking.get());

@@ -415,7 +412,7 @@
loop {
if idx < len {
match self.pool.workers[idx].steal_tasks(self.entry()) {
Steal::Data(task) => {
Steal::Success(task) => {
trace!("stole task from another worker");

self.run_task(task, notify);
@@ -701,15 +698,17 @@ impl Worker {
///
/// Returns `true` if this worker has tasks in its queue.
fn sleep_light(&self) {
const STEAL_COUNT: usize = 32;

self.entry().park_timeout(Duration::from_millis(0));

for _ in 0..STEAL_COUNT {
if let Some(task) = self.pool.queue.pop() {
self.pool.submit(task, &self.pool);
} else {
break;
use crossbeam_deque::Steal;
loop {
match self.pool.queue.steal_batch(&self.entry().worker) {
Steal::Success(()) => {
self.pool.signal_work(&self.pool);
break;
}
Steal::Empty => break,
Steal::Retry => {}
}
}
}
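Rather than popping up to STEAL_COUNT tasks from the global queue one at a time, sleep_light now moves a whole batch into the worker's local deque with a single steal_batch call, retrying on contention. A standalone sketch of that loop shape (hypothetical refill helper, u32 payload for illustration):

use crossbeam_deque::{Injector, Steal, Worker};

// Move a batch of tasks from the global injector into a local worker deque.
// Returns true if at least one task was transferred.
fn refill(global: &Injector<u32>, local: &Worker<u32>) -> bool {
    loop {
        match global.steal_batch(local) {
            Steal::Success(()) => return true, // batch transferred; wake workers
            Steal::Empty => return false,      // nothing queued globally
            Steal::Retry => {}                 // another thread interfered; retry
        }
    }
}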
