diff --git a/include/re_async.h b/include/re_async.h index bdd232018..921f5658e 100644 --- a/include/re_async.h +++ b/include/re_async.h @@ -8,11 +8,12 @@ #define RE_H_ASYNC__ struct re_async; -typedef int (re_async_work_h)(void *arg); -typedef void (re_async_h)(int err, void *arg); +typedef int(re_async_work_h)(void *arg); +typedef void(re_async_h)(int err, void *arg); int re_async_alloc(struct re_async **asyncp, uint16_t workers); -int re_async(struct re_async *a, re_async_work_h *workh, re_async_h *cb, - void *arg); +int re_async(struct re_async *a, intptr_t id, re_async_work_h *workh, + re_async_h *cb, void *arg); +void re_async_cancel(struct re_async *async, intptr_t id); #endif diff --git a/include/re_main.h b/include/re_main.h index 72aaf1285..bc835a722 100644 --- a/include/re_main.h +++ b/include/re_main.h @@ -60,6 +60,9 @@ int re_thread_check(void); int re_thread_async_init(uint16_t workers); void re_thread_async_close(void); int re_thread_async(re_async_work_h *work, re_async_h *cb, void *arg); +int re_thread_async_id(intptr_t id, re_async_work_h *work, re_async_h *cb, + void *arg); +void re_thread_async_cancel(intptr_t id); void re_set_mutex(void *mutexp); diff --git a/src/async/async.c b/src/async/async.c index 5062766f7..c0d1a3df3 100644 --- a/src/async/async.c +++ b/src/async/async.c @@ -22,6 +22,7 @@ struct async_work { re_async_h *cb; void *arg; int err; + intptr_t id; }; struct re_async { @@ -210,14 +211,15 @@ int re_async_alloc(struct re_async **asyncp, uint16_t workers) * Execute work handler async and get a callback from re main thread * * @param async Pointer to async object + * @param id Work identifier * @param workh Work handler * @param cb Callback handler (called by re main thread) * @param arg Handler argument (has to be thread-safe) * * @return 0 if success, otherwise errorcode */ -int re_async(struct re_async *async, re_async_work_h *workh, re_async_h *cb, - void *arg) +int re_async(struct re_async *async, intptr_t id, re_async_work_h *workh, + re_async_h *cb, void *arg) { int err = 0; struct async_work *async_work; @@ -241,6 +243,7 @@ int re_async(struct re_async *async, re_async_work_h *workh, re_async_h *cb, async_work->workh = workh; async_work->cb = cb; async_work->arg = arg; + async_work->id = id; list_append(&async->workl, &async_work->le, async_work); cnd_signal(&async->wait); @@ -250,3 +253,47 @@ int re_async(struct re_async *async, re_async_work_h *workh, re_async_h *cb, return err; } + + +/** + * Cancel pending async work and callback + * + * @param async Pointer to async object + * @param id Work identifier + */ +void re_async_cancel(struct re_async *async, intptr_t id) +{ + struct le *le; + + if (unlikely(!async)) + return; + + mtx_lock(&async->mtx); + + LIST_FOREACH(&async->workl, le) + { + struct async_work *w = le->data; + + if (w->id != id) + continue; + + w->workh = NULL; + w->cb = NULL; + w->arg = mem_deref(w->arg); + list_move(&w->le, &async->freel); + } + + LIST_FOREACH(&async->curl, le) + { + struct async_work *w = le->data; + + if (w->id != id) + continue; + + w->workh = NULL; + w->cb = NULL; + w->arg = mem_deref(w->arg); + } + + mtx_unlock(&async->mtx); +} diff --git a/src/main/main.c b/src/main/main.c index 5747df4c4..d04c1ddbb 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -1519,9 +1519,9 @@ void re_thread_async_close(void) * * @param work Work handler * @param cb Callback handler (called by re main thread) - * @param arg Handler argument (has to be thread-safe) + * @param arg Handler argument (has to be thread-safe and mem_deref-safe) * - * @return async object on success, otherwise NULL + * @return 0 if success, otherwise errorcode */ int re_thread_async(re_async_work_h *work, re_async_h *cb, void *arg) { @@ -1540,5 +1540,58 @@ int re_thread_async(re_async_work_h *work, re_async_h *cb, void *arg) return err; } - return re_async(re->async, work, cb, arg); + return re_async(re->async, 0, work, cb, arg); +} + + +/** + * Execute work handler for current event loop + * + * @param id Work identifier + * @param work Work handler + * @param cb Callback handler (called by re main thread) + * @param arg Handler argument (has to be thread-safe and mem_deref-safe) + * + * @return 0 if success, otherwise errorcode + */ +int re_thread_async_id(intptr_t id, re_async_work_h *work, re_async_h *cb, void *arg) +{ + struct re *re = re_get(); + int err; + + if (unlikely(!re)) { + DEBUG_WARNING("re_thread_async_id: re not ready\n"); + return EAGAIN; + } + + if (unlikely(!re->async)) { + /* fallback needed for internal libre functions */ + err = re_async_alloc(&re->async, RE_THREAD_WORKERS); + if (err) + return err; + } + + return re_async(re->async, id, work, cb, arg); +} + + +/** + * Cancel pending async work and callback + * + * @param id Work identifier + */ +void re_thread_async_cancel(intptr_t id) +{ + struct re *re = re_get(); + + if (unlikely(!re)) { + DEBUG_WARNING("re_thread_async_cancel: re not ready\n"); + return; + } + +#ifndef RELEASE + re_thread_check(); +#endif + + re_async_cancel(re->async, id); }