/*
 * Handshake state shared between taskq_create_synced() and each
 * taskq_sync_assign() task it dispatches into the newly created taskq.
 * One record exists per expected pool thread.
 */
typedef struct taskq_sync_arg {
	kthread_t *tqa_thread;	/* pool thread that executed this task */
	kcondvar_t tqa_cv;
	kmutex_t tqa_lock;
	/* 0 = task not started, 1 = started (thread recorded), 2 = released */
	int tqa_ready;
} taskq_sync_arg_t;

/*
 * Taskq callback: record the identity of the pool thread running this
 * task, signal the creator that the task has started, then block until
 * the creator advances tqa_ready to 2.  Because every task parks its
 * thread here, each pool thread can run at most one of these tasks,
 * which guarantees each thread is captured exactly once.
 */
static void
taskq_sync_assign(void *arg)
{
	taskq_sync_arg_t *tqa = arg;

	mutex_enter(&tqa->tqa_lock);
	tqa->tqa_thread = curthread;
	tqa->tqa_ready = 1;
	cv_signal(&tqa->tqa_cv);
	/* Hold this pool thread until taskq_create_synced() releases it. */
	while (tqa->tqa_ready == 1)
		cv_wait(&tqa->tqa_cv, &tqa->tqa_lock);
	mutex_exit(&tqa->tqa_lock);
}
/*
 * Create a taskq with a specified number of pool threads.  Allocate
 * and return an array of nthreads kthread_t pointers, one for each
 * thread in the pool.  The array is not ordered and must be freed
 * by the caller.
 *
 * Declared in include/os/windows/spl/sys/taskq.h.
 *
 * NOTE(review): the pri, minalloc and maxalloc parameters are accepted
 * but currently ignored -- the underlying taskq is created with
 * minclsyspri, a minalloc of nthreads and a maxalloc of INT_MAX.
 * Confirm this is intentional (it mirrors the upstream OpenZFS
 * implementation of this function).
 */
taskq_t *
taskq_create_synced(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
{
	taskq_t *tq;
	/* One handshake record per expected pool thread. */
	taskq_sync_arg_t *tqs = kmem_zalloc(sizeof (*tqs) * nthreads, KM_SLEEP);
	kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
	    KM_SLEEP);

	/*
	 * Dynamic thread management would defeat the one-task-per-thread
	 * capture below, so strip those flags and force a fixed,
	 * fully prepopulated pool.
	 */
	flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);

	tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
	    flags | TASKQ_PREPOPULATE);
	VERIFY(tq != NULL);
	VERIFY(tq->tq_nthreads == nthreads);

	/* spawn all syncthreads */
	for (int i = 0; i < nthreads; i++) {
		cv_init(&tqs[i].tqa_cv, NULL, CV_DEFAULT, NULL);
		mutex_init(&tqs[i].tqa_lock, NULL, MUTEX_DEFAULT, NULL);
		/* TQ_FRONT so the capture tasks run before anything else. */
		(void) taskq_dispatch(tq, taskq_sync_assign,
		    &tqs[i], TQ_FRONT);
	}

	/* wait on all syncthreads to start */
	for (int i = 0; i < nthreads; i++) {
		mutex_enter(&tqs[i].tqa_lock);
		while (tqs[i].tqa_ready == 0)
			cv_wait(&tqs[i].tqa_cv, &tqs[i].tqa_lock);
		mutex_exit(&tqs[i].tqa_lock);
	}

	/* let all syncthreads resume, finish */
	for (int i = 0; i < nthreads; i++) {
		mutex_enter(&tqs[i].tqa_lock);
		tqs[i].tqa_ready = 2;
		cv_broadcast(&tqs[i].tqa_cv);
		mutex_exit(&tqs[i].tqa_lock);
	}
	/* Drain the taskq so every capture task has fully returned. */
	taskq_wait(tq);

	/* Harvest the recorded thread pointers, then tear down the records. */
	for (int i = 0; i < nthreads; i++) {
		kthreads[i] = tqs[i].tqa_thread;
		mutex_destroy(&tqs[i].tqa_lock);
		cv_destroy(&tqs[i].tqa_cv);
	}
	kmem_free(tqs, sizeof (*tqs) * nthreads);

	/* Ownership of the kthreads array transfers to the caller. */
	*ktpp = kthreads;
	return (tq);
}