This repository has been archived by the owner on Feb 26, 2020. It is now read-only.

Commit

Add TASKQ_DYNAMIC feature
Setting the TASKQ_DYNAMIC flag will create a taskq with dynamic
semantics.  Initially only a single worker thread will be created
to service tasks dispatched to the queue.  As additional threads
are needed they will be dynamically spawned up to the max number
specified by 'nthreads'.  When the threads are no longer needed,
because the taskq is empty, they will automatically terminate.
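
For illustration, a minimal consumer-side sketch (hedged: the
taskq_create() and taskq_dispatch() calls match the API shown in
this diff, while the example_* names are hypothetical):

#include <sys/taskq.h>

static taskq_t *example_tq;	/* hypothetical queue */

static void
example_func(void *arg)		/* hypothetical task function */
{
	/* perform one unit of work */
}

static int
example_setup(void)
{
	/* One worker initially; may grow to 16 under sustained load. */
	example_tq = taskq_create("example_tq", 16, maxclsyspri,
	    0, INT_MAX, TASKQ_DYNAMIC);
	if (example_tq == NULL)
		return (-ENOMEM);

	/* taskq_dispatch() returns 0 when dispatch fails. */
	if (taskq_dispatch(example_tq, example_func, NULL, TQ_SLEEP) == 0)
		return (-EINVAL);

	return (0);
}

static void
example_teardown(void)
{
	taskq_wait(example_tq);		/* drain outstanding tasks */
	taskq_destroy(example_tq);
}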

Due to the low cost of creating and destroying threads under Linux,
new threads are spawned and terminated aggressively by default.
There are two module options which can be tuned to adjust this
behavior if needed.

* spl_taskq_thread_sequential - The number of sequential tasks,
without interruption, which must be handled by a worker thread
before a new worker thread is spawned.  Default 8.

* spl_taskq_thread_dynamic - Provides the ability to completely
disable the use of dynamic taskqs on the system.  This option is
intended for debugging and troubleshooting.  Default 1 (enabled).
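
Both tunables are registered as ordinary module parameters with
mode 0644 (see the module_param() calls in the diff below), so they
can be set at module load time, e.g.
'modprobe spl spl_taskq_thread_sequential=32', or, assuming the
usual sysfs layout for module parameters, adjusted at runtime
through /sys/module/spl/parameters/.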

This behavior is consistent with the dynamic taskq implementation
found in both illumos and FreeBSD.

Signed-off-by: Brian Behlendorf <[email protected]>
behlendorf committed Jun 4, 2015
1 parent 9cef1b5 commit b355c17
Showing 3 changed files with 216 additions and 101 deletions.
5 changes: 3 additions & 2 deletions include/sys/taskq.h
@@ -61,11 +61,12 @@ typedef void (task_func_t)(void *);
typedef struct taskq {
spinlock_t tq_lock; /* protects taskq_t */
unsigned long tq_lock_flags; /* interrupt state */
const char *tq_name; /* taskq name */
char *tq_name; /* taskq name */
struct list_head tq_thread_list;/* list of all threads */
struct list_head tq_active_list;/* list of active threads */
int tq_nactive; /* # of active threads */
int tq_nthreads; /* # of total threads */
int tq_nthreads; /* # of existing threads */
int tq_maxthreads; /* max # of threads */
int tq_pri; /* priority */
int tq_minalloc; /* min task_t pool size */
int tq_maxalloc; /* max task_t pool size */
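(Note: tq_name loses its const qualifier because, in the
spl-taskq.c changes below, taskq_create() now duplicates the
caller-supplied name with strdup() and taskq_destroy() releases it
with strfree().  Owning the string matters for dynamic taskqs,
since worker threads created long after taskq_create() returns
still format their kthread names from tq->tq_name.)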
149 changes: 107 additions & 42 deletions module/spl/spl-taskq.c
@@ -31,10 +31,22 @@ int spl_taskq_thread_bind = 0;
module_param(spl_taskq_thread_bind, int, 0644);
MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default");


int spl_taskq_thread_dynamic = 1;
module_param(spl_taskq_thread_dynamic, int, 0644);
MODULE_PARM_DESC(spl_taskq_thread_dynamic, "Allow dynamic taskq threads");

int spl_taskq_thread_sequential = 8;
module_param(spl_taskq_thread_sequential, int, 0644);
MODULE_PARM_DESC(spl_taskq_thread_sequential,
"Create new taskq threads after N sequential tasks");

/* Global system-wide dynamic task queue available for all consumers */
taskq_t *system_taskq;
EXPORT_SYMBOL(system_taskq);

static taskq_thread_t *taskq_thread_create(taskq_t *);

static int
task_km_flags(uint_t flags)
{
@@ -434,17 +446,22 @@ taskq_member(taskq_t *tq, void *t)
{
struct list_head *l;
taskq_thread_t *tqt;
int found = 0;

ASSERT(tq);
ASSERT(t);

spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
list_for_each(l, &tq->tq_thread_list) {
tqt = list_entry(l, taskq_thread_t, tqt_thread_list);
if (tqt->tqt_thread == (struct task_struct *)t)
return (1);
if (tqt->tqt_thread == (struct task_struct *)t) {
found = 1;
break;
}
}
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

return (0);
return (found);
}
EXPORT_SYMBOL(taskq_member);

@@ -604,7 +621,6 @@ taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
{
ASSERT(tq);
ASSERT(func);
ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));

spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

@@ -673,6 +689,7 @@ taskq_thread(void *args)
taskq_t *tq;
taskq_ent_t *t;
struct list_head *pend_list;
int seq_tasks = 0;

ASSERT(tqt);
tq = tqt->tqt_tq;
@@ -683,24 +700,41 @@ taskq_thread(void *args)
flush_signals(current);

spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

/* Immediately exit if more threads than allowed were created.*/
if (tq->tq_nthreads >= tq->tq_maxthreads)
goto error;

tq->tq_nthreads++;
list_add(&tqt->tqt_thread_list, &tq->tq_thread_list);
wake_up(&tq->tq_wait_waitq);
set_current_state(TASK_INTERRUPTIBLE);

while (!kthread_should_stop()) {

if (list_empty(&tq->tq_pend_list) &&
list_empty(&tq->tq_prio_list)) {

/*
* Dynamic taskqs will destroy threads which would
* otherwise be idle, they are recreated as needed.
*/
if ((tq->tq_flags & TASKQ_DYNAMIC) &&
(tq->tq_nthreads > 1) && spl_taskq_thread_dynamic)
break;

add_wait_queue_exclusive(&tq->tq_work_waitq, &wait);
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

seq_tasks = 0;
schedule();

spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
remove_wait_queue(&tq->tq_work_waitq, &wait);
} else {
__set_current_state(TASK_RUNNING);
}


if (!list_empty(&tq->tq_prio_list))
pend_list = &tq->tq_prio_list;
else if (!list_empty(&tq->tq_pend_list))
@@ -732,14 +766,27 @@ taskq_thread(void *args)
/* Perform the requested task */
t->tqent_func(t->tqent_arg);

/*
* Dynamic taskq's will create additional threads up
* to tq_maxthreads when a single thread has handled
* more than spl_taskq_thread_sequential tasks without
* sleeping due to an empty queue.
*/
if ((++seq_tasks > spl_taskq_thread_sequential) &&
(tq->tq_flags & TASKQ_DYNAMIC) &&
(tq->tq_flags & TQ_ACTIVE) &&
(tq->tq_nthreads < tq->tq_maxthreads)) {
taskq_thread_create(tq);
seq_tasks = 0;
}

spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
tq->tq_nactive--;
list_del_init(&tqt->tqt_active_list);
tqt->tqt_task = NULL;

/* For prealloc'd tasks, we don't free anything. */
if ((tq->tq_flags & TASKQ_DYNAMIC) ||
!(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
if (!(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
task_done(tq, t);

/* When the current lowest outstanding taskqid is
@@ -761,27 +808,56 @@ taskq_thread(void *args)
__set_current_state(TASK_RUNNING);
tq->tq_nthreads--;
list_del_init(&tqt->tqt_thread_list);
error:
kmem_free(tqt, sizeof(taskq_thread_t));

spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

return (0);
}

static taskq_thread_t *
taskq_thread_create(taskq_t *tq)
{
static int last_used_cpu = 0;
taskq_thread_t *tqt;

tqt = kmem_alloc(sizeof(*tqt), KM_PUSHPAGE);
INIT_LIST_HEAD(&tqt->tqt_thread_list);
INIT_LIST_HEAD(&tqt->tqt_active_list);
tqt->tqt_tq = tq;
tqt->tqt_id = 0;

tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
"%s", tq->tq_name);
if (tqt->tqt_thread == NULL) {
kmem_free(tqt, sizeof (taskq_thread_t));
return (NULL);
}

if (spl_taskq_thread_bind) {
last_used_cpu = (last_used_cpu + 1) % num_online_cpus();
kthread_bind(tqt->tqt_thread, last_used_cpu);
}

set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(tq->tq_pri));
wake_up_process(tqt->tqt_thread);

return (tqt);
}

taskq_t *
taskq_create(const char *name, int nthreads, pri_t pri,
int minalloc, int maxalloc, uint_t flags)
{
static int last_used_cpu = 0;
taskq_t *tq;
taskq_thread_t *tqt;
int rc = 0, i, j = 0;
int count = 0, rc = 0, i;

ASSERT(name != NULL);
ASSERT(pri <= maxclsyspri);
ASSERT(minalloc >= 0);
ASSERT(maxalloc <= INT_MAX);
ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */
ASSERT(!(flags & (TASKQ_CPR_SAFE))); /* Unsupported */

/* Scale the number of threads using nthreads as a percentage */
if (flags & TASKQ_THREADS_CPU_PCT) {
@@ -800,16 +876,17 @@ taskq_create(const char *name, int nthreads, pri_t pri,
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
INIT_LIST_HEAD(&tq->tq_thread_list);
INIT_LIST_HEAD(&tq->tq_active_list);
tq->tq_name = name;
tq->tq_nactive = 0;
tq->tq_nthreads = 0;
tq->tq_pri = pri;
tq->tq_minalloc = minalloc;
tq->tq_maxalloc = maxalloc;
tq->tq_nalloc = 0;
tq->tq_flags = (flags | TQ_ACTIVE);
tq->tq_next_id = 1;
tq->tq_lowest_id = 1;
tq->tq_name = strdup(name);
tq->tq_nactive = 0;
tq->tq_nthreads = 0;
tq->tq_maxthreads = nthreads;
tq->tq_pri = pri;
tq->tq_minalloc = minalloc;
tq->tq_maxalloc = maxalloc;
tq->tq_nalloc = 0;
tq->tq_flags = (flags | TQ_ACTIVE);
tq->tq_next_id = 1;
tq->tq_lowest_id = 1;
INIT_LIST_HEAD(&tq->tq_free_list);
INIT_LIST_HEAD(&tq->tq_pend_list);
INIT_LIST_HEAD(&tq->tq_prio_list);
@@ -823,32 +900,19 @@ taskq_create(const char *name, int nthreads, pri_t pri,

spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic)
nthreads = 1;

for (i = 0; i < nthreads; i++) {
tqt = kmem_alloc(sizeof(*tqt), KM_PUSHPAGE);
INIT_LIST_HEAD(&tqt->tqt_thread_list);
INIT_LIST_HEAD(&tqt->tqt_active_list);
tqt->tqt_tq = tq;
tqt->tqt_id = 0;

tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
"%s/%d", name, i);
if (tqt->tqt_thread) {
list_add(&tqt->tqt_thread_list, &tq->tq_thread_list);
if (spl_taskq_thread_bind) {
last_used_cpu = (last_used_cpu + 1) % num_online_cpus();
kthread_bind(tqt->tqt_thread, last_used_cpu);
}
set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(pri));
wake_up_process(tqt->tqt_thread);
j++;
} else {
kmem_free(tqt, sizeof(taskq_thread_t));
tqt = taskq_thread_create(tq);
if (tqt == NULL)
rc = 1;
}
else
count++;
}

/* Wait for all threads to be started before potential destroy */
wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j);
wait_event(tq->tq_wait_waitq, tq->tq_nthreads == count);

if (rc) {
taskq_destroy(tq);
@@ -913,6 +977,7 @@ taskq_destroy(taskq_t *tq)

spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

strfree(tq->tq_name);
kmem_free(tq, sizeof(taskq_t));
}
EXPORT_SYMBOL(taskq_destroy);
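
To make the spawning heuristic concrete, here is a hedged userspace
model in plain C (illustrative names only; the TASKQ_DYNAMIC and
TQ_ACTIVE flag checks from the real worker loop are omitted): a
worker that handles more than spl_taskq_thread_sequential
consecutive tasks without sleeping triggers creation of one more
worker, up to tq_maxthreads.

#include <stdio.h>

#define SPL_TASKQ_THREAD_SEQUENTIAL	8	/* default from this commit */

static int nthreads = 1;	/* dynamic taskqs start with one worker */
static int maxthreads = 16;	/* the 'nthreads' cap given to taskq_create() */

/* Model of the check at the end of taskq_thread()'s task loop. */
static int
after_task(int *seq_tasks)
{
	if (++(*seq_tasks) > SPL_TASKQ_THREAD_SEQUENTIAL &&
	    nthreads < maxthreads) {
		nthreads++;		/* taskq_thread_create() in the real code */
		*seq_tasks = 0;		/* reset, as the commit does */
		return (1);
	}
	return (0);
}

int
main(void)
{
	int seq_tasks = 0;
	int i;

	/* 100 back-to-back tasks, assuming the burst stays on one worker. */
	for (i = 1; i <= 100; i++) {
		if (after_task(&seq_tasks))
			printf("task %3d: spawned worker, nthreads=%d\n",
			    i, nthreads);
	}
	return (0);
}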