Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenZFS 9425 - channel programs can be interrupted #8904

Merged
merged 2 commits into from
Jun 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion include/spl/sys/condvar.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ extern void __cv_init(kcondvar_t *, char *, kcv_type_t, void *);
extern void __cv_destroy(kcondvar_t *);
extern void __cv_wait(kcondvar_t *, kmutex_t *);
extern void __cv_wait_io(kcondvar_t *, kmutex_t *);
extern void __cv_wait_sig(kcondvar_t *, kmutex_t *);
extern int __cv_wait_io_sig(kcondvar_t *, kmutex_t *);
extern int __cv_wait_sig(kcondvar_t *, kmutex_t *);
extern clock_t __cv_timedwait(kcondvar_t *, kmutex_t *, clock_t);
extern clock_t __cv_timedwait_io(kcondvar_t *, kmutex_t *, clock_t);
extern clock_t __cv_timedwait_sig(kcondvar_t *, kmutex_t *, clock_t);
Expand All @@ -69,6 +70,7 @@ extern void __cv_broadcast(kcondvar_t *c);
#define cv_destroy(cvp) __cv_destroy(cvp)
#define cv_wait(cvp, mp) __cv_wait(cvp, mp)
#define cv_wait_io(cvp, mp) __cv_wait_io(cvp, mp)
#define cv_wait_io_sig(cvp, mp) __cv_wait_io_sig(cvp, mp)
#define cv_wait_sig(cvp, mp) __cv_wait_sig(cvp, mp)
#define cv_wait_interruptible(cvp, mp) cv_wait_sig(cvp, mp)
#define cv_timedwait(cvp, mp, t) __cv_timedwait(cvp, mp, t)
Expand Down
3 changes: 3 additions & 0 deletions include/sys/dsl_synctask.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ struct dsl_pool;

typedef int (dsl_checkfunc_t)(void *, dmu_tx_t *);
typedef void (dsl_syncfunc_t)(void *, dmu_tx_t *);
typedef void (dsl_sigfunc_t)(void *, dmu_tx_t *);

typedef enum zfs_space_check {
/*
Expand Down Expand Up @@ -116,6 +117,8 @@ int dsl_early_sync_task(const char *, dsl_checkfunc_t *,
dsl_syncfunc_t *, void *, int, zfs_space_check_t);
void dsl_early_sync_task_nowait(struct dsl_pool *, dsl_syncfunc_t *,
void *, int, zfs_space_check_t, dmu_tx_t *);
int dsl_sync_task_sig(const char *, dsl_checkfunc_t *, dsl_syncfunc_t *,
dsl_sigfunc_t *, void *, int, zfs_space_check_t);

#ifdef __cplusplus
}
Expand Down
5 changes: 5 additions & 0 deletions include/sys/txg.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ extern void txg_kick(struct dsl_pool *dp);
*/
extern void txg_wait_synced(struct dsl_pool *dp, uint64_t txg);

/*
* Wait as above. Returns true if the thread was signaled while waiting.
*/
extern boolean_t txg_wait_synced_sig(struct dsl_pool *dp, uint64_t txg);

/*
* Wait until the given transaction group, or one after it, is
* the open transaction group. Try to make this happen as soon
Expand Down
31 changes: 31 additions & 0 deletions include/sys/zcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ typedef struct zcp_cleanup_handler {
list_node_t zch_node;
} zcp_cleanup_handler_t;

typedef struct zcp_alloc_arg {
boolean_t aa_must_succeed;
int64_t aa_alloc_remaining;
int64_t aa_alloc_limit;
} zcp_alloc_arg_t;

typedef struct zcp_run_info {
dsl_pool_t *zri_pool;

Expand Down Expand Up @@ -93,6 +99,11 @@ typedef struct zcp_run_info {
*/
boolean_t zri_timed_out;

/*
* Channel program was canceled by user
*/
boolean_t zri_canceled;

/*
* Boolean indicating whether or not we are running in syncing
* context.
Expand All @@ -104,6 +115,26 @@ typedef struct zcp_run_info {
* triggered in the event of a fatal error.
*/
list_t zri_cleanup_handlers;

/*
* The Lua state context of our channel program.
*/
lua_State *zri_state;

/*
* Lua memory allocator arguments.
*/
zcp_alloc_arg_t *zri_allocargs;

/*
* Contains output values from zcp script or error string.
*/
nvlist_t *zri_outnvl;

/*
* The errno number returned to caller of zcp_eval().
*/
int zri_result;
} zcp_run_info_t;

zcp_run_info_t *zcp_run_info(lua_State *);
Expand Down
3 changes: 2 additions & 1 deletion include/sys/zfs_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ typedef pthread_cond_t kcondvar_t;
extern void cv_init(kcondvar_t *cv, char *name, int type, void *arg);
extern void cv_destroy(kcondvar_t *cv);
extern void cv_wait(kcondvar_t *cv, kmutex_t *mp);
extern int cv_wait_sig(kcondvar_t *cv, kmutex_t *mp);
extern clock_t cv_timedwait(kcondvar_t *cv, kmutex_t *mp, clock_t abstime);
extern clock_t cv_timedwait_hires(kcondvar_t *cvp, kmutex_t *mp, hrtime_t tim,
hrtime_t res, int flag);
Expand All @@ -313,8 +314,8 @@ extern void cv_broadcast(kcondvar_t *cv);

#define cv_timedwait_io(cv, mp, at) cv_timedwait(cv, mp, at)
#define cv_timedwait_sig(cv, mp, at) cv_timedwait(cv, mp, at)
#define cv_wait_sig(cv, mp) cv_wait(cv, mp)
#define cv_wait_io(cv, mp) cv_wait(cv, mp)
#define cv_wait_io_sig(cv, mp) cv_wait_sig(cv, mp)
#define cv_timedwait_sig_hires(cv, mp, t, r, f) \
cv_timedwait_hires(cv, mp, t, r, f)

Expand Down
7 changes: 7 additions & 0 deletions lib/libzpool/kernel.c
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,13 @@ cv_wait(kcondvar_t *cv, kmutex_t *mp)
mp->m_owner = pthread_self();
}

int
cv_wait_sig(kcondvar_t *cv, kmutex_t *mp)
{
cv_wait(cv, mp);
return (1);
}

clock_t
cv_timedwait(kcondvar_t *cv, kmutex_t *mp, clock_t abstime)
{
Expand Down
19 changes: 18 additions & 1 deletion module/spl/spl-condvar.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
#include <linux/hrtimer.h>
#include <linux/compiler_compat.h>

#include <linux/sched.h>

#ifdef HAVE_SCHED_SIGNAL_HEADER
#include <linux/sched/signal.h>
#endif

void
__cv_init(kcondvar_t *cvp, char *name, kcv_type_t type, void *arg)
{
Expand Down Expand Up @@ -144,10 +150,21 @@ __cv_wait_io(kcondvar_t *cvp, kmutex_t *mp)
}
EXPORT_SYMBOL(__cv_wait_io);

void
int
__cv_wait_io_sig(kcondvar_t *cvp, kmutex_t *mp)
{
cv_wait_common(cvp, mp, TASK_INTERRUPTIBLE, 1);

return (signal_pending(current) ? 0 : 1);
}
EXPORT_SYMBOL(__cv_wait_io_sig);

int
__cv_wait_sig(kcondvar_t *cvp, kmutex_t *mp)
{
cv_wait_common(cvp, mp, TASK_INTERRUPTIBLE, 0);

return (signal_pending(current) ? 0 : 1);
}
EXPORT_SYMBOL(__cv_wait_sig);

Expand Down
24 changes: 21 additions & 3 deletions module/zfs/dsl_synctask.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ dsl_null_checkfunc(void *arg, dmu_tx_t *tx)

static int
dsl_sync_task_common(const char *pool, dsl_checkfunc_t *checkfunc,
dsl_syncfunc_t *syncfunc, void *arg,
dsl_syncfunc_t *syncfunc, dsl_sigfunc_t *sigfunc, void *arg,
int blocks_modified, zfs_space_check_t space_check, boolean_t early)
{
spa_t *spa;
Expand Down Expand Up @@ -85,6 +85,11 @@ dsl_sync_task_common(const char *pool, dsl_checkfunc_t *checkfunc,

dmu_tx_commit(tx);

if (sigfunc != NULL && txg_wait_synced_sig(dp, dst.dst_txg)) {
/* current contract is to call func once */
sigfunc(arg, tx);
sigfunc = NULL; /* in case we're performing an EAGAIN retry */
}
txg_wait_synced(dp, dst.dst_txg);

if (dst.dst_error == EAGAIN) {
Expand Down Expand Up @@ -124,7 +129,7 @@ dsl_sync_task(const char *pool, dsl_checkfunc_t *checkfunc,
dsl_syncfunc_t *syncfunc, void *arg,
int blocks_modified, zfs_space_check_t space_check)
{
return (dsl_sync_task_common(pool, checkfunc, syncfunc, arg,
return (dsl_sync_task_common(pool, checkfunc, syncfunc, NULL, arg,
blocks_modified, space_check, B_FALSE));
}

Expand All @@ -146,10 +151,23 @@ dsl_early_sync_task(const char *pool, dsl_checkfunc_t *checkfunc,
dsl_syncfunc_t *syncfunc, void *arg,
int blocks_modified, zfs_space_check_t space_check)
{
return (dsl_sync_task_common(pool, checkfunc, syncfunc, arg,
return (dsl_sync_task_common(pool, checkfunc, syncfunc, NULL, arg,
blocks_modified, space_check, B_TRUE));
}

/*
* A standard synctask that can be interrupted from a signal. The sigfunc
* is called once if a signal occurred while waiting for the task to sync.
*/
int
dsl_sync_task_sig(const char *pool, dsl_checkfunc_t *checkfunc,
dsl_syncfunc_t *syncfunc, dsl_sigfunc_t *sigfunc, void *arg,
int blocks_modified, zfs_space_check_t space_check)
{
return (dsl_sync_task_common(pool, checkfunc, syncfunc, sigfunc, arg,
blocks_modified, space_check, B_FALSE));
}

static void
dsl_sync_task_nowait_common(dsl_pool_t *dp, dsl_syncfunc_t *syncfunc, void *arg,
int blocks_modified, zfs_space_check_t space_check, dmu_tx_t *tx,
Expand Down
36 changes: 33 additions & 3 deletions module/zfs/txg.c
Original file line number Diff line number Diff line change
Expand Up @@ -675,8 +675,8 @@ txg_delay(dsl_pool_t *dp, uint64_t txg, hrtime_t delay, hrtime_t resolution)
mutex_exit(&tx->tx_sync_lock);
}

void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
static boolean_t
txg_wait_synced_impl(dsl_pool_t *dp, uint64_t txg, boolean_t wait_sig)
{
tx_state_t *tx = &dp->dp_tx;

Expand All @@ -695,9 +695,39 @@ txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
"tx_synced=%llu waiting=%llu dp=%p\n",
tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
cv_broadcast(&tx->tx_sync_more_cv);
cv_wait_io(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
if (wait_sig) {
/*
* Condition wait here but stop if the thread receives a
* signal. The caller may call txg_wait_synced*() again
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wording was a little confusing to me - stop is the opposite of continue, but we continue if we're no longer waiting.
Maybe something like: "Condition wait here but continue execution if the thread receives a signal."

* to resume waiting for this txg.
*/
if (cv_wait_io_sig(&tx->tx_sync_done_cv,
&tx->tx_sync_lock) == 0) {
mutex_exit(&tx->tx_sync_lock);
return (B_TRUE);
}
} else {
cv_wait_io(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
}
}
mutex_exit(&tx->tx_sync_lock);
return (B_FALSE);
}

void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
VERIFY0(txg_wait_synced_impl(dp, txg, B_FALSE));
}

/*
* Similar to a txg_wait_synced but it can be interrupted from a signal.
* Returns B_TRUE if the thread was signaled while waiting.
*/
boolean_t
txg_wait_synced_sig(dsl_pool_t *dp, uint64_t txg)
{
return (txg_wait_synced_impl(dp, txg, B_TRUE));
}

/*
Expand Down
Loading