Skip to content

Commit

Permalink
aio: remove nni_aio_begin and nni_aio_schedule
Browse files Browse the repository at this point in the history
The nni_aio_start function replaces these two functions with a
simple, single call, reducing pressure on common locks.
  • Loading branch information
gdamore committed Dec 27, 2024
1 parent 3f3eb35 commit c64d0e5
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 123 deletions.
101 changes: 5 additions & 96 deletions src/core/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@ static int nni_aio_expire_q_cnt;
// equal to the aio) before destroying it.
//
// The aio framework is tightly bound up with the task framework. We
// "prepare" the task for an aio when a caller marks an aio as starting
// (with nni_aio_begin), and that marks the task as busy. Then, all we have
// "start" the task for an aio when a caller marks an aio as starting
// (with nni_aio_start), and that marks the task as busy. Then, all we have
// to do is wait for the task to complete (the busy flag to be cleared)
// when we want to know if the operation itself is complete.
//
// In order to guard against aio reuse during teardown, we set the a_stop
// flag. Any attempt to submit new operation after that point will fail with
// the status NNG_ESTOPPED indicating this. The provider that calls
// nni_aio_begin() MUST check the return value, and if it comes back
// NNG_ESTOPPED then it must simply discard the request and // return.
// nni_aio_start() MUST check the return value, and if it comes back
// false then it must simply discard the request and return.
//
// Calling nni_aio_wait waits for the current outstanding operation to
// complete, but does not block another one from being started on the
Expand Down Expand Up @@ -175,7 +175,7 @@ nni_aio_set_iov(nni_aio *aio, unsigned nio, const nni_iov *iov)

// nni_aio_stop cancels any outstanding operation, and waits for the
// callback to complete, if still running. It also marks the AIO as
// stopped, preventing further calls to nni_aio_begin from succeeding.
// stopped, preventing further calls to nni_aio_start from succeeding.
// To correctly tear down an AIO, call stop, and make sure any other
// callers are not also stopped, before calling nni_aio_free to release
// actual memory.
Expand Down Expand Up @@ -322,50 +322,6 @@ nni_aio_busy(nni_aio *aio)
return (nni_task_busy(&aio->a_task));
}

int
nni_aio_begin(nni_aio *aio)
{
// If any of these triggers then the caller has a defect because
// it means that the aio is already in use. This is always
// a bug in the caller. These checks are not technically thread
// safe in the event that they are false. Users of race detectors
// checks may wish ignore or suppress these checks.
nni_aio_expire_q *eq = aio->a_expire_q;

NNI_ASSERT(aio->a_init);
nni_mtx_lock(&eq->eq_mtx);
NNI_ASSERT(!nni_aio_list_active(aio));
NNI_ASSERT(aio->a_cancel_fn == NULL);
NNI_ASSERT(!nni_list_node_active(&aio->a_expire_node));

// Some initialization can be done outside the lock, because
// we must have exclusive access to the aio.
for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
aio->a_outputs[i] = NULL;
}
aio->a_result = 0;
aio->a_count = 0;
aio->a_cancel_fn = NULL;
aio->a_abort = false;
aio->a_expire_ok = false;
aio->a_sleep = false;

// We should not reschedule anything at this point.
if (aio->a_stop || eq->eq_stop) {
aio->a_result = NNG_ESTOPPED;
aio->a_cancel_fn = NULL;
aio->a_expire = NNI_TIME_NEVER;
aio->a_stop = true;
aio->a_stopped = true;
nni_mtx_unlock(&eq->eq_mtx);

return (NNG_ESTOPPED);
}
nni_task_prep(&aio->a_task);
nni_mtx_unlock(&eq->eq_mtx);
return (0);
}

void
nni_aio_reset(nni_aio *aio)
{
Expand All @@ -380,53 +336,6 @@ nni_aio_reset(nni_aio *aio)
}
}

int
nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
{
nni_aio_expire_q *eq = aio->a_expire_q;

if ((!aio->a_sleep) && (!aio->a_use_expire)) {
// Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
case NNG_DURATION_ZERO:
return (NNG_ETIMEDOUT);
case NNG_DURATION_INFINITE:
case NNG_DURATION_DEFAULT:
aio->a_expire = NNI_TIME_NEVER;
break;
default:
aio->a_expire = nni_clock() + aio->a_timeout;
break;
}
}

nni_mtx_lock(&eq->eq_mtx);
if (aio->a_stop || eq->eq_stop) {
aio->a_stop = true;
aio->a_stopped = true;
nni_mtx_unlock(&eq->eq_mtx);
return (NNG_ESTOPPED);
}
if (aio->a_abort) {
int rv = aio->a_result;
nni_mtx_unlock(&eq->eq_mtx);
NNI_ASSERT(rv != 0);
return (rv);
}

NNI_ASSERT(aio->a_cancel_fn == NULL);
aio->a_cancel_fn = cancel;
aio->a_cancel_arg = data;

// We only schedule expiration if we have a way for the expiration
// handler to actively cancel it.
if ((aio->a_expire != NNI_TIME_NEVER) && (cancel != NULL)) {
nni_aio_expire_add(aio);
}
nni_mtx_unlock(&eq->eq_mtx);
return (0);
}

bool
nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
{
Expand Down
36 changes: 9 additions & 27 deletions src/core/aio.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,9 @@ extern void nni_aio_free(nni_aio *aio);
extern void nni_aio_stop(nni_aio *);

// nni_aio_close closes the aio for further activity. It aborts any in-progress
// transaction (if it can), and future calls nni_aio_begin or nni_aio_schedule
// with both result in NNG_ECLOSED. The expectation is that protocols call this
// for all their aio objects in a stop routine, before calling fini on any of
// them.
// transaction (if it can), and future calls nni_aio_start will result in
// NNG_ECLOSED. The expectation is that protocols call this for all their aio
// objects in a stop routine, before calling fini on any of them.
extern void nni_aio_close(nni_aio *);

// nni_set_input sets input parameters on the AIO. The semantic details
Expand Down Expand Up @@ -129,14 +128,6 @@ extern void nni_aio_finish_msg(nni_aio *, nni_msg *);
// with the indicated result (NNG_ECLOSED or NNG_ECANCELED is recommended.)
extern void nni_aio_abort(nni_aio *, int rv);

// nni_aio_begin is called by a provider to indicate it is starting the
// operation, and to check that the aio has not already been marked for
// teardown. It returns 0 on success, or NNG_ECANCELED if the aio is being
// torn down. (In that case, no operation should be aborted without any
// call to any other functions on this AIO, most especially not the
// nng_aio_finish family of functions.)
extern int nni_aio_begin(nni_aio *);

extern void *nni_aio_get_prov_data(nni_aio *);
extern void nni_aio_set_prov_data(nni_aio *, void *);
// nni_aio_advance_iov moves up the iov, reflecting that some I/O as
Expand All @@ -156,28 +147,19 @@ extern void nni_aio_get_iov(nni_aio *, unsigned *, nni_iov **);
extern void nni_aio_normalize_timeout(nni_aio *, nng_duration);
extern void nni_aio_bump_count(nni_aio *, size_t);

// nni_aio_schedule indicates that the AIO has begun, and is scheduled for
// asynchronous completion. This also starts the expiration timer. Note that
// prior to this, the aio cannot be canceled. If the operation has a zero
// timeout (NNG_FLAG_NONBLOCK) then NNG_ETIMEDOUT is returned. If the
// operation has already been canceled, or should not be run, then an error
// is returned. (In that case the caller should probably either return an
// error to its caller, or possibly cause an asynchronous error by calling
// nni_aio_finish_error on this aio.)
//
// NB: This function should be called while holding the lock that will be used
// to cancel the operation. Otherwise a race can occur where the operation
// cannot be canceled, which can lead to apparent hangs.
extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *);

// nni_aio_reset is called by providers before doing any work -- it resets
// counts other fields to their initial state. It will not reset the closed
// state if the aio has been stopped or closed.
extern void nni_aio_reset(nni_aio *);

// nni_aio_start should be called before any asynchronous operation
// is filed. It need not be called for completions that are synchronous
// at job submission.
// at job submission. It also starts the expiration timer, is the aio
// has an expiration. If this returns false, the caller should cease
// operating with the aio, and simply return. The callback for the aio
// will be executed by this function, if one is set, with an error such
// as NNG_ESTOPPED or NNG_ETIMEDTOUT. Until this function is called,
// the operation is not cancelable.
extern bool nni_aio_start(nni_aio *, nni_aio_cancel_fn, void *);

extern void nni_sleep_aio(nni_duration, nni_aio *);
Expand Down

0 comments on commit c64d0e5

Please sign in to comment.