From b355c17c420d5efeab503585d88d9c9063da70d1 Mon Sep 17 00:00:00 2001 From: Brian Behlendorf Date: Wed, 3 Jun 2015 17:21:16 -0700 Subject: [PATCH] Add TASKQ_DYNAMIC feature Setting the TASKQ_DYNAMIC flag will create a taskq with dynamic semantics. Initially only a single worker thread will be created to service tasks dispatched to the queue. As additional threads are needed they will be dynamically spawned up to the max number specified by 'nthreads'. When the threads are no longer needed, because the taskq is empty, they will automatically terminate. Due to the low cost of creating and destroying threads under Linux by default new threads and spawned and terminated aggressively. There are two modules options which can be tuned to adjust this behavior if needed. * spl_taskq_thread_sequential - The number of sequential tasks, without interruption, which needed to be handled by a worker thread before a new worker thread is spawned. Default 8. * spl_taskq_thread_dynamic - Provides the ability to completely disable the use of dynamic taskqs on the system. This is provided for the purposes of debugging and troubleshooting. Default 1 (enabled). This behavior is consistent with the dynamic taskq implemenation found in both illumos and FreeBSD. Signed-off-by: Brian Behlendorf --- include/sys/taskq.h | 5 +- module/spl/spl-taskq.c | 149 +++++++++++++++++++++++---------- module/splat/splat-taskq.c | 163 ++++++++++++++++++++++++------------- 3 files changed, 216 insertions(+), 101 deletions(-) diff --git a/include/sys/taskq.h b/include/sys/taskq.h index 2c437f0e..1c91c9a3 100644 --- a/include/sys/taskq.h +++ b/include/sys/taskq.h @@ -61,11 +61,12 @@ typedef void (task_func_t)(void *); typedef struct taskq { spinlock_t tq_lock; /* protects taskq_t */ unsigned long tq_lock_flags; /* interrupt state */ - const char *tq_name; /* taskq name */ + char *tq_name; /* taskq name */ struct list_head tq_thread_list;/* list of all threads */ struct list_head tq_active_list;/* list of active threads */ int tq_nactive; /* # of active threads */ - int tq_nthreads; /* # of total threads */ + int tq_nthreads; /* # of existing threads */ + int tq_maxthreads; /* max # of threads */ int tq_pri; /* priority */ int tq_minalloc; /* min task_t pool size */ int tq_maxalloc; /* max task_t pool size */ diff --git a/module/spl/spl-taskq.c b/module/spl/spl-taskq.c index 49bb40a2..7e8c36d9 100644 --- a/module/spl/spl-taskq.c +++ b/module/spl/spl-taskq.c @@ -31,10 +31,22 @@ int spl_taskq_thread_bind = 0; module_param(spl_taskq_thread_bind, int, 0644); MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default"); + +int spl_taskq_thread_dynamic = 1; +module_param(spl_taskq_thread_dynamic, int, 0644); +MODULE_PARM_DESC(spl_taskq_thread_dynamic, "Allow dynamic taskq threads"); + +int spl_taskq_thread_sequential = 8; +module_param(spl_taskq_thread_sequential, int, 0644); +MODULE_PARM_DESC(spl_taskq_thread_sequential, + "Create new taskq threads after N sequential tasks"); + /* Global system-wide dynamic task queue available for all consumers */ taskq_t *system_taskq; EXPORT_SYMBOL(system_taskq); +static taskq_thread_t *taskq_thread_create(taskq_t *); + static int task_km_flags(uint_t flags) { @@ -434,17 +446,22 @@ taskq_member(taskq_t *tq, void *t) { struct list_head *l; taskq_thread_t *tqt; + int found = 0; ASSERT(tq); ASSERT(t); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); list_for_each(l, &tq->tq_thread_list) { tqt = list_entry(l, taskq_thread_t, tqt_thread_list); - if (tqt->tqt_thread == (struct task_struct *)t) - return (1); + if (tqt->tqt_thread == (struct task_struct *)t) { + found = 1; + break; + } } + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - return (0); + return (found); } EXPORT_SYMBOL(taskq_member); @@ -604,7 +621,6 @@ taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, { ASSERT(tq); ASSERT(func); - ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); @@ -673,6 +689,7 @@ taskq_thread(void *args) taskq_t *tq; taskq_ent_t *t; struct list_head *pend_list; + int seq_tasks = 0; ASSERT(tqt); tq = tqt->tqt_tq; @@ -683,7 +700,13 @@ taskq_thread(void *args) flush_signals(current); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + + /* Immediately exit if more threads than allowed were created.*/ + if (tq->tq_nthreads >= tq->tq_maxthreads) + goto error; + tq->tq_nthreads++; + list_add(&tqt->tqt_thread_list, &tq->tq_thread_list); wake_up(&tq->tq_wait_waitq); set_current_state(TASK_INTERRUPTIBLE); @@ -691,16 +714,27 @@ taskq_thread(void *args) if (list_empty(&tq->tq_pend_list) && list_empty(&tq->tq_prio_list)) { + + /* + * Dynamic taskqs will destroy threads which would + * otherwise be idle, they are recreated as needed. + */ + if ((tq->tq_flags & TASKQ_DYNAMIC) && + (tq->tq_nthreads > 1) && spl_taskq_thread_dynamic) + break; + add_wait_queue_exclusive(&tq->tq_work_waitq, &wait); spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + + seq_tasks = 0; schedule(); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); remove_wait_queue(&tq->tq_work_waitq, &wait); } else { __set_current_state(TASK_RUNNING); } - if (!list_empty(&tq->tq_prio_list)) pend_list = &tq->tq_prio_list; else if (!list_empty(&tq->tq_pend_list)) @@ -732,14 +766,27 @@ taskq_thread(void *args) /* Perform the requested task */ t->tqent_func(t->tqent_arg); + /* + * Dynamic taskq's will create additional threads up + * to tq_maxthreads when a single thread has handled + * more than spl_taskq_thread_sequential tasks without + * sleeping due to an empty queue. + */ + if ((++seq_tasks > spl_taskq_thread_sequential) && + (tq->tq_flags & TASKQ_DYNAMIC) && + (tq->tq_flags & TQ_ACTIVE) && + (tq->tq_nthreads < tq->tq_maxthreads)) { + taskq_thread_create(tq); + seq_tasks = 0; + } + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); tq->tq_nactive--; list_del_init(&tqt->tqt_active_list); tqt->tqt_task = NULL; /* For prealloc'd tasks, we don't free anything. */ - if ((tq->tq_flags & TASKQ_DYNAMIC) || - !(tqt->tqt_flags & TQENT_FLAG_PREALLOC)) + if (!(tqt->tqt_flags & TQENT_FLAG_PREALLOC)) task_done(tq, t); /* When the current lowest outstanding taskqid is @@ -761,27 +808,56 @@ taskq_thread(void *args) __set_current_state(TASK_RUNNING); tq->tq_nthreads--; list_del_init(&tqt->tqt_thread_list); +error: kmem_free(tqt, sizeof(taskq_thread_t)); - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); return (0); } +static taskq_thread_t * +taskq_thread_create(taskq_t *tq) +{ + static int last_used_cpu = 0; + taskq_thread_t *tqt; + + tqt = kmem_alloc(sizeof(*tqt), KM_PUSHPAGE); + INIT_LIST_HEAD(&tqt->tqt_thread_list); + INIT_LIST_HEAD(&tqt->tqt_active_list); + tqt->tqt_tq = tq; + tqt->tqt_id = 0; + + tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt, + "%s", tq->tq_name); + if (tqt->tqt_thread == NULL) { + kmem_free(tqt, sizeof (taskq_thread_t)); + return (NULL); + } + + if (spl_taskq_thread_bind) { + last_used_cpu = (last_used_cpu + 1) % num_online_cpus(); + kthread_bind(tqt->tqt_thread, last_used_cpu); + } + + set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(tq->tq_pri)); + wake_up_process(tqt->tqt_thread); + + return (tqt); +} + taskq_t * taskq_create(const char *name, int nthreads, pri_t pri, int minalloc, int maxalloc, uint_t flags) { - static int last_used_cpu = 0; taskq_t *tq; taskq_thread_t *tqt; - int rc = 0, i, j = 0; + int count = 0, rc = 0, i; ASSERT(name != NULL); ASSERT(pri <= maxclsyspri); ASSERT(minalloc >= 0); ASSERT(maxalloc <= INT_MAX); - ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */ + ASSERT(!(flags & (TASKQ_CPR_SAFE))); /* Unsupported */ /* Scale the number of threads using nthreads as a percentage */ if (flags & TASKQ_THREADS_CPU_PCT) { @@ -800,16 +876,17 @@ taskq_create(const char *name, int nthreads, pri_t pri, spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); INIT_LIST_HEAD(&tq->tq_thread_list); INIT_LIST_HEAD(&tq->tq_active_list); - tq->tq_name = name; - tq->tq_nactive = 0; - tq->tq_nthreads = 0; - tq->tq_pri = pri; - tq->tq_minalloc = minalloc; - tq->tq_maxalloc = maxalloc; - tq->tq_nalloc = 0; - tq->tq_flags = (flags | TQ_ACTIVE); - tq->tq_next_id = 1; - tq->tq_lowest_id = 1; + tq->tq_name = strdup(name); + tq->tq_nactive = 0; + tq->tq_nthreads = 0; + tq->tq_maxthreads = nthreads; + tq->tq_pri = pri; + tq->tq_minalloc = minalloc; + tq->tq_maxalloc = maxalloc; + tq->tq_nalloc = 0; + tq->tq_flags = (flags | TQ_ACTIVE); + tq->tq_next_id = 1; + tq->tq_lowest_id = 1; INIT_LIST_HEAD(&tq->tq_free_list); INIT_LIST_HEAD(&tq->tq_pend_list); INIT_LIST_HEAD(&tq->tq_prio_list); @@ -823,32 +900,19 @@ taskq_create(const char *name, int nthreads, pri_t pri, spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) + nthreads = 1; + for (i = 0; i < nthreads; i++) { - tqt = kmem_alloc(sizeof(*tqt), KM_PUSHPAGE); - INIT_LIST_HEAD(&tqt->tqt_thread_list); - INIT_LIST_HEAD(&tqt->tqt_active_list); - tqt->tqt_tq = tq; - tqt->tqt_id = 0; - - tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt, - "%s/%d", name, i); - if (tqt->tqt_thread) { - list_add(&tqt->tqt_thread_list, &tq->tq_thread_list); - if (spl_taskq_thread_bind) { - last_used_cpu = (last_used_cpu + 1) % num_online_cpus(); - kthread_bind(tqt->tqt_thread, last_used_cpu); - } - set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(pri)); - wake_up_process(tqt->tqt_thread); - j++; - } else { - kmem_free(tqt, sizeof(taskq_thread_t)); + tqt = taskq_thread_create(tq); + if (tqt == NULL) rc = 1; - } + else + count++; } /* Wait for all threads to be started before potential destroy */ - wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j); + wait_event(tq->tq_wait_waitq, tq->tq_nthreads == count); if (rc) { taskq_destroy(tq); @@ -913,6 +977,7 @@ taskq_destroy(taskq_t *tq) spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + strfree(tq->tq_name); kmem_free(tq, sizeof(taskq_t)); } EXPORT_SYMBOL(taskq_destroy); diff --git a/module/splat/splat-taskq.c b/module/splat/splat-taskq.c index 7d4ad5b6..529d0705 100644 --- a/module/splat/splat-taskq.c +++ b/module/splat/splat-taskq.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include "splat-internal.h" @@ -75,6 +76,10 @@ #define SPLAT_TASKQ_TEST10_NAME "cancel" #define SPLAT_TASKQ_TEST10_DESC "Cancel task execution" +#define SPLAT_TASKQ_TEST11_ID 0x020b +#define SPLAT_TASKQ_TEST11_NAME "dynamic" +#define SPLAT_TASKQ_TEST11_DESC "Dynamic task queue thread creation" + #define SPLAT_TASKQ_ORDER_MAX 8 #define SPLAT_TASKQ_DEPTH_MAX 16 @@ -1052,21 +1057,15 @@ splat_taskq_test7(struct file *file, void *arg) rc = splat_taskq_test7_impl(file, arg, B_FALSE); if (rc) - return rc; + return (rc); rc = splat_taskq_test7_impl(file, arg, B_TRUE); - return rc; + return (rc); } -/* - * Create a taskq with 100 threads and dispatch a huge number of trivial - * tasks to generate contention on tq->tq_lock. This test should always - * pass. The purpose is to provide a benchmark for measuring the - * effectiveness of taskq optimizations. - */ static void -splat_taskq_test8_func(void *arg) +splat_taskq_throughput_func(void *arg) { splat_taskq_arg_t *tq_arg = (splat_taskq_arg_t *)arg; ASSERT(tq_arg); @@ -1074,98 +1073,105 @@ splat_taskq_test8_func(void *arg) atomic_inc(tq_arg->count); } -#define TEST8_NUM_TASKS 0x20000 -#define TEST8_THREADS_PER_TASKQ 100 - static int -splat_taskq_test8_common(struct file *file, void *arg, int minalloc, - int maxalloc) +splat_taskq_throughput(struct file *file, void *arg, const char *name, + int nthreads, int minalloc, int maxalloc, int flags, int tasks, + struct timespec *delta) { taskq_t *tq; taskqid_t id; splat_taskq_arg_t tq_arg; taskq_ent_t **tqes; atomic_t count; + struct timespec start, stop; int i, j, rc = 0; - tqes = vmalloc(sizeof(*tqes) * TEST8_NUM_TASKS); + tqes = vmalloc(sizeof (*tqes) * tasks); if (tqes == NULL) - return -ENOMEM; - memset(tqes, 0, sizeof(*tqes) * TEST8_NUM_TASKS); - - splat_vprint(file, SPLAT_TASKQ_TEST8_NAME, - "Taskq '%s' creating (%d/%d/%d)\n", - SPLAT_TASKQ_TEST8_NAME, - minalloc, maxalloc, TEST8_NUM_TASKS); - if ((tq = taskq_create(SPLAT_TASKQ_TEST8_NAME, TEST8_THREADS_PER_TASKQ, - maxclsyspri, minalloc, maxalloc, - TASKQ_PREPOPULATE)) == NULL) { - splat_vprint(file, SPLAT_TASKQ_TEST8_NAME, - "Taskq '%s' create failed\n", - SPLAT_TASKQ_TEST8_NAME); + return (-ENOMEM); + + memset(tqes, 0, sizeof (*tqes) * tasks); + + splat_vprint(file, name, "Taskq '%s' creating (%d/%d/%d/%d)\n", + name, nthreads, minalloc, maxalloc, tasks); + if ((tq = taskq_create(name, nthreads, maxclsyspri, + minalloc, maxalloc, flags)) == NULL) { + splat_vprint(file, name, "Taskq '%s' create failed\n", name); rc = -EINVAL; goto out_free; } tq_arg.file = file; - tq_arg.name = SPLAT_TASKQ_TEST8_NAME; + tq_arg.name = name; tq_arg.count = &count; atomic_set(tq_arg.count, 0); - for (i = 0; i < TEST8_NUM_TASKS; i++) { - tqes[i] = kmalloc(sizeof(taskq_ent_t), GFP_KERNEL); + getnstimeofday(&start); + + for (i = 0; i < tasks; i++) { + tqes[i] = kmalloc(sizeof (taskq_ent_t), GFP_KERNEL); if (tqes[i] == NULL) { rc = -ENOMEM; goto out; } - taskq_init_ent(tqes[i]); - - taskq_dispatch_ent(tq, splat_taskq_test8_func, - &tq_arg, TQ_SLEEP, tqes[i]); + taskq_init_ent(tqes[i]); + taskq_dispatch_ent(tq, splat_taskq_throughput_func, + &tq_arg, TQ_SLEEP, tqes[i]); id = tqes[i]->tqent_id; if (id == 0) { - splat_vprint(file, SPLAT_TASKQ_TEST8_NAME, - "Taskq '%s' function '%s' dispatch " - "%d failed\n", tq_arg.name, - sym2str(splat_taskq_test8_func), i); - rc = -EINVAL; - goto out; + splat_vprint(file, name, "Taskq '%s' function '%s' " + "dispatch %d failed\n", tq_arg.name, + sym2str(splat_taskq_throughput_func), i); + rc = -EINVAL; + goto out; } } - splat_vprint(file, SPLAT_TASKQ_TEST8_NAME, "Taskq '%s' " - "waiting for %d dispatches\n", tq_arg.name, - TEST8_NUM_TASKS); + splat_vprint(file, name, "Taskq '%s' waiting for %d dispatches\n", + tq_arg.name, tasks); + taskq_wait(tq); - splat_vprint(file, SPLAT_TASKQ_TEST8_NAME, "Taskq '%s' " - "%d/%d dispatches finished\n", tq_arg.name, - atomic_read(tq_arg.count), TEST8_NUM_TASKS); - if (atomic_read(tq_arg.count) != TEST8_NUM_TASKS) + if (delta != NULL) { + getnstimeofday(&stop); + *delta = timespec_sub(stop, start); + } + + splat_vprint(file, name, "Taskq '%s' %d/%d dispatches finished\n", + tq_arg.name, atomic_read(tq_arg.count), tasks); + + if (atomic_read(tq_arg.count) != tasks) rc = -ERANGE; out: - splat_vprint(file, SPLAT_TASKQ_TEST8_NAME, "Taskq '%s' destroying\n", - tq_arg.name); + splat_vprint(file, name, "Taskq '%s' destroying\n", tq_arg.name); taskq_destroy(tq); out_free: - for (j = 0; j < TEST8_NUM_TASKS && tqes[j] != NULL; j++) + for (j = 0; j < tasks && tqes[j] != NULL; j++) kfree(tqes[j]); + vfree(tqes); - return rc; + return (rc); } +/* + * Create a taskq with 100 threads and dispatch a huge number of trivial + * tasks to generate contention on tq->tq_lock. This test should always + * pass. The purpose is to provide a benchmark for measuring the + * effectiveness of taskq optimizations. + */ +#define TEST8_NUM_TASKS 0x20000 +#define TEST8_THREADS_PER_TASKQ 100 + static int splat_taskq_test8(struct file *file, void *arg) { - int rc; - - rc = splat_taskq_test8_common(file, arg, 1, 100); - - return rc; + return (splat_taskq_throughput(file, arg, + SPLAT_TASKQ_TEST8_NAME, TEST8_THREADS_PER_TASKQ, + 1, INT_MAX, TASKQ_PREPOPULATE, TEST8_NUM_TASKS, NULL)); } /* @@ -1433,6 +1439,46 @@ splat_taskq_test10(struct file *file, void *arg) return rc; } +/* + * Create a dynamic taskq with 100 threads and dispatch a huge number of + * trivial tasks. This will cause the taskq to grow quickly to its max + * thread count. This test should always pass. The purpose is to provide + * a benchmark for measuring the performance of dynamic taskqs. + */ +#define TEST11_NUM_TASKS 100000 +#define TEST11_THREADS_PER_TASKQ 100 + +static int +splat_taskq_test11(struct file *file, void *arg) +{ + struct timespec normal, dynamic; + int error; + + error = splat_taskq_throughput(file, arg, SPLAT_TASKQ_TEST11_NAME, + TEST11_THREADS_PER_TASKQ, 1, INT_MAX, + TASKQ_PREPOPULATE, TEST11_NUM_TASKS, &normal); + if (error) + return (error); + + error = splat_taskq_throughput(file, arg, SPLAT_TASKQ_TEST11_NAME, + TEST11_THREADS_PER_TASKQ, 1, INT_MAX, + TASKQ_PREPOPULATE | TASKQ_DYNAMIC, TEST11_NUM_TASKS, &dynamic); + if (error) + return (error); + + splat_vprint(file, SPLAT_TASKQ_TEST11_NAME, + "Timing taskq_wait(): normal=%ld.%09lds, dynamic=%ld.%09lds\n", + normal.tv_sec, normal.tv_nsec, + dynamic.tv_sec, dynamic.tv_nsec); + + /* A 1% increase in runtime is a */ + if ((dynamic.tv_sec * NANOSEC + dynamic.tv_nsec) > + ((normal.tv_sec * NANOSEC + normal.tv_nsec) * 101 / 100)) + error = -ETIME; + + return (error); +} + splat_subsystem_t * splat_taskq_init(void) { @@ -1470,6 +1516,8 @@ splat_taskq_init(void) SPLAT_TASKQ_TEST9_ID, splat_taskq_test9); SPLAT_TEST_INIT(sub, SPLAT_TASKQ_TEST10_NAME, SPLAT_TASKQ_TEST10_DESC, SPLAT_TASKQ_TEST10_ID, splat_taskq_test10); + SPLAT_TEST_INIT(sub, SPLAT_TASKQ_TEST11_NAME, SPLAT_TASKQ_TEST11_DESC, + SPLAT_TASKQ_TEST11_ID, splat_taskq_test11); return sub; } @@ -1478,6 +1526,7 @@ void splat_taskq_fini(splat_subsystem_t *sub) { ASSERT(sub); + SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST11_ID); SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST10_ID); SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST9_ID); SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST8_ID);