Skip to content

Commit

Permalink
async,main: add work identifier and async cancel function
Browse files Browse the repository at this point in the history
  • Loading branch information
sreimers committed Jan 12, 2023
1 parent 755ea46 commit 6246eb8
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 9 deletions.
9 changes: 5 additions & 4 deletions include/re_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
#define RE_H_ASYNC__
struct re_async;

typedef int (re_async_work_h)(void *arg);
typedef void (re_async_h)(int err, void *arg);
typedef int(re_async_work_h)(void *arg);
typedef void(re_async_h)(int err, void *arg);

int re_async_alloc(struct re_async **asyncp, uint16_t workers);
int re_async(struct re_async *a, re_async_work_h *workh, re_async_h *cb,
void *arg);
int re_async(struct re_async *a, intptr_t id, re_async_work_h *workh,
re_async_h *cb, void *arg);
void re_async_cancel(struct re_async *async, intptr_t id);

#endif
3 changes: 3 additions & 0 deletions include/re_main.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ int re_thread_check(void);
int re_thread_async_init(uint16_t workers);
void re_thread_async_close(void);
int re_thread_async(re_async_work_h *work, re_async_h *cb, void *arg);
int re_thread_async_id(intptr_t id, re_async_work_h *work, re_async_h *cb,
void *arg);
void re_thread_async_cancel(intptr_t id);

void re_set_mutex(void *mutexp);

Expand Down
51 changes: 49 additions & 2 deletions src/async/async.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ struct async_work {
re_async_h *cb;
void *arg;
int err;
intptr_t id;
};

struct re_async {
Expand Down Expand Up @@ -210,14 +211,15 @@ int re_async_alloc(struct re_async **asyncp, uint16_t workers)
* Execute work handler async and get a callback from re main thread
*
* @param async Pointer to async object
* @param id Work identifier
* @param workh Work handler
* @param cb Callback handler (called by re main thread)
* @param arg Handler argument (has to be thread-safe)
*
* @return 0 if success, otherwise errorcode
*/
int re_async(struct re_async *async, re_async_work_h *workh, re_async_h *cb,
void *arg)
int re_async(struct re_async *async, intptr_t id, re_async_work_h *workh,
re_async_h *cb, void *arg)
{
int err = 0;
struct async_work *async_work;
Expand All @@ -241,6 +243,7 @@ int re_async(struct re_async *async, re_async_work_h *workh, re_async_h *cb,
async_work->workh = workh;
async_work->cb = cb;
async_work->arg = arg;
async_work->id = id;

list_append(&async->workl, &async_work->le, async_work);
cnd_signal(&async->wait);
Expand All @@ -250,3 +253,47 @@ int re_async(struct re_async *async, re_async_work_h *workh, re_async_h *cb,

return err;
}


/**
* Cancel pending async work and callback
*
* @param async Pointer to async object
* @param id Work identifier
*/
void re_async_cancel(struct re_async *async, intptr_t id)
{
struct le *le;

if (unlikely(!async))
return;

mtx_lock(&async->mtx);

LIST_FOREACH(&async->workl, le)
{
struct async_work *w = le->data;

if (w->id != id)
continue;

w->workh = NULL;
w->cb = NULL;
w->arg = mem_deref(w->arg);
list_move(&w->le, &async->freel);
}

LIST_FOREACH(&async->curl, le)
{
struct async_work *w = le->data;

if (w->id != id)
continue;

w->workh = NULL;
w->cb = NULL;
w->arg = mem_deref(w->arg);
}

mtx_unlock(&async->mtx);
}
60 changes: 57 additions & 3 deletions src/main/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -1519,9 +1519,9 @@ void re_thread_async_close(void)
*
* @param work Work handler
* @param cb Callback handler (called by re main thread)
* @param arg Handler argument (has to be thread-safe)
* @param arg Handler argument (has to be thread-safe and mem_deref-safe)
*
* @return async object on success, otherwise NULL
* @return 0 if success, otherwise errorcode
*/
int re_thread_async(re_async_work_h *work, re_async_h *cb, void *arg)
{
Expand All @@ -1540,5 +1540,59 @@ int re_thread_async(re_async_work_h *work, re_async_h *cb, void *arg)
return err;
}

return re_async(re->async, work, cb, arg);
return re_async(re->async, 0, work, cb, arg);
}


/**
* Execute work handler for current event loop
*
* @param id Work identifier
* @param work Work handler
* @param cb Callback handler (called by re main thread)
* @param arg Handler argument (has to be thread-safe and mem_deref-safe)
*
* @return 0 if success, otherwise errorcode
*/
int re_thread_async_id(intptr_t id, re_async_work_h *work, re_async_h *cb,
void *arg)
{
struct re *re = re_get();
int err;

if (unlikely(!re)) {
DEBUG_WARNING("re_thread_async_id: re not ready\n");
return EAGAIN;
}

if (unlikely(!re->async)) {
/* fallback needed for internal libre functions */
err = re_async_alloc(&re->async, RE_THREAD_WORKERS);
if (err)
return err;
}

return re_async(re->async, id, work, cb, arg);
}


/**
* Cancel pending async work and callback
*
* @param id Work identifier
*/
void re_thread_async_cancel(intptr_t id)
{
struct re *re = re_get();

if (unlikely(!re)) {
DEBUG_WARNING("re_thread_async_cancel: re not ready\n");
return;
}

#ifndef RELEASE
re_thread_check();
#endif

re_async_cancel(re->async, id);
}

0 comments on commit 6246eb8

Please sign in to comment.