Skip to content

Commit

Permalink
Static sync thread and write issue taskq assignments
Browse files Browse the repository at this point in the history
 - Given a reasonable number of syncthreads, assign each
   syncthread its own allocator.

 - Create a separate write issue taskq for a given number of
   CPUS and statically bind assign each taskq to a specified
   syncthread.

Signed-off-by: Edmund Nadolski <[email protected]>
  • Loading branch information
Edmund Nadolski committed Oct 13, 2023
1 parent a9f251a commit 171f81e
Show file tree
Hide file tree
Showing 16 changed files with 543 additions and 63 deletions.
4 changes: 4 additions & 0 deletions include/os/freebsd/spl/sys/taskq.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ extern "C" {

typedef struct taskq {
struct taskqueue *tq_queue;
int tq_nthreads;
} taskq_t;

typedef uintptr_t taskqid_t;
Expand Down Expand Up @@ -93,13 +94,16 @@ extern void taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t,
taskq_ent_t *);
extern int taskq_empty_ent(taskq_ent_t *);
taskq_t *taskq_create(const char *, int, pri_t, int, int, uint_t);
taskq_t *taskq_create_synced(const char *, int, pri_t, int, int, uint_t,
kthread_t ***);
taskq_t *taskq_create_instance(const char *, int, int, pri_t, int, int, uint_t);
taskq_t *taskq_create_proc(const char *, int, pri_t, int, int,
struct proc *, uint_t);
taskq_t *taskq_create_sysdc(const char *, int, int, int,
struct proc *, uint_t, uint_t);
void nulltask(void *);
extern void taskq_destroy(taskq_t *);
extern void taskq_destroy_synced(taskq_t *, kthread_t **);
extern void taskq_wait_id(taskq_t *, taskqid_t);
extern void taskq_wait_outstanding(taskq_t *, taskqid_t);
extern void taskq_wait(taskq_t *);
Expand Down
3 changes: 3 additions & 0 deletions include/os/linux/spl/sys/taskq.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,10 @@ extern void taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t,
extern int taskq_empty_ent(taskq_ent_t *);
extern void taskq_init_ent(taskq_ent_t *);
extern taskq_t *taskq_create(const char *, int, pri_t, int, int, uint_t);
extern taskq_t *taskq_create_synced(const char *, int, pri_t, int, int, uint_t,
kthread_t ***);
extern void taskq_destroy(taskq_t *);
extern void taskq_destroy_synced(taskq_t *, kthread_t **);
extern void taskq_wait_id(taskq_t *, taskqid_t);
extern void taskq_wait_outstanding(taskq_t *, taskqid_t);
extern void taskq_wait(taskq_t *);
Expand Down
5 changes: 5 additions & 0 deletions include/sys/spa.h
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,11 @@ extern void spa_sync_allpools(void);

extern uint_t zfs_sync_pass_deferred_free;

/* spa sync taskqueues */
taskq_t *spa_sync_tq_create(spa_t *spa, const char *name);
void spa_sync_tq_destroy(spa_t *spa);
void spa_select_allocator(zio_t *zio);

/* spa namespace global mutex */
extern kmutex_t spa_namespace_lock;

Expand Down
13 changes: 12 additions & 1 deletion include/sys/spa_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ typedef struct spa_taskqs {
taskq_t **stqs_taskq;
} spa_taskqs_t;

/* one for each thread in the spa sync taskq */
typedef struct spa_syncthread_info {
kthread_t *sti_thread;
taskq_t *sti_wr_iss_tq; /* assigned wr_iss taskq */
} spa_syncthread_info_t;

typedef enum spa_all_vdev_zap_action {
AVZ_ACTION_NONE = 0,
AVZ_ACTION_DESTROY, /* Destroy all per-vdev ZAPs and the AVZ. */
Expand Down Expand Up @@ -265,6 +271,11 @@ struct spa {
int spa_alloc_count;
int spa_active_allocator; /* selectable allocator */

/* per-allocator sync thread taskqs */
taskq_t *spa_sync_tq;
spa_syncthread_info_t *spa_syncthreads;
kthread_t **spa_kthreads;

spa_aux_vdev_t spa_spares; /* hot spares */
spa_aux_vdev_t spa_l2cache; /* L2ARC cache devices */
nvlist_t *spa_label_features; /* Features for reading MOS */
Expand Down Expand Up @@ -456,7 +467,7 @@ extern char *spa_config_path;
extern const char *zfs_deadman_failmode;
extern uint_t spa_slop_shift;
extern void spa_taskq_dispatch_ent(spa_t *spa, zio_type_t t, zio_taskq_type_t q,
task_func_t *func, void *arg, uint_t flags, taskq_ent_t *ent);
task_func_t *func, void *arg, uint_t flags, taskq_ent_t *ent, zio_t *zio);
extern void spa_taskq_dispatch_sync(spa_t *, zio_type_t t, zio_taskq_type_t q,
task_func_t *func, void *arg, uint_t flags);
extern void spa_load_spares(spa_t *spa);
Expand Down
3 changes: 3 additions & 0 deletions include/sys/zfs_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,8 @@ extern taskq_t *system_taskq;
extern taskq_t *system_delay_taskq;

extern taskq_t *taskq_create(const char *, int, pri_t, int, int, uint_t);
extern taskq_t *taskq_create_synced(const char *, int, pri_t, int, int, uint_t,
kthread_t ***);
#define taskq_create_proc(a, b, c, d, e, p, f) \
(taskq_create(a, b, c, d, e, f))
#define taskq_create_sysdc(a, b, d, e, p, dc, f) \
Expand All @@ -506,6 +508,7 @@ extern void taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t,
extern int taskq_empty_ent(taskq_ent_t *);
extern void taskq_init_ent(taskq_ent_t *);
extern void taskq_destroy(taskq_t *);
extern void taskq_destroy_synced(taskq_t *, kthread_t **);
extern void taskq_wait(taskq_t *);
extern void taskq_wait_id(taskq_t *, taskqid_t);
extern void taskq_wait_outstanding(taskq_t *, taskqid_t);
Expand Down
6 changes: 6 additions & 0 deletions include/sys/zio.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ typedef uint64_t zio_flag_t;
#define ZIO_FLAG_REEXECUTED (1ULL << 29)
#define ZIO_FLAG_DELEGATED (1ULL << 30)

#define ZIO_ALLOCATOR_NONE (-1)
#define ZIO_HAS_ALLOCATOR(zio) ((zio)->io_allocator != ZIO_ALLOCATOR_NONE)

#define ZIO_FLAG_MUSTSUCCEED 0
#define ZIO_FLAG_RAW (ZIO_FLAG_RAW_COMPRESS | ZIO_FLAG_RAW_ENCRYPT)

Expand Down Expand Up @@ -526,6 +529,9 @@ struct zio {

/* Taskq dispatching state */
taskq_ent_t io_tqent;

/* write issue taskq selection, based upon sync thread */
taskq_t *io_wr_iss_tq;
};

enum blk_verify_flag {
Expand Down
99 changes: 99 additions & 0 deletions lib/libzpool/taskq.c
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,105 @@ taskq_destroy(taskq_t *tq)
kmem_free(tq, sizeof (taskq_t));
}

static void taskq_sync_assign(void *arg);

typedef struct taskq_sync_arg {
kthread_t *tqa_thread;
kcondvar_t tqa_cv;
kmutex_t tqa_lock;
int tqa_ready;
} taskq_sync_arg_t;

static void
taskq_sync_assign(void *arg)
{
taskq_sync_arg_t *tqa = arg;

mutex_enter(&tqa->tqa_lock);
tqa->tqa_thread = curthread;
tqa->tqa_ready = 1;
cv_signal(&tqa->tqa_cv);
while (tqa->tqa_ready == 1)
cv_wait(&tqa->tqa_cv, &tqa->tqa_lock);
mutex_exit(&tqa->tqa_lock);
}

/*
* Create a taskq with a specified number of pool threads. Allocate
* and return an array of kthread_t pointers, one for each thread
* in the pool. Note, the array is not ordered.
*
* A taskq created with this function may only be destroyed by
* taskq_destroy_synced().
*/
taskq_t *
taskq_create_synced(const char *name, int nthreads, pri_t pri,
int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
{
taskq_t *tq;
taskq_sync_arg_t *tqs = kmem_zalloc(sizeof (*tqs) * nthreads, KM_SLEEP);
kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
KM_SLEEP);

(void) pri; (void) minalloc; (void) maxalloc;

flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);

tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
flags | TASKQ_PREPOPULATE);
VERIFY(tq != NULL);
VERIFY(tq->tq_nthreads == nthreads);

/* spawn all syncthreads */
for (int i = 0; i < nthreads; i++) {
cv_init(&tqs[i].tqa_cv, NULL, CV_DEFAULT, NULL);
mutex_init(&tqs[i].tqa_lock, NULL, MUTEX_DEFAULT, NULL);
(void) taskq_dispatch(tq, taskq_sync_assign,
&tqs[i], TQ_FRONT);
}

/* wait on all syncthreads to start */
for (int i = 0; i < nthreads; i++) {
mutex_enter(&tqs[i].tqa_lock);
while (tqs[i].tqa_ready == 0)
cv_wait(&tqs[i].tqa_cv, &tqs[i].tqa_lock);
mutex_exit(&tqs[i].tqa_lock);
}

/* let all syncthreads resume, finish */
for (int i = 0; i < nthreads; i++) {
mutex_enter(&tqs[i].tqa_lock);
tqs[i].tqa_ready = 2;
cv_broadcast(&tqs[i].tqa_cv);
mutex_exit(&tqs[i].tqa_lock);
}
taskq_wait(tq);

for (int i = 0; i < nthreads; i++) {
kthreads[i] = tqs[i].tqa_thread;
mutex_destroy(&tqs[i].tqa_lock);
cv_destroy(&tqs[i].tqa_cv);
}
kmem_free(tqs, sizeof (*tqs) * nthreads);

*ktpp = kthreads;
return (tq);
}

/*
* Destroy a taskq created by taskq_create_synced(). The kthread_t array
* must be the same one that was created by taskq_create_synced().
*/
void
taskq_destroy_synced(taskq_t *tq, kthread_t **ktp)
{
ASSERT(tq != NULL);

taskq_wait(tq);
kmem_free(ktp, sizeof (*ktp) * tq->tq_nthreads);
taskq_destroy(tq);
}

int
taskq_member(taskq_t *tq, kthread_t *t)
{
Expand Down
22 changes: 15 additions & 7 deletions man/man4/zfs.4
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,13 @@ If we have less than this amount of free space,
most ZPL operations (e.g. write, create) will return
.Sy ENOSPC .
.
.It Sy spa_num_allocators Ns = Ns Sy 4 Pq int
Determines the number of block alloctators to use per spa instance.
Capped by the number of actual CPUs in the system.
.Pp
Note that setting this value too high could result in performance
degredation and/or excess fragmentation.
.
.It Sy spa_upgrade_errlog_limit Ns = Ns Sy 0 Pq uint
Limits the number of on-disk error log entries that will be converted to the
new format when enabling the
Expand Down Expand Up @@ -1977,13 +1984,6 @@ and may need to load new metaslabs to satisfy these allocations.
.It Sy zfs_sync_pass_rewrite Ns = Ns Sy 2 Pq uint
Rewrite new block pointers starting in this pass.
.
.It Sy zfs_sync_taskq_batch_pct Ns = Ns Sy 75 Ns % Pq int
This controls the number of threads used by
.Sy dp_sync_taskq .
The default value of
.Sy 75%
will create a maximum of one thread per CPU.
.
.It Sy zfs_trim_extent_bytes_max Ns = Ns Sy 134217728 Ns B Po 128 MiB Pc Pq uint
Maximum size of TRIM command.
Larger ranges will be split into chunks no larger than this value before
Expand Down Expand Up @@ -2275,6 +2275,14 @@ If
.Sy 0 ,
generate a system-dependent value close to 6 threads per taskq.
.
.It Sy zio_taskq_wr_iss_ncpus Ns = Ns Sy 32 Pq uint
Determines the number of CPUs to run write issue taskqs.
.Pp
While an optimal value will be system dependent, a suggested value
is the number of actual CPUs in the system, divided by the
.Sy spa_num_allocators
value.
.
.It Sy zvol_inhibit_dev Ns = Ns Sy 0 Ns | Ns 1 Pq uint
Do not create zvol device nodes.
This may slightly improve startup time on
Expand Down
98 changes: 98 additions & 0 deletions module/os/freebsd/spl/spl_taskq.c
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ taskq_create_impl(const char *name, int nthreads, pri_t pri,
nthreads = MAX((mp_ncpus * nthreads) / 100, 1);

tq = kmem_alloc(sizeof (*tq), KM_SLEEP);
tq->tq_nthreads = nthreads;
tq->tq_queue = taskqueue_create(name, M_WAITOK,
taskqueue_thread_enqueue, &tq->tq_queue);
taskqueue_set_callback(tq->tq_queue, TASKQUEUE_CALLBACK_TYPE_INIT,
Expand Down Expand Up @@ -257,6 +258,103 @@ taskq_destroy(taskq_t *tq)
kmem_free(tq, sizeof (*tq));
}

static void taskq_sync_assign(void *arg);

typedef struct taskq_sync_arg {
kthread_t *tqa_thread;
kcondvar_t tqa_cv;
kmutex_t tqa_lock;
int tqa_ready;
} taskq_sync_arg_t;

static void
taskq_sync_assign(void *arg)
{
taskq_sync_arg_t *tqa = arg;

mutex_enter(&tqa->tqa_lock);
tqa->tqa_thread = curthread;
tqa->tqa_ready = 1;
cv_signal(&tqa->tqa_cv);
while (tqa->tqa_ready == 1)
cv_wait(&tqa->tqa_cv, &tqa->tqa_lock);
mutex_exit(&tqa->tqa_lock);
}

/*
* Create a taskq with a specified number of pool threads. Allocate
* and return an array of kthread_t pointers, one for each thread
* in the pool. Note, the array is not ordered.
*
* A taskq created with this function may only be destroyed by
* taskq_destroy_synced().
*/
taskq_t *
taskq_create_synced(const char *name, int nthreads, pri_t pri,
int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
{
taskq_t *tq;
taskq_sync_arg_t *tqs = kmem_zalloc(sizeof (*tqs) * nthreads, KM_SLEEP);
kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
KM_SLEEP);

flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);

tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
flags | TASKQ_PREPOPULATE);
VERIFY(tq != NULL);
VERIFY(tq->tq_nthreads == nthreads);

/* spawn all syncthreads */
for (int i = 0; i < nthreads; i++) {
cv_init(&tqs[i].tqa_cv, NULL, CV_DEFAULT, NULL);
mutex_init(&tqs[i].tqa_lock, NULL, MUTEX_DEFAULT, NULL);
(void) taskq_dispatch(tq, taskq_sync_assign,
&tqs[i], TQ_FRONT);
}

/* wait on all syncthreads to start */
for (int i = 0; i < nthreads; i++) {
mutex_enter(&tqs[i].tqa_lock);
while (tqs[i].tqa_ready == 0)
cv_wait(&tqs[i].tqa_cv, &tqs[i].tqa_lock);
mutex_exit(&tqs[i].tqa_lock);
}

/* let all syncthreads resume, finish */
for (int i = 0; i < nthreads; i++) {
mutex_enter(&tqs[i].tqa_lock);
tqs[i].tqa_ready = 2;
cv_broadcast(&tqs[i].tqa_cv);
mutex_exit(&tqs[i].tqa_lock);
}
taskq_wait(tq);

for (int i = 0; i < nthreads; i++) {
kthreads[i] = tqs[i].tqa_thread;
mutex_destroy(&tqs[i].tqa_lock);
cv_destroy(&tqs[i].tqa_cv);
}
kmem_free(tqs, sizeof (*tqs) * nthreads);

*ktpp = kthreads;
return (tq);
}

/*
* Destroy a taskq created by taskq_create_synced(). The kthread_t array
* must be the same one that was created by taskq_create_synced().
*/
void
taskq_destroy_synced(taskq_t *tq, kthread_t **ktp)
{
ASSERT(tq != NULL);

taskq_wait(tq);
kmem_free(ktp, sizeof (*ktp) * tq->tq_nthreads);
taskq_destroy(tq);
}

int
taskq_member(taskq_t *tq, kthread_t *thread)
{
Expand Down
Loading

0 comments on commit 171f81e

Please sign in to comment.