Skip to content

Commit

Permalink
734 taskq_dispatch_prealloc() desired
Browse files Browse the repository at this point in the history
943 zio_interrupt ends up calling taskq_dispatch with TQ_SLEEP
Reviewed by: Albert Lee <[email protected]>
Reviewed by: Richard Lowe <[email protected]>
Reviewed by: Alexey Zaytsev <[email protected]>
Reviewed by: Jason Brian King <[email protected]>
Reviewed by: George Wilson <[email protected]>
Reviewed by: Adam Leventhal <[email protected]>
Approved by: Gordon Ross <[email protected]>
  • Loading branch information
Garrett D'Amore committed Jul 27, 2011
1 parent 135e56f commit 5aeb947
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 46 deletions.
16 changes: 16 additions & 0 deletions usr/src/lib/libzpool/common/sys/zfs_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
/*
* Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
*/
/*
* Copyright 2011 Nexenta Systems, Inc. All rights reserved.
*/

#ifndef _SYS_ZFS_CONTEXT_H
#define _SYS_ZFS_CONTEXT_H
Expand Down Expand Up @@ -347,6 +350,16 @@ typedef struct taskq taskq_t;
typedef uintptr_t taskqid_t;
typedef void (task_func_t)(void *);

/*
 * One unit of work queued on a taskq.  Entries are chained into the
 * queue through tqent_next/tqent_prev.  A caller may hand its own entry
 * to taskq_dispatch_ent() rather than having one allocated at dispatch
 * time.
 */
typedef struct taskq_ent {
struct taskq_ent *tqent_next; /* following entry on the queue */
struct taskq_ent *tqent_prev; /* preceding entry on the queue */
task_func_t *tqent_func; /* function invoked for this entry */
void *tqent_arg; /* argument handed to tqent_func */
/*
 * NOTE(review): taskq_dispatch_ent() ORs TQENT_FLAG_PREALLOC into this
 * field, but the visible part of taskq_dispatch() never initializes it
 * for entries it allocates -- confirm it is cleared before the worker
 * thread tests the flag.
 */
uintptr_t tqent_flags; /* TQENT_FLAG_* bits */
} taskq_ent_t;

/*
 * Set by taskq_dispatch_ent(): the entry was supplied by the caller, so
 * the taskq must not free it once the task has run.
 */
#define TQENT_FLAG_PREALLOC 0x1

#define TASKQ_PREPOPULATE 0x0001
#define TASKQ_CPR_SAFE 0x0002 /* Use CPR safe protocol */
#define TASKQ_DYNAMIC 0x0004 /* Use dynamic thread scheduling */
Expand All @@ -358,6 +371,7 @@ typedef void (task_func_t)(void *);
#define TQ_NOQUEUE 0x02 /* Do not enqueue if can't dispatch */
#define TQ_FRONT 0x08 /* Queue in front */


extern taskq_t *system_taskq;

extern taskq_t *taskq_create(const char *, int, pri_t, int, int, uint_t);
Expand All @@ -366,6 +380,8 @@ extern taskq_t *taskq_create(const char *, int, pri_t, int, int, uint_t);
/*
 * taskq_create_sysdc() maps straight onto taskq_create(): the p and dc
 * arguments are dropped and maxclsyspri is passed as the priority.
 */
#define taskq_create_sysdc(a, b, d, e, p, dc, f) \
(taskq_create(a, b, maxclsyspri, d, e, f))
extern taskqid_t taskq_dispatch(taskq_t *, task_func_t, void *, uint_t);
/*
 * Queue a caller-provided taskq_ent_t.  Unlike taskq_dispatch(), this
 * returns no status to the caller.
 */
extern void taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t,
taskq_ent_t *);
extern void taskq_destroy(taskq_t *);
extern void taskq_wait(taskq_t *);
extern int taskq_member(taskq_t *, void *);
Expand Down
101 changes: 67 additions & 34 deletions usr/src/lib/libzpool/common/taskq.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,15 @@
* Copyright 2010 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
/*
* Copyright 2011 Nexenta Systems, Inc. All rights reserved.
*/

#include <sys/zfs_context.h>

int taskq_now;
taskq_t *system_taskq;

/*
 * Queue node for one pending task: the two list links plus the function
 * to call and the argument to pass to it.
 */
typedef struct task {
struct task *task_next; /* following node on the queue */
struct task *task_prev; /* preceding node on the queue */
task_func_t *task_func; /* function invoked for this task */
void *task_arg; /* argument handed to task_func */
} task_t;

#define TASKQ_ACTIVE 0x00010000

struct taskq {
Expand All @@ -51,18 +47,18 @@ struct taskq {
int tq_maxalloc;
kcondvar_t tq_maxalloc_cv;
int tq_maxalloc_wait;
task_t *tq_freelist;
task_t tq_task;
taskq_ent_t *tq_freelist;
taskq_ent_t tq_task;
};

static task_t *
static taskq_ent_t *
task_alloc(taskq_t *tq, int tqflags)
{
task_t *t;
taskq_ent_t *t;
int rv;

again: if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
tq->tq_freelist = t->task_next;
tq->tq_freelist = t->tqent_next;
} else {
if (tq->tq_nalloc >= tq->tq_maxalloc) {
if (!(tqflags & KM_SLEEP))
Expand All @@ -87,7 +83,7 @@ again: if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
}
mutex_exit(&tq->tq_lock);

t = kmem_alloc(sizeof (task_t), tqflags);
t = kmem_alloc(sizeof (taskq_ent_t), tqflags);

mutex_enter(&tq->tq_lock);
if (t != NULL)
Expand All @@ -97,15 +93,15 @@ again: if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
}

static void
task_free(taskq_t *tq, task_t *t)
task_free(taskq_t *tq, taskq_ent_t *t)
{
if (tq->tq_nalloc <= tq->tq_minalloc) {
t->task_next = tq->tq_freelist;
t->tqent_next = tq->tq_freelist;
tq->tq_freelist = t;
} else {
tq->tq_nalloc--;
mutex_exit(&tq->tq_lock);
kmem_free(t, sizeof (task_t));
kmem_free(t, sizeof (taskq_ent_t));
mutex_enter(&tq->tq_lock);
}

Expand All @@ -116,7 +112,7 @@ task_free(taskq_t *tq, task_t *t)
taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
{
task_t *t;
taskq_ent_t *t;

if (taskq_now) {
func(arg);
Expand All @@ -130,26 +126,58 @@ taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
return (0);
}
if (tqflags & TQ_FRONT) {
t->task_next = tq->tq_task.task_next;
t->task_prev = &tq->tq_task;
t->tqent_next = tq->tq_task.tqent_next;
t->tqent_prev = &tq->tq_task;
} else {
t->task_next = &tq->tq_task;
t->task_prev = tq->tq_task.task_prev;
t->tqent_next = &tq->tq_task;
t->tqent_prev = tq->tq_task.tqent_prev;
}
t->task_next->task_prev = t;
t->task_prev->task_next = t;
t->task_func = func;
t->task_arg = arg;
t->tqent_next->tqent_prev = t;
t->tqent_prev->tqent_next = t;
t->tqent_func = func;
t->tqent_arg = arg;
cv_signal(&tq->tq_dispatch_cv);
mutex_exit(&tq->tq_lock);
return (1);
}

/*
 * Queue a caller-owned entry on a taskq.
 *
 * tq    - target queue; asserted not to have TASKQ_DYNAMIC set
 * func  - function to run; asserted non-NULL
 * arg   - argument passed to func
 * flags - TQ_FRONT inserts at the head of the queue, otherwise the
 *         entry goes on the tail
 * t     - entry provided by the caller; nothing is allocated here
 *
 * One waiter on tq_dispatch_cv is signalled once the entry is linked in.
 */
void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *t)
{
	taskq_ent_t *anchor;
	taskq_ent_t *before;
	taskq_ent_t *after;

	ASSERT(func != NULL);
	ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));

	/*
	 * Tag the entry as caller-provided so that the worker thread
	 * leaves it alone instead of freeing it after the task runs.
	 */
	t->tqent_flags |= TQENT_FLAG_PREALLOC;

	mutex_enter(&tq->tq_lock);

	/*
	 * tq_task is the anchor of a circular list.  Choose the two
	 * neighbours the new entry will sit between: right after the
	 * anchor for TQ_FRONT, right before it otherwise.
	 */
	anchor = &tq->tq_task;
	if (flags & TQ_FRONT) {
		before = anchor;
		after = anchor->tqent_next;
	} else {
		before = anchor->tqent_prev;
		after = anchor;
	}

	/* Splice the entry in between its neighbours. */
	t->tqent_next = after;
	t->tqent_prev = before;
	after->tqent_prev = t;
	before->tqent_next = t;

	t->tqent_func = func;
	t->tqent_arg = arg;

	cv_signal(&tq->tq_dispatch_cv);
	mutex_exit(&tq->tq_lock);
}

/*
 * Block the caller until the queue's task list is empty and tq_active
 * has dropped to zero.  The condition is checked with tq_lock held and
 * the caller sleeps on tq_wait_cv between checks.
 */
void
taskq_wait(taskq_t *tq)
{
mutex_enter(&tq->tq_lock);
while (tq->tq_task.task_next != &tq->tq_task || tq->tq_active != 0)
while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
mutex_exit(&tq->tq_lock);
}
Expand All @@ -158,27 +186,32 @@ static void *
taskq_thread(void *arg)
{
taskq_t *tq = arg;
task_t *t;
taskq_ent_t *t;
boolean_t prealloc;

mutex_enter(&tq->tq_lock);
while (tq->tq_flags & TASKQ_ACTIVE) {
if ((t = tq->tq_task.task_next) == &tq->tq_task) {
if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
if (--tq->tq_active == 0)
cv_broadcast(&tq->tq_wait_cv);
cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
tq->tq_active++;
continue;
}
t->task_prev->task_next = t->task_next;
t->task_next->task_prev = t->task_prev;
t->tqent_prev->tqent_next = t->tqent_next;
t->tqent_next->tqent_prev = t->tqent_prev;
t->tqent_next = NULL;
t->tqent_prev = NULL;
prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
mutex_exit(&tq->tq_lock);

rw_enter(&tq->tq_threadlock, RW_READER);
t->task_func(t->task_arg);
t->tqent_func(t->tqent_arg);
rw_exit(&tq->tq_threadlock);

mutex_enter(&tq->tq_lock);
task_free(tq, t);
if (!prealloc)
task_free(tq, t);
}
tq->tq_nthreads--;
cv_broadcast(&tq->tq_wait_cv);
Expand Down Expand Up @@ -217,8 +250,8 @@ taskq_create(const char *name, int nthreads, pri_t pri,
tq->tq_nthreads = nthreads;
tq->tq_minalloc = minalloc;
tq->tq_maxalloc = maxalloc;
tq->tq_task.task_next = &tq->tq_task;
tq->tq_task.task_prev = &tq->tq_task;
tq->tq_task.tqent_next = &tq->tq_task;
tq->tq_task.tqent_prev = &tq->tq_task;
tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP);

if (flags & TASKQ_PREPOPULATE) {
Expand Down
5 changes: 4 additions & 1 deletion usr/src/uts/common/fs/zfs/spa.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
/*
* Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
*/
/*
* Copyright 2011 Nexenta Systems, Inc. All rights reserved.
*/

/*
* This file contains all the routines used when modifying on-disk SPA state.
Expand Down Expand Up @@ -610,7 +613,7 @@ static taskq_t *
spa_taskq_create(spa_t *spa, const char *name, enum zti_modes mode,
uint_t value)
{
uint_t flags = TASKQ_PREPOPULATE;
uint_t flags = 0;
boolean_t batch = B_FALSE;

switch (mode) {
Expand Down
4 changes: 4 additions & 0 deletions usr/src/uts/common/fs/zfs/sys/zfs_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
* Copyright 2009 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
/*
* Copyright 2011 Nexenta Systems, Inc. All rights reserved.
*/

#ifndef _SYS_ZFS_CONTEXT_H
#define _SYS_ZFS_CONTEXT_H
Expand All @@ -39,6 +42,7 @@ extern "C" {
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/taskq.h>
#include <sys/taskq_impl.h>
#include <sys/buf.h>
#include <sys/param.h>
#include <sys/systm.h>
Expand Down
6 changes: 6 additions & 0 deletions usr/src/uts/common/fs/zfs/sys/zio.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
/*
* Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
*/
/*
* Copyright 2011 Nexenta Systems, Inc. All rights reserved.
*/

#ifndef _ZIO_H
#define _ZIO_H
Expand Down Expand Up @@ -417,6 +420,9 @@ struct zio {
/* FMA state */
zio_cksum_report_t *io_cksum_report;
uint64_t io_ena;

/* Taskq dispatching state */
taskq_ent_t io_tqent;
};

extern zio_t *zio_null(zio_t *pio, spa_t *spa, vdev_t *vd,
Expand Down
20 changes: 15 additions & 5 deletions usr/src/uts/common/fs/zfs/zio.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
/*
* Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2011 by Delphix. All rights reserved.
* Copyright (c) 2011 Nexenta Systems, Inc. All rights reserved.
*/

#include <sys/zfs_context.h>
Expand Down Expand Up @@ -1061,7 +1062,7 @@ zio_taskq_dispatch(zio_t *zio, enum zio_taskq_type q, boolean_t cutinline)
{
spa_t *spa = zio->io_spa;
zio_type_t t = zio->io_type;
int flags = TQ_SLEEP | (cutinline ? TQ_FRONT : 0);
int flags = (cutinline ? TQ_FRONT : 0);

/*
* If we're a config writer or a probe, the normal issue and
Expand All @@ -1085,8 +1086,15 @@ zio_taskq_dispatch(zio_t *zio, enum zio_taskq_type q, boolean_t cutinline)
q++;

ASSERT3U(q, <, ZIO_TASKQ_TYPES);
(void) taskq_dispatch(spa->spa_zio_taskq[t][q],
(task_func_t *)zio_execute, zio, flags);

/*
* NB: We are assuming that the zio can only be dispatched
* to a single taskq at a time. It would be a grievous error
* to dispatch the zio to another taskq at the same time.
*/
ASSERT(zio->io_tqent.tqent_next == NULL);
taskq_dispatch_ent(spa->spa_zio_taskq[t][q],
(task_func_t *)zio_execute, zio, flags, &zio->io_tqent);
}

static boolean_t
Expand Down Expand Up @@ -2889,9 +2897,11 @@ zio_done(zio_t *zio)
* Reexecution is potentially a huge amount of work.
* Hand it off to the otherwise-unused claim taskq.
*/
(void) taskq_dispatch(
ASSERT(zio->io_tqent.tqent_next == NULL);
(void) taskq_dispatch_ent(
spa->spa_zio_taskq[ZIO_TYPE_CLAIM][ZIO_TASKQ_ISSUE],
(task_func_t *)zio_reexecute, zio, TQ_SLEEP);
(task_func_t *)zio_reexecute, zio, 0,
&zio->io_tqent);
}
return (ZIO_PIPELINE_STOP);
}
Expand Down
Loading

0 comments on commit 5aeb947

Please sign in to comment.