diff --git a/include/sys/taskq.h b/include/sys/taskq.h
index 2c437f0e..1c91c9a3 100644
--- a/include/sys/taskq.h
+++ b/include/sys/taskq.h
@@ -61,11 +61,12 @@ typedef void (task_func_t)(void *);
 typedef struct taskq {
 	spinlock_t		tq_lock;	/* protects taskq_t */
 	unsigned long		tq_lock_flags;	/* interrupt state */
-	const char		*tq_name;	/* taskq name */
+	char			*tq_name;	/* taskq name */
 	struct list_head	tq_thread_list;	/* list of all threads */
 	struct list_head	tq_active_list;	/* list of active threads */
 	int			tq_nactive;	/* # of active threads */
-	int			tq_nthreads;	/* # of total threads */
+	int			tq_nthreads;	/* # of existing threads */
+	int			tq_maxthreads;	/* max # of threads */
 	int			tq_pri;		/* priority */
 	int			tq_minalloc;	/* min task_t pool size */
 	int			tq_maxalloc;	/* max task_t pool size */
diff --git a/module/spl/spl-taskq.c b/module/spl/spl-taskq.c
index 49bb40a2..7e8c36d9 100644
--- a/module/spl/spl-taskq.c
+++ b/module/spl/spl-taskq.c
@@ -31,10 +31,22 @@ int spl_taskq_thread_bind = 0;
 module_param(spl_taskq_thread_bind, int, 0644);
 MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default");
 
+int spl_taskq_thread_dynamic = 1;
+module_param(spl_taskq_thread_dynamic, int, 0644);
+MODULE_PARM_DESC(spl_taskq_thread_dynamic, "Allow dynamic taskq threads");
+
+int spl_taskq_thread_sequential = 8;
+module_param(spl_taskq_thread_sequential, int, 0644);
+MODULE_PARM_DESC(spl_taskq_thread_sequential,
+	"Create new taskq threads after N sequential tasks");
+
 /* Global system-wide dynamic task queue available for all consumers */
 taskq_t *system_taskq;
 EXPORT_SYMBOL(system_taskq);
 
+static taskq_thread_t *taskq_thread_create(taskq_t *);
+
 static int
 task_km_flags(uint_t flags)
 {
@@ -434,17 +446,22 @@ taskq_member(taskq_t *tq, void *t)
 {
 	struct list_head *l;
 	taskq_thread_t *tqt;
+	int found = 0;
 
 	ASSERT(tq);
 	ASSERT(t);
 
+	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
 	list_for_each(l, &tq->tq_thread_list) {
 		tqt = list_entry(l, taskq_thread_t, tqt_thread_list);
-		if (tqt->tqt_thread == (struct task_struct *)t)
-			return (1);
+		if (tqt->tqt_thread == (struct task_struct *)t) {
+			found = 1;
+			break;
+		}
 	}
+	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
-	return (0);
+	return (found);
 }
 EXPORT_SYMBOL(taskq_member);
 
@@ -604,7 +621,6 @@ taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
 {
 	ASSERT(tq);
 	ASSERT(func);
-	ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
 
 	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
 
@@ -673,6 +689,7 @@ taskq_thread(void *args)
 	taskq_t *tq;
 	taskq_ent_t *t;
 	struct list_head *pend_list;
+	int seq_tasks = 0;
 
 	ASSERT(tqt);
 	tq = tqt->tqt_tq;
@@ -683,7 +700,13 @@ taskq_thread(void *args)
 	flush_signals(current);
 
 	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+
+	/* Immediately exit if more threads than allowed were created. */
+	if (tq->tq_nthreads >= tq->tq_maxthreads)
+		goto error;
+
 	tq->tq_nthreads++;
+	list_add(&tqt->tqt_thread_list, &tq->tq_thread_list);
 	wake_up(&tq->tq_wait_waitq);
 	set_current_state(TASK_INTERRUPTIBLE);
 
@@ -691,16 +714,27 @@
 		if (list_empty(&tq->tq_pend_list) &&
 		    list_empty(&tq->tq_prio_list)) {
+
+			/*
+			 * Dynamic taskqs will destroy threads which would
+			 * otherwise be idle; they are recreated as needed.
+			 */
+			if ((tq->tq_flags & TASKQ_DYNAMIC) &&
+			    (tq->tq_nthreads > 1) && spl_taskq_thread_dynamic)
+				break;
+
 			add_wait_queue_exclusive(&tq->tq_work_waitq, &wait);
 			spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+
+			seq_tasks = 0;
 			schedule();
+
 			spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
 			remove_wait_queue(&tq->tq_work_waitq, &wait);
 		} else {
 			__set_current_state(TASK_RUNNING);
 		}
-
 		if (!list_empty(&tq->tq_prio_list))
 			pend_list = &tq->tq_prio_list;
 		else if (!list_empty(&tq->tq_pend_list))
@@ -732,14 +766,27 @@ taskq_thread(void *args)
 			/* Perform the requested task */
 			t->tqent_func(t->tqent_arg);
 
+			/*
+			 * Dynamic taskqs will create additional threads up
+			 * to tq_maxthreads when a single thread has handled
+			 * more than spl_taskq_thread_sequential tasks without
+			 * sleeping due to an empty queue.
+			 */
+			if ((++seq_tasks > spl_taskq_thread_sequential) &&
+			    (tq->tq_flags & TASKQ_DYNAMIC) &&
+			    (tq->tq_flags & TQ_ACTIVE) &&
+			    (tq->tq_nthreads < tq->tq_maxthreads)) {
+				taskq_thread_create(tq);
+				seq_tasks = 0;
+			}
+
 			spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
 			tq->tq_nactive--;
 			list_del_init(&tqt->tqt_active_list);
 			tqt->tqt_task = NULL;
 
 			/* For prealloc'd tasks, we don't free anything. */
-			if ((tq->tq_flags & TASKQ_DYNAMIC) ||
-			    !(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
+			if (!(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
 				task_done(tq, t);
 
 			/* When the current lowest outstanding taskqid is
@@ -761,27 +808,56 @@ taskq_thread(void *args)
 	__set_current_state(TASK_RUNNING);
 	tq->tq_nthreads--;
 	list_del_init(&tqt->tqt_thread_list);
+error:
 	kmem_free(tqt, sizeof(taskq_thread_t));
-
 	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
 	return (0);
 }
 
+static taskq_thread_t *
+taskq_thread_create(taskq_t *tq)
+{
+	static int last_used_cpu = 0;
+	taskq_thread_t *tqt;
+
+	tqt = kmem_alloc(sizeof(*tqt), KM_PUSHPAGE);
+	INIT_LIST_HEAD(&tqt->tqt_thread_list);
+	INIT_LIST_HEAD(&tqt->tqt_active_list);
+	tqt->tqt_tq = tq;
+	tqt->tqt_id = 0;
+
+	tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
+	    "%s", tq->tq_name);
+	if (tqt->tqt_thread == NULL) {
+		kmem_free(tqt, sizeof (taskq_thread_t));
+		return (NULL);
+	}
+
+	if (spl_taskq_thread_bind) {
+		last_used_cpu = (last_used_cpu + 1) % num_online_cpus();
+		kthread_bind(tqt->tqt_thread, last_used_cpu);
+	}
+
+	set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(tq->tq_pri));
+	wake_up_process(tqt->tqt_thread);
+
+	return (tqt);
+}
+
 taskq_t *
 taskq_create(const char *name, int nthreads, pri_t pri,
 	     int minalloc, int maxalloc, uint_t flags)
 {
-	static int last_used_cpu = 0;
 	taskq_t *tq;
 	taskq_thread_t *tqt;
-	int rc = 0, i, j = 0;
+	int count = 0, rc = 0, i;
 
 	ASSERT(name != NULL);
 	ASSERT(pri <= maxclsyspri);
 	ASSERT(minalloc >= 0);
 	ASSERT(maxalloc <= INT_MAX);
-	ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */
+	ASSERT(!(flags & (TASKQ_CPR_SAFE))); /* Unsupported */
 
 	/* Scale the number of threads using nthreads as a percentage */
 	if (flags & TASKQ_THREADS_CPU_PCT) {
@@ -800,16 +876,17 @@ taskq_create(const char *name, int nthreads, pri_t pri,
 	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
 	INIT_LIST_HEAD(&tq->tq_thread_list);
 	INIT_LIST_HEAD(&tq->tq_active_list);
-	tq->tq_name = name;
-	tq->tq_nactive = 0;
-	tq->tq_nthreads = 0;
-	tq->tq_pri = pri;
-	tq->tq_minalloc = minalloc;
-	tq->tq_maxalloc = maxalloc;
-	tq->tq_nalloc = 0;
-	tq->tq_flags = (flags | TQ_ACTIVE);
-	tq->tq_next_id = 1;
-	tq->tq_lowest_id = 1;
+	tq->tq_name = strdup(name);
+	tq->tq_nactive = 0;
+	tq->tq_nthreads = 0;
+	tq->tq_maxthreads = nthreads;
+	tq->tq_pri = pri;
+	tq->tq_minalloc = minalloc;
+	tq->tq_maxalloc = maxalloc;
+	tq->tq_nalloc = 0;
+	tq->tq_flags = (flags | TQ_ACTIVE);
+	tq->tq_next_id = 1;
+	tq->tq_lowest_id = 1;
 	INIT_LIST_HEAD(&tq->tq_free_list);
 	INIT_LIST_HEAD(&tq->tq_pend_list);
 	INIT_LIST_HEAD(&tq->tq_prio_list);
@@ -823,32 +900,19 @@ taskq_create(const char *name, int nthreads, pri_t pri,
 
 	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
+	if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic)
+		nthreads = 1;
+
 	for (i = 0; i < nthreads; i++) {
-		tqt = kmem_alloc(sizeof(*tqt), KM_PUSHPAGE);
-		INIT_LIST_HEAD(&tqt->tqt_thread_list);
-		INIT_LIST_HEAD(&tqt->tqt_active_list);
-		tqt->tqt_tq = tq;
-		tqt->tqt_id = 0;
-
-		tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
-		    "%s/%d", name, i);
-		if (tqt->tqt_thread) {
-			list_add(&tqt->tqt_thread_list, &tq->tq_thread_list);
-			if (spl_taskq_thread_bind) {
-				last_used_cpu = (last_used_cpu + 1) % num_online_cpus();
-				kthread_bind(tqt->tqt_thread, last_used_cpu);
-			}
-			set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(pri));
-			wake_up_process(tqt->tqt_thread);
-			j++;
-		} else {
-			kmem_free(tqt, sizeof(taskq_thread_t));
+		tqt = taskq_thread_create(tq);
+		if (tqt == NULL)
 			rc = 1;
-		}
+		else
+			count++;
 	}
 
 	/* Wait for all threads to be started before potential destroy */
-	wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j);
+	wait_event(tq->tq_wait_waitq, tq->tq_nthreads == count);
 
 	if (rc) {
 		taskq_destroy(tq);
@@ -913,6 +977,7 @@ taskq_destroy(taskq_t *tq)
 
 	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
+	strfree(tq->tq_name);
 	kmem_free(tq, sizeof(taskq_t));
 }
 EXPORT_SYMBOL(taskq_destroy);
diff --git a/module/splat/splat-taskq.c b/module/splat/splat-taskq.c
index 7d4ad5b6..529d0705 100644
--- a/module/splat/splat-taskq.c
+++ b/module/splat/splat-taskq.c
@@ -28,6 +28,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include "splat-internal.h"
@@ -75,6 +76,10 @@
 #define SPLAT_TASKQ_TEST10_NAME		"cancel"
 #define SPLAT_TASKQ_TEST10_DESC		"Cancel task execution"
 
+#define SPLAT_TASKQ_TEST11_ID		0x020b
+#define SPLAT_TASKQ_TEST11_NAME		"dynamic"
+#define SPLAT_TASKQ_TEST11_DESC		"Dynamic task queue thread creation"
+
 #define SPLAT_TASKQ_ORDER_MAX		8
 #define SPLAT_TASKQ_DEPTH_MAX		16
 
@@ -1052,21 +1057,15 @@ splat_taskq_test7(struct file *file, void *arg)
 
 	rc = splat_taskq_test7_impl(file, arg, B_FALSE);
 	if (rc)
-		return rc;
+		return (rc);
 
 	rc = splat_taskq_test7_impl(file, arg, B_TRUE);
 
-	return rc;
+	return (rc);
 }
 
-/*
- * Create a taskq with 100 threads and dispatch a huge number of trivial
- * tasks to generate contention on tq->tq_lock.  This test should always
- * pass.  The purpose is to provide a benchmark for measuring the
- * effectiveness of taskq optimizations.
- */
 static void
-splat_taskq_test8_func(void *arg)
+splat_taskq_throughput_func(void *arg)
 {
 	splat_taskq_arg_t *tq_arg = (splat_taskq_arg_t *)arg;
 	ASSERT(tq_arg);
@@ -1074,98 +1073,105 @@
 	atomic_inc(tq_arg->count);
 }
 
-#define TEST8_NUM_TASKS			0x20000
-#define TEST8_THREADS_PER_TASKQ		100
-
 static int
-splat_taskq_test8_common(struct file *file, void *arg, int minalloc,
-    int maxalloc)
+splat_taskq_throughput(struct file *file, void *arg, const char *name,
+    int nthreads, int minalloc, int maxalloc, int flags, int tasks,
+    struct timespec *delta)
 {
 	taskq_t *tq;
 	taskqid_t id;
 	splat_taskq_arg_t tq_arg;
 	taskq_ent_t **tqes;
 	atomic_t count;
+	struct timespec start, stop;
 	int i, j, rc = 0;
 
-	tqes = vmalloc(sizeof(*tqes) * TEST8_NUM_TASKS);
+	tqes = vmalloc(sizeof (*tqes) * tasks);
 	if (tqes == NULL)
-		return -ENOMEM;
-	memset(tqes, 0, sizeof(*tqes) * TEST8_NUM_TASKS);
-
-	splat_vprint(file, SPLAT_TASKQ_TEST8_NAME,
-	    "Taskq '%s' creating (%d/%d/%d)\n",
-	    SPLAT_TASKQ_TEST8_NAME,
-	    minalloc, maxalloc, TEST8_NUM_TASKS);
-	if ((tq = taskq_create(SPLAT_TASKQ_TEST8_NAME, TEST8_THREADS_PER_TASKQ,
-	    maxclsyspri, minalloc, maxalloc,
-	    TASKQ_PREPOPULATE)) == NULL) {
-		splat_vprint(file, SPLAT_TASKQ_TEST8_NAME,
-		    "Taskq '%s' create failed\n",
-		    SPLAT_TASKQ_TEST8_NAME);
+		return (-ENOMEM);
+
+	memset(tqes, 0, sizeof (*tqes) * tasks);
+
+	splat_vprint(file, name, "Taskq '%s' creating (%d/%d/%d/%d)\n",
+	    name, nthreads, minalloc, maxalloc, tasks);
+	if ((tq = taskq_create(name, nthreads, maxclsyspri,
+	    minalloc, maxalloc, flags)) == NULL) {
+		splat_vprint(file, name, "Taskq '%s' create failed\n", name);
 		rc = -EINVAL;
 		goto out_free;
 	}
 
 	tq_arg.file = file;
-	tq_arg.name = SPLAT_TASKQ_TEST8_NAME;
+	tq_arg.name = name;
 	tq_arg.count = &count;
 	atomic_set(tq_arg.count, 0);
 
-	for (i = 0; i < TEST8_NUM_TASKS; i++) {
-		tqes[i] = kmalloc(sizeof(taskq_ent_t), GFP_KERNEL);
+	getnstimeofday(&start);
+
+	for (i = 0; i < tasks; i++) {
+		tqes[i] = kmalloc(sizeof (taskq_ent_t), GFP_KERNEL);
 		if (tqes[i] == NULL) {
 			rc = -ENOMEM;
			goto out;
 		}
 
-		taskq_init_ent(tqes[i]);
-
-		taskq_dispatch_ent(tq, splat_taskq_test8_func,
-		    &tq_arg, TQ_SLEEP, tqes[i]);
+		taskq_init_ent(tqes[i]);
+		taskq_dispatch_ent(tq, splat_taskq_throughput_func,
+		    &tq_arg, TQ_SLEEP, tqes[i]);
 
 		id = tqes[i]->tqent_id;
 		if (id == 0) {
-			splat_vprint(file, SPLAT_TASKQ_TEST8_NAME,
-			    "Taskq '%s' function '%s' dispatch "
-			    "%d failed\n", tq_arg.name,
-			    sym2str(splat_taskq_test8_func), i);
-			rc = -EINVAL;
-			goto out;
+			splat_vprint(file, name, "Taskq '%s' function '%s' "
+			    "dispatch %d failed\n", tq_arg.name,
+			    sym2str(splat_taskq_throughput_func), i);
+			rc = -EINVAL;
+			goto out;
 		}
 	}
 
-	splat_vprint(file, SPLAT_TASKQ_TEST8_NAME, "Taskq '%s' "
-	    "waiting for %d dispatches\n", tq_arg.name,
-	    TEST8_NUM_TASKS);
+	splat_vprint(file, name, "Taskq '%s' waiting for %d dispatches\n",
+	    tq_arg.name, tasks);
+
 	taskq_wait(tq);
 
-	splat_vprint(file, SPLAT_TASKQ_TEST8_NAME, "Taskq '%s' "
-	    "%d/%d dispatches finished\n", tq_arg.name,
-	    atomic_read(tq_arg.count), TEST8_NUM_TASKS);
-	if (atomic_read(tq_arg.count) != TEST8_NUM_TASKS)
+	if (delta != NULL) {
+		getnstimeofday(&stop);
+		*delta = timespec_sub(stop, start);
+	}
+
+	splat_vprint(file, name, "Taskq '%s' %d/%d dispatches finished\n",
+	    tq_arg.name, atomic_read(tq_arg.count), tasks);
+
+	if (atomic_read(tq_arg.count) != tasks)
 		rc = -ERANGE;
 
 out:
-	splat_vprint(file, SPLAT_TASKQ_TEST8_NAME, "Taskq '%s' destroying\n",
-	    tq_arg.name);
+	splat_vprint(file, name, "Taskq '%s' destroying\n", tq_arg.name);
 	taskq_destroy(tq);
 
 out_free:
-	for (j = 0; j < TEST8_NUM_TASKS && tqes[j] != NULL; j++)
+	for (j = 0; j < tasks && tqes[j] != NULL; j++)
 		kfree(tqes[j]);
+
 	vfree(tqes);
 
-	return rc;
+	return (rc);
 }
 
+/*
+ * Create a taskq with 100 threads and dispatch a huge number of trivial
+ * tasks to generate contention on tq->tq_lock.  This test should always
+ * pass.  The purpose is to provide a benchmark for measuring the
+ * effectiveness of taskq optimizations.
+ */
+#define TEST8_NUM_TASKS			0x20000
+#define TEST8_THREADS_PER_TASKQ		100
+
 static int
 splat_taskq_test8(struct file *file, void *arg)
 {
-	int rc;
-
-	rc = splat_taskq_test8_common(file, arg, 1, 100);
-
-	return rc;
+	return (splat_taskq_throughput(file, arg,
+	    SPLAT_TASKQ_TEST8_NAME, TEST8_THREADS_PER_TASKQ,
+	    1, INT_MAX, TASKQ_PREPOPULATE, TEST8_NUM_TASKS, NULL));
 }
 
 /*
@@ -1433,6 +1439,46 @@ splat_taskq_test10(struct file *file, void *arg)
 	return rc;
 }
 
+/*
+ * Create a dynamic taskq with 100 threads and dispatch a huge number of
+ * trivial tasks.  This will cause the taskq to grow quickly to its max
+ * thread count.  This test should always pass.  The purpose is to provide
+ * a benchmark for measuring the performance of dynamic taskqs.
+ */
+#define TEST11_NUM_TASKS		100000
+#define TEST11_THREADS_PER_TASKQ	100
+
+static int
+splat_taskq_test11(struct file *file, void *arg)
+{
+	struct timespec normal, dynamic;
+	int error;
+
+	error = splat_taskq_throughput(file, arg, SPLAT_TASKQ_TEST11_NAME,
+	    TEST11_THREADS_PER_TASKQ, 1, INT_MAX,
+	    TASKQ_PREPOPULATE, TEST11_NUM_TASKS, &normal);
+	if (error)
+		return (error);
+
+	error = splat_taskq_throughput(file, arg, SPLAT_TASKQ_TEST11_NAME,
+	    TEST11_THREADS_PER_TASKQ, 1, INT_MAX,
+	    TASKQ_PREPOPULATE | TASKQ_DYNAMIC, TEST11_NUM_TASKS, &dynamic);
+	if (error)
+		return (error);
+
+	splat_vprint(file, SPLAT_TASKQ_TEST11_NAME,
+	    "Timing taskq_wait(): normal=%ld.%09lds, dynamic=%ld.%09lds\n",
+	    normal.tv_sec, normal.tv_nsec,
+	    dynamic.tv_sec, dynamic.tv_nsec);
+
+	/* A 1% increase in runtime is treated as a failure. */
+	if ((dynamic.tv_sec * NANOSEC + dynamic.tv_nsec) >
+	    ((normal.tv_sec * NANOSEC + normal.tv_nsec) * 101 / 100))
+		error = -ETIME;
+
+	return (error);
+}
+
 splat_subsystem_t *
 splat_taskq_init(void)
 {
@@ -1470,6 +1516,8 @@ splat_taskq_init(void)
 	    SPLAT_TASKQ_TEST9_ID, splat_taskq_test9);
 	SPLAT_TEST_INIT(sub, SPLAT_TASKQ_TEST10_NAME, SPLAT_TASKQ_TEST10_DESC,
 	    SPLAT_TASKQ_TEST10_ID, splat_taskq_test10);
+	SPLAT_TEST_INIT(sub, SPLAT_TASKQ_TEST11_NAME, SPLAT_TASKQ_TEST11_DESC,
+	    SPLAT_TASKQ_TEST11_ID, splat_taskq_test11);
 
 	return sub;
 }
@@ -1478,6 +1526,7 @@ void
 splat_taskq_fini(splat_subsystem_t *sub)
 {
 	ASSERT(sub);
+	SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST11_ID);
 	SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST10_ID);
 	SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST9_ID);
 	SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST8_ID);
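
Usage sketch (illustrative, not part of the patch): a consumer opts in by
passing TASKQ_DYNAMIC to the existing taskq_create() interface. The nthreads
argument becomes the ceiling stored in tq_maxthreads; only a single worker is
started up front, and more are spawned per the spl_taskq_thread_sequential
heuristic. The names my_cb, my_count, and my_example below are hypothetical.

	#include <sys/taskq.h>

	static void
	my_cb(void *arg)
	{
		/* Runs in one of the taskq's worker threads. */
		atomic_inc((atomic_t *)arg);
	}

	static void
	my_example(void)
	{
		taskq_t *tq;
		atomic_t my_count;

		atomic_set(&my_count, 0);

		/*
		 * Up to 100 workers may exist, but only one is created
		 * here; additional threads are spawned only when a worker
		 * clears more than spl_taskq_thread_sequential tasks
		 * without finding the queue empty.
		 */
		tq = taskq_create("my_example", 100, maxclsyspri,
		    50, INT_MAX, TASKQ_PREPOPULATE | TASKQ_DYNAMIC);

		(void) taskq_dispatch(tq, my_cb, &my_count, TQ_SLEEP);

		taskq_wait(tq);		/* Wait for all dispatched tasks. */
		taskq_destroy(tq);
	}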
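Both knobs added above are ordinary module parameters, so the dynamic behavior
can be disabled or retuned at module load time; the values shown below are
simply the defaults from this patch:

	modprobe spl spl_taskq_thread_dynamic=1 spl_taskq_thread_sequential=8

Setting spl_taskq_thread_dynamic=0 makes a TASKQ_DYNAMIC taskq start all
tq_maxthreads workers immediately, behaving like a regular taskq.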