Skip to content

Commit

Permalink
Windows: add taskq_create_synced()
Browse files Browse the repository at this point in the history
Signed-off-by: Jorgen Lundman <[email protected]>
  • Loading branch information
lundman authored and datacore-rm committed Feb 28, 2024
1 parent afc95ef commit 3160047
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 0 deletions.
2 changes: 2 additions & 0 deletions include/os/windows/spl/sys/taskq.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ extern taskq_t *taskq_create_proc(const char *, int, pri_t, int, int,
proc_t *, uint_t);
extern taskq_t *taskq_create_sysdc(const char *, int, int, int,
proc_t *, uint_t, uint_t);
extern taskq_t *taskq_create_synced(const char *, int, pri_t, int, int, uint_t,
kthread_t ***);
extern taskqid_t taskq_dispatch(taskq_t *, task_func_t, void *, uint_t);
extern void nulltask(void *);
extern void taskq_destroy(taskq_t *);
Expand Down
79 changes: 79 additions & 0 deletions module/os/windows/spl/spl-taskq.c
Original file line number Diff line number Diff line change
Expand Up @@ -2795,6 +2795,85 @@ taskq_bucket_extend(void *arg)
#endif /* __APPLE__ */
}

typedef struct taskq_sync_arg {
kthread_t *tqa_thread;
kcondvar_t tqa_cv;
kmutex_t tqa_lock;
int tqa_ready;
} taskq_sync_arg_t;

static void
taskq_sync_assign(void *arg)
{
taskq_sync_arg_t *tqa = arg;

mutex_enter(&tqa->tqa_lock);
tqa->tqa_thread = curthread;
tqa->tqa_ready = 1;
cv_signal(&tqa->tqa_cv);
while (tqa->tqa_ready == 1)
cv_wait(&tqa->tqa_cv, &tqa->tqa_lock);
mutex_exit(&tqa->tqa_lock);
}

/*
* Create a taskq with a specified number of pool threads. Allocate
* and return an array of nthreads kthread_t pointers, one for each
* thread in the pool. The array is not ordered and must be freed
* by the caller.
*/
taskq_t *
taskq_create_synced(const char *name, int nthreads, pri_t pri,
int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
{
taskq_t *tq;
taskq_sync_arg_t *tqs = kmem_zalloc(sizeof (*tqs) * nthreads, KM_SLEEP);
kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
KM_SLEEP);

flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);

tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
flags | TASKQ_PREPOPULATE);
VERIFY(tq != NULL);
VERIFY(tq->tq_nthreads == nthreads);

/* spawn all syncthreads */
for (int i = 0; i < nthreads; i++) {
cv_init(&tqs[i].tqa_cv, NULL, CV_DEFAULT, NULL);
mutex_init(&tqs[i].tqa_lock, NULL, MUTEX_DEFAULT, NULL);
(void) taskq_dispatch(tq, taskq_sync_assign,
&tqs[i], TQ_FRONT);
}

/* wait on all syncthreads to start */
for (int i = 0; i < nthreads; i++) {
mutex_enter(&tqs[i].tqa_lock);
while (tqs[i].tqa_ready == 0)
cv_wait(&tqs[i].tqa_cv, &tqs[i].tqa_lock);
mutex_exit(&tqs[i].tqa_lock);
}

/* let all syncthreads resume, finish */
for (int i = 0; i < nthreads; i++) {
mutex_enter(&tqs[i].tqa_lock);
tqs[i].tqa_ready = 2;
cv_broadcast(&tqs[i].tqa_cv);
mutex_exit(&tqs[i].tqa_lock);
}
taskq_wait(tq);

for (int i = 0; i < nthreads; i++) {
kthreads[i] = tqs[i].tqa_thread;
mutex_destroy(&tqs[i].tqa_lock);
cv_destroy(&tqs[i].tqa_cv);
}
kmem_free(tqs, sizeof (*tqs) * nthreads);

*ktpp = kthreads;
return (tq);
}

static int
taskq_kstat_update(kstat_t *ksp, int rw)
{
Expand Down

0 comments on commit 3160047

Please sign in to comment.