This repository has been archived by the owner on Feb 26, 2020. It is now read-only.

Illumos taskq port #54

Closed · wants to merge 4 commits
18 changes: 18 additions & 0 deletions include/sys/taskq.h
@@ -22,6 +22,10 @@
* with the SPL. If not, see <http://www.gnu.org/licenses/>.
\*****************************************************************************/

/*
* Copyright 2011 Nexenta Systems, Inc. All rights reserved.
*/

#ifndef _SPL_TASKQ_H
#define _SPL_TASKQ_H

@@ -45,6 +49,17 @@
typedef unsigned long taskqid_t;
typedef void (task_func_t)(void *);

typedef struct taskq_ent {
spinlock_t t_lock;
struct list_head t_list;
taskqid_t t_id;
task_func_t *t_func;
void *t_arg;
uintptr_t tqent_flags;
} taskq_ent_t;

#define TQENT_FLAG_PREALLOC 0x1
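
Since each entry carries its own lock and list linkage, a caller can embed one in a longer-lived object and dispatch without any allocation on the hot path. A minimal sketch of such an embedding follows; my_io_t and its fields are illustrative names, not part of this patch:

/*
 * Illustrative only: a caller-owned object with a preallocated
 * dispatch entry, so the dispatch path never calls kmem_alloc().
 */
typedef struct my_io {
	void		*io_data;	/* payload consumed by the task func */
	taskq_ent_t	io_tqent;	/* preallocated dispatch entry */
} my_io_t;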

/*
* Flags for taskq_dispatch. TQ_SLEEP/TQ_NOSLEEP should be same as
* KM_SLEEP/KM_NOSLEEP. TQ_NOQUEUE/TQ_NOALLOC are set particularly
@@ -84,6 +99,8 @@ typedef struct taskq {
extern taskq_t *system_taskq;

extern taskqid_t __taskq_dispatch(taskq_t *, task_func_t, void *, uint_t);
/* Special form of taskq dispatch that uses preallocated entries. */
extern void __taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t, taskq_ent_t *);
extern taskq_t *__taskq_create(const char *, int, pri_t, int, int, uint_t);
extern void __taskq_destroy(taskq_t *);
extern void __taskq_wait_id(taskq_t *, taskqid_t);
@@ -97,6 +114,7 @@ void spl_taskq_fini(void);
#define taskq_wait_id(tq, id) __taskq_wait_id(tq, id)
#define taskq_wait(tq) __taskq_wait(tq)
#define taskq_dispatch(tq, f, p, fl) __taskq_dispatch(tq, f, p, fl)
#define taskq_dispatch_ent(tq, f, p, fl, t) __taskq_dispatch_ent(tq, f, p, fl, t)
#define taskq_create(n, th, p, mi, ma, fl) __taskq_create(n, th, p, mi, ma, fl)
#define taskq_create_proc(n, th, p, mi, ma, pr, fl) \
__taskq_create(n, th, p, mi, ma, fl)
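
As a usage sketch (not part of this patch): because __taskq_dispatch_ent() neither allocates nor sleeps, the caller must hand it a fully initialized entry. The port adds no init helper, so this sketch assumes the caller prepares the entry's spinlock and list head itself before first use; my_func and example_dispatch are illustrative names:

/*
 * Usage sketch, assuming a caller-prepared entry.
 */
static void my_func(void *arg);

static void
example_dispatch(taskq_t *tq, taskq_ent_t *t, void *arg)
{
	spin_lock_init(&t->t_lock);	/* taken briefly by dispatch */
	INIT_LIST_HEAD(&t->t_list);
	t->tqent_flags = 0;		/* dispatch sets TQENT_FLAG_PREALLOC */

	/* Flags only select queueing order here; 0 queues at the tail. */
	taskq_dispatch_ent(tq, my_func, arg, 0, t);

	/* The taskq owns *t until my_func() has been invoked. */
}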
136 changes: 101 additions & 35 deletions module/spl/spl-taskq.c
@@ -24,6 +24,10 @@
* Solaris Porting Layer (SPL) Task Queue Implementation.
\*****************************************************************************/

/*
* Copyright 2011 Nexenta Systems, Inc. All rights reserved.
*/

#include <sys/taskq.h>
#include <sys/kmem.h>
#include <spl-debug.h>
@@ -38,22 +42,14 @@
taskq_t *system_taskq;
EXPORT_SYMBOL(system_taskq);

typedef struct spl_task {
spinlock_t t_lock;
struct list_head t_list;
taskqid_t t_id;
task_func_t *t_func;
void *t_arg;
} spl_task_t;

/*
* NOTE: Must be called with tq->tq_lock held, returns a list_t which
* is not attached to the free, work, or pending taskq lists.
*/
static spl_task_t *
static taskq_ent_t *
task_alloc(taskq_t *tq, uint_t flags)
{
spl_task_t *t;
taskq_ent_t *t;
int count = 0;
SENTRY;

@@ -62,9 +58,12 @@ task_alloc(taskq_t *tq, uint_t flags)
ASSERT(!((flags & TQ_SLEEP) && (flags & TQ_NOSLEEP))); /* Not both */
ASSERT(spin_is_locked(&tq->tq_lock));
retry:
/* Acquire spl_task_t's from free list if available */
/* Acquire taskq_ent_t's from free list if available */
if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
t = list_entry(tq->tq_free_list.next, spl_task_t, t_list);
t = list_entry(tq->tq_free_list.next, taskq_ent_t, t_list);

ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

list_del_init(&t->t_list);
SRETURN(t);
}
@@ -73,15 +72,15 @@ task_alloc(taskq_t *tq, uint_t flags)
if (flags & TQ_NOALLOC)
SRETURN(NULL);

/* Hit maximum spl_task_t pool size */
/* Hit maximum taskq_ent_t pool size */
if (tq->tq_nalloc >= tq->tq_maxalloc) {
if (flags & TQ_NOSLEEP)
SRETURN(NULL);

/*
* Sleep periodically polling the free list for an available
* spl_task_t. Dispatching with TQ_SLEEP should always succeed
* but we cannot block forever waiting for an spl_taskq_t to
* taskq_ent_t. Dispatching with TQ_SLEEP should always succeed
* but we cannot block forever waiting for a taskq_ent_t to
* show up in the free list, otherwise a deadlock can happen.
*
* Therefore, we need to allocate a new task even if the number
@@ -97,7 +96,7 @@ task_alloc(taskq_t *tq, uint_t flags)
}

spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
t = kmem_alloc(sizeof(spl_task_t), flags & (TQ_SLEEP | TQ_NOSLEEP));
t = kmem_alloc(sizeof(taskq_ent_t), flags & (TQ_SLEEP | TQ_NOSLEEP));
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

if (t) {
@@ -106,18 +105,20 @@
t->t_id = 0;
t->t_func = NULL;
t->t_arg = NULL;
/* Make sure we start without any flags */
t->tqent_flags = 0;
tq->tq_nalloc++;
}

SRETURN(t);
}

/*
* NOTE: Must be called with tq->tq_lock held, expects the spl_task_t
* NOTE: Must be called with tq->tq_lock held, expects the taskq_ent_t
* to already be removed from the free, work, or pending taskq lists.
*/
static void
task_free(taskq_t *tq, spl_task_t *t)
task_free(taskq_t *tq, taskq_ent_t *t)
{
SENTRY;

@@ -126,18 +127,18 @@ task_free(taskq_t *tq, taskq_ent_t *t)
ASSERT(spin_is_locked(&tq->tq_lock));
ASSERT(list_empty(&t->t_list));

kmem_free(t, sizeof(spl_task_t));
kmem_free(t, sizeof(taskq_ent_t));
tq->tq_nalloc--;

SEXIT;
}

/*
* NOTE: Must be called with tq->tq_lock held, either destroys the
* spl_task_t if too many exist or moves it to the free list for later use.
* taskq_ent_t if too many exist or moves it to the free list for later use.
*/
static void
task_done(taskq_t *tq, spl_task_t *t)
task_done(taskq_t *tq, taskq_ent_t *t)
{
SENTRY;
ASSERT(tq);
@@ -245,7 +246,7 @@ EXPORT_SYMBOL(__taskq_member);
taskqid_t
__taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
spl_task_t *t;
taskq_ent_t *t;
taskqid_t rc = 0;
SENTRY;

@@ -276,6 +277,9 @@ __taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)

spin_lock(&t->t_lock);

/* Make sure we start without any flags */
t->tqent_flags = 0;

/* Queue to the priority list instead of the pending list */
if (flags & TQ_FRONT)
list_add_tail(&t->t_list, &tq->tq_prio_list);
@@ -295,6 +299,43 @@ __taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
}
EXPORT_SYMBOL(__taskq_dispatch);

void
__taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
taskq_ent_t *t)
{
SENTRY;

ASSERT(func != NULL);
ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));

/*
* Mark it as a prealloc'd task. This is important
* to ensure that we don't free it later.
*/
t->tqent_flags |= TQENT_FLAG_PREALLOC;
/*
* Enqueue the task to the underlying queue.
*/
spin_lock(&t->t_lock);

/* Queue to the priority list instead of the pending list */
if (flags & TQ_FRONT)
list_add_tail(&t->t_list, &tq->tq_prio_list);
else
list_add_tail(&t->t_list, &tq->tq_pend_list);

t->t_id = tq->tq_next_id;
tq->tq_next_id++;
t->t_func = func;
t->t_arg = arg;
spin_unlock(&t->t_lock);

wake_up(&tq->tq_work_waitq);

SEXIT;
}
EXPORT_SYMBOL(__taskq_dispatch_ent);
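
The ownership hand-off is the subtle part: the moment the worker thread invokes t_func, the entry reverts to the dispatcher, which is why taskq_thread() below skips task_done() for prealloc'd entries. A hedged illustration, reusing the hypothetical my_io_t from the header sketch:

/*
 * Illustration only: the task function regains ownership of its entry
 * as soon as it runs, so it may free the object embedding the entry.
 */
static void
my_io_done(void *arg)
{
	my_io_t *io = arg;

	/* ... consume io->io_data ... */

	/*
	 * Frees io_tqent along with its owner; the taskq must not
	 * touch the entry after this function has been called.
	 */
	kmem_free(io, sizeof (my_io_t));
}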

/*
* Returns the lowest incomplete taskqid_t. The taskqid_t may
* be queued on the pending list, on the priority list, or on
@@ -305,24 +346,24 @@ static taskqid_t
taskq_lowest_id(taskq_t *tq)
{
taskqid_t lowest_id = tq->tq_next_id;
spl_task_t *t;
taskq_ent_t *t;
SENTRY;

ASSERT(tq);
ASSERT(spin_is_locked(&tq->tq_lock));

if (!list_empty(&tq->tq_pend_list)) {
t = list_entry(tq->tq_pend_list.next, spl_task_t, t_list);
t = list_entry(tq->tq_pend_list.next, taskq_ent_t, t_list);
lowest_id = MIN(lowest_id, t->t_id);
}

if (!list_empty(&tq->tq_prio_list)) {
t = list_entry(tq->tq_prio_list.next, spl_task_t, t_list);
t = list_entry(tq->tq_prio_list.next, taskq_ent_t, t_list);
lowest_id = MIN(lowest_id, t->t_id);
}

if (!list_empty(&tq->tq_work_list)) {
t = list_entry(tq->tq_work_list.next, spl_task_t, t_list);
t = list_entry(tq->tq_work_list.next, taskq_ent_t, t_list);
lowest_id = MIN(lowest_id, t->t_id);
}

@@ -334,9 +375,9 @@ taskq_lowest_id(taskq_t *tq)
* taskqid.
*/
static void
taskq_insert_in_order(taskq_t *tq, spl_task_t *t)
taskq_insert_in_order(taskq_t *tq, taskq_ent_t *t)
{
spl_task_t *w;
taskq_ent_t *w;
struct list_head *l;

SENTRY;
@@ -345,7 +386,7 @@ taskq_insert_in_order(taskq_t *tq, spl_task_t *t)
ASSERT(spin_is_locked(&tq->tq_lock));

list_for_each_prev(l, &tq->tq_work_list) {
w = list_entry(l, spl_task_t, t_list);
w = list_entry(l, taskq_ent_t, t_list);
if (w->t_id < t->t_id) {
list_add(&t->t_list, l);
break;
@@ -364,8 +405,9 @@ taskq_thread(void *args)
sigset_t blocked;
taskqid_t id;
taskq_t *tq = args;
spl_task_t *t;
taskq_ent_t *t;
struct list_head *pend_list;
boolean_t freeit;
SENTRY;

ASSERT(tq);
@@ -406,10 +448,30 @@ taskq_thread(void *args)
pend_list = NULL;

if (pend_list) {
t = list_entry(pend_list->next, spl_task_t, t_list);
t = list_entry(pend_list->next, taskq_ent_t, t_list);
list_del_init(&t->t_list);
taskq_insert_in_order(tq, t);
tq->tq_nactive++;

/*
* For prealloc'd tasks, we don't free anything.
* We have to check this now, because once we
* call the function for a prealloc'd task, we
* can't touch the taskq_ent any longer (calling
* the function returns ownership of the tqent
* back to the caller of taskq_dispatch_ent).
*/
if ((!(tq->tq_flags & TASKQ_DYNAMIC)) &&
(t->tqent_flags & TQENT_FLAG_PREALLOC)) {
/* Clear pointers to assist assertion checks */
list_del_init(&t->t_list);
freeit = B_FALSE;
} else {
freeit = B_TRUE;
}

spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

/* Perform the requested task */
@@ -418,7 +480,8 @@
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
tq->tq_nactive--;
id = t->t_id;
task_done(tq, t);
if (freeit)
task_done(tq, t);

/* When the current lowest outstanding taskqid is
* done calculate the new lowest outstanding id */
@@ -529,7 +592,7 @@ EXPORT_SYMBOL(__taskq_create);
void
__taskq_destroy(taskq_t *tq)
{
spl_task_t *t;
taskq_ent_t *t;
int i, nthreads;
SENTRY;

@@ -549,7 +612,10 @@ __taskq_destroy(taskq_t *tq)
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);

while (!list_empty(&tq->tq_free_list)) {
t = list_entry(tq->tq_free_list.next, spl_task_t, t_list);
t = list_entry(tq->tq_free_list.next, taskq_ent_t, t_list);

ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

list_del_init(&t->t_list);
task_free(tq, t);
}
@@ -562,7 +628,7 @@ __taskq_destroy(taskq_t *tq)
ASSERT(list_empty(&tq->tq_prio_list));

spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
kmem_free(tq->tq_threads, nthreads * sizeof(spl_task_t *));
kmem_free(tq->tq_threads, nthreads * sizeof(taskq_ent_t *));
kmem_free(tq, sizeof(taskq_t));

SEXIT;