Skip to content

Commit

Permalink
aio: introduce nni_aio_reset to reset the aio before submitting more …
Browse files Browse the repository at this point in the history
…work

This allows some use cases to reset things like the counts and outputs, before
submitting more jobs. Providers should call this near the top of their
functions; this is done without any locks so it should be very fast.
  • Loading branch information
gdamore committed Dec 26, 2024
1 parent 9cece0b commit b5826da
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 12 deletions.
53 changes: 41 additions & 12 deletions src/core/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ nni_aio_begin(nni_aio *aio)
aio->a_cancel_fn = NULL;
aio->a_expire = NNI_TIME_NEVER;
aio->a_stop = true;
aio->a_stopped = true;
nni_mtx_unlock(&eq->eq_mtx);

return (NNG_ESTOPPED);
Expand All @@ -365,6 +366,20 @@ nni_aio_begin(nni_aio *aio)
return (0);
}

void
nni_aio_reset(nni_aio *aio)
{
aio->a_result = 0;
aio->a_count = 0;
aio->a_abort = false;
aio->a_expire_ok = false;
aio->a_sleep = false;

for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
aio->a_outputs[i] = NULL;
}
}

int
nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
{
Expand All @@ -387,7 +402,8 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)

nni_mtx_lock(&eq->eq_mtx);
if (aio->a_stop || eq->eq_stop) {
aio->a_stop = true;
aio->a_stop = true;
aio->a_stopped = true;
nni_mtx_unlock(&eq->eq_mtx);
return (NNG_ESTOPPED);
}
Expand Down Expand Up @@ -437,9 +453,10 @@ nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)

nni_mtx_lock(&eq->eq_mtx);
if (aio->a_stop || eq->eq_stop) {
aio->a_stop = true;
aio->a_sleep = false;
aio->a_result = NNG_ESTOPPED;
aio->a_stop = true;
aio->a_sleep = false;
aio->a_result = NNG_ESTOPPED;
aio->a_stopped = true;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
Expand Down Expand Up @@ -496,6 +513,10 @@ nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
} else if (aio->a_use_expire && aio->a_expire <= nni_clock()) {
timeout = true;
}
if (!aio->a_sleep) {
aio->a_expire_ok = false;
}
aio->a_result = 0;

// Do this outside the lock. Note that we don't strictly need to have
// done this for the failure cases below (the task framework does the
Expand All @@ -504,26 +525,33 @@ nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
nni_task_prep(&aio->a_task);

nni_mtx_lock(&eq->eq_mtx);
NNI_ASSERT(!aio->a_stopped);
if (aio->a_stop || eq->eq_stop) {
aio->a_stop = true;
aio->a_sleep = false;
aio->a_result = NNG_ESTOPPED;
aio->a_stop = true;
aio->a_sleep = false;
aio->a_expire_ok = false;
aio->a_count = 0;
aio->a_result = NNG_ESTOPPED;
aio->a_stopped = true;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
}
if (aio->a_abort) {
aio->a_sleep = false;
aio->a_abort = false;
aio->a_sleep = false;
aio->a_abort = false;
aio->a_expire_ok = false;
aio->a_count = 0;
NNI_ASSERT(aio->a_result != 0);
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
}
if (timeout) {
aio->a_sleep = false;
aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
aio->a_abort = false;
aio->a_sleep = false;
aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
aio->a_expire_ok = false;
aio->a_count = 0;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
Expand Down Expand Up @@ -901,6 +929,7 @@ nni_sleep_cancel(nng_aio *aio, void *arg, int rv)
void
nni_sleep_aio(nng_duration ms, nng_aio *aio)
{
nni_aio_reset(aio);
aio->a_expire_ok = true;
aio->a_sleep = true;
switch (aio->a_timeout) {
Expand Down
6 changes: 6 additions & 0 deletions src/core/aio.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *);
// or was canceled before this call (but after nni_aio_begin).
extern bool nni_aio_defer(nni_aio *, nni_aio_cancel_fn, void *);

// nni_aio_reset is called by providers before doing any work -- it resets
// counts other fields to their initial state. It will not reset the closed
// state if the aio has been stopped or closed.
extern void nni_aio_reset(nni_aio *);

// nni_aio_start should be called before any asynchronous operation
// is filed. It need not be called for completions that are synchronous
// at job submission.
Expand Down Expand Up @@ -230,6 +235,7 @@ struct nng_aio {
bool a_use_expire; // Use expire instead of timeout
bool a_abort; // Task was aborted
bool a_init; // Initialized this
bool a_stopped; // Debug - set when we finish stopped
nni_task a_task;

// Read/write operations.
Expand Down
2 changes: 2 additions & 0 deletions src/core/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,14 @@ nng_stream_free(nng_stream *s)
void
nng_stream_send(nng_stream *s, nng_aio *aio)
{
nni_aio_reset(aio);
s->s_send(s, aio);
}

void
nng_stream_recv(nng_stream *s, nng_aio *aio)
{
nni_aio_reset(aio);
s->s_recv(s, aio);
}

Expand Down

0 comments on commit b5826da

Please sign in to comment.