From 072b2fadf0291a399d31f4df94ec579c920abf42 Mon Sep 17 00:00:00 2001 From: Luke Wagner Date: Thu, 5 Sep 2024 20:07:29 -0500 Subject: [PATCH] CABI: simplify and improve sync/async logic --- design/mvp/CanonicalABI.md | 394 ++++++++++-------------- design/mvp/canonical-abi/definitions.py | 149 +++++---- design/mvp/canonical-abi/run_tests.py | 20 +- 3 files changed, 242 insertions(+), 321 deletions(-) diff --git a/design/mvp/CanonicalABI.md b/design/mvp/CanonicalABI.md index 5630a5f8..1f10d9ac 100644 --- a/design/mvp/CanonicalABI.md +++ b/design/mvp/CanonicalABI.md @@ -260,57 +260,33 @@ reason that `async` is a keyword and most branches below want to start with the The `inst` field of `CallContext` points to the component instance which the `canon`-generated function is closed over. Component instances contain all the core wasm instance as well as some extra state that is used exclusively by the -Canonical ABI: +Canonical ABI and introduced below as the fields are used. ```python class ComponentInstance: - # core module instance state - may_leave: bool handles: HandleTables - num_tasks: int - backpressure: bool - calling_sync_import: bool - pending_tasks: list[asyncio.Future] - active_sync_task: bool - pending_sync_tasks: list[asyncio.Future] async_subtasks: Table[AsyncSubtask] + may_leave: bool + interruptible: asyncio.Event + backpressure: bool + sync_call: bool + num_tasks: int + pending_tasks: list[tuple[Task, asyncio.Future]] def __init__(self): - self.may_leave = True self.handles = HandleTables() - self.num_tasks = 0 + self.async_subtasks = Table[AsyncSubtask]() + self.may_leave = True + self.interruptible = asyncio.Event() + self.interruptible.set() self.backpressure = False - self.calling_sync_import = False + self.sync_call = False + self.num_tasks = 0 self.pending_tasks = [] - self.active_sync_task = False - self.pending_sync_tasks = [] - self.async_subtasks = Table[AsyncSubtask]() ``` -The `may_leave` field is used below to track whether the instance may 
call a -lowered import to prevent optimization-breaking cases of reentrance during -lowering. - -The `handles` field contains a mapping from `ResourceType` to `Table`s of -`HandleElem`s (defined next), establishing a separate `i32`-indexed array per -resource type. - -The `backpressure` and `pending_tasks` fields are used below to implement -backpressure that is applied when new export calls create new `Task`s in this -`ComponentInstance`. The `num_tasks` field tracks the number of live `Task`s in -this `ComponentInstance` and is primarily used to guard that a component -doesn't enter an invalid state where `backpressure` enabled but there are no -live tasks to disable it. - -The `calling_sync_import` flag also triggers backpressure when a component is -in the middle of a synchronous import call and does not expect to be reentered. - -The `active_sync_task` and `pending_sync_tasks` fields are similarly used to -serialize synchronously-lifted calls into this component instance. - -The `async_subtasks` field is used below to track and assign an `i32` index to -each active async-lowered call in progress that has been made by this -`ComponentInstance`. -One `HandleTables` object is stored per `ComponentInstance` and is defined as: +The `HandleTables` class stored in `ComponentInstance.handles` maps +`ResourceType`s to `Table`s of `HandleElem`s (defined next), establishing a +separate `i32`-indexed array per resource type: ```python class HandleTables: rt_to_table: MutableMapping[ResourceType, Table[HandleElem]] @@ -450,10 +426,10 @@ check that callers and callees uphold their respective parts of the call contract. This additional call state derives from `CallContext`, adding extra mutable fields. There are two subclasses of `CallContext`: `Task`, which is created by `canon_lift` and `Subtask`, which is created by `canon_lower`. -Additional sync-/async-specialized mutable state is added by the `SyncTask`, -`AsyncTask` and `AsyncSubtask` subclasses. 
+Additional sync-/async-specialized mutable state is added by the `AsyncTask` +and `AsyncSubtask` subclasses. -The `Task` class and its subclasses depend on the following two enums: +The `Task` class and its subclasses depend on the following three enums: ```python class AsyncCallState(IntEnum): STARTING = 0 @@ -467,12 +443,18 @@ class EventCode(IntEnum): CALL_RETURNED = AsyncCallState.RETURNED CALL_DONE = AsyncCallState.DONE YIELDED = 4 + +class OnBlockResult(IntEnum): + BLOCKED = 0 + COMPLETED = 1 ``` The `AsyncCallState` enum describes the linear sequence of states that an async call necessarily transitions through: [`STARTING`](Async.md#starting), `STARTED`, [`RETURNING`](Async.md#returning) and `DONE`. The `EventCode` enum shares common code values with `AsyncCallState` to define the set of integer event codes that are delivered to [waiting](Async.md#waiting) or polling tasks. +The `OnBlockResult` enum conveys the two possible results of the `on_block` +future used to tell callers whether or not the callee blocked. The `current_Task` global holds an `asyncio.Lock` that is used to prevent the Python runtime from arbitrarily switching between Python coroutines (`async @@ -490,54 +472,32 @@ current_task = asyncio.Lock() A `Task` object is created for each call to `canon_lift` and is implicitly threaded through all core function calls. This implicit `Task` parameter -specifies a concept of [the current task](Async.md#current-task) and inherently -scopes execution of all core wasm (including `canon`-defined core functions) to -a `Task`. +represents "[the current task](Async.md#current-task)". 
```python class Task(CallContext): caller: Optional[Task] - on_block: Optional[Callable] + on_block: Optional[asyncio.Future[OnBlockResult]] borrow_count: int events: asyncio.Queue[AsyncSubtask] num_async_subtasks: int def __init__(self, opts, inst, caller, on_block): super().__init__(opts, inst) - assert(on_block is not None) self.caller = caller self.on_block = on_block self.borrow_count = 0 self.events = asyncio.Queue[AsyncSubtask]() self.num_async_subtasks = 0 ``` -The fields of `Task` are only accessed by the methods of `Task` and are -introduced in groups of related `Task`-methods next. Using a conservative -syntactic analysis of the component-level definitions of a linked component -DAG, an optimizing implementation can statically eliminate these fields when -the particular feature (`borrow` handles, `async` imports) is not used. +The fields of `Task` are introduced in groups of related `Task` methods next. +Using a conservative syntactic analysis of the component-level definitions of a +linked component DAG, an optimizing implementation can statically eliminate +these fields when the particular feature (`borrow` handles, `async` imports) is +not used. -The `enter()` method is called immediately after constructing the `Task` and is -responsible for preventing reentrance and implementing backpressure: -```python - async def enter(self): - assert(current_task.locked()) - self.trap_if_on_the_stack(self.inst) - self.inst.num_tasks += 1 - if not self.may_enter() or self.inst.pending_tasks: - f = asyncio.Future() - self.inst.pending_tasks.append(f) - await self.suspend(f) - assert(self.may_enter()) -``` - -The `caller` field is immutable and is either `None`, when a `Task` is created -for a component export called directly by the host, or else the current task -when the calling component called into this component. 
The `trap_if_on_the_stack` -method (called by `enter` above) uses `caller` to prevent a component from -being reentered (enforcing the [component invariant]) in a way that is -well-defined even in the presence of async calls. Having a `caller` depends on -having an async call tree which in turn depends on maintaining -[structured concurrency]. +The `trap_if_on_the_stack` helper method is called (below) to prevent reentrance. +The definition uses the `caller` field which points to the task's supertask in +the async call tree defined by Component Model [structured concurrency]. ```python def trap_if_on_the_stack(self, inst): c = self.caller @@ -558,92 +518,103 @@ O(n) loop in `trap_if_on_the_stack`: a packed bit-vector (assigning each potentially-reenterable async component instance a static bit position) that is passed by copy from caller to callee. -The definition of `may_enter` (used to trigger backpressure in `enter`) is a -combination of two boolean flags: whether backpressure was explicitly requested -by guest code (via `task.backpressure`) or implied by a synchronous import call -in-progress: +The `enter` method is called immediately after constructing a `Task` and, along +with `may_enter` and `maybe_start_pending_task`, implements backpressure. If a +`Task` tries to `enter` when `may_enter` is false, the `Task` suspends itself +(via `suspend`, shown next) and goes into a `pending_tasks` queue, waiting to +be unblocked by another task calling `maybe_start_pending_task`. One key +property of this backpressure scheme is that `pending_tasks` are only dequeued +one at a time, ensuring that if an overloaded component instance enables +backpressure (via `task.backpressure`) and then disables it, there will not be +an unstoppable thundering herd of pending tasks started all at once that OOM +the component before it can re-enable backpressure. 
```python - def may_enter(self): - return not self.inst.backpressure and not self.inst.calling_sync_import -``` + async def enter(self): + assert(current_task.locked()) + self.trap_if_on_the_stack(self.inst) + if not self.may_enter(self) or self.inst.pending_tasks: + f = asyncio.Future() + self.inst.pending_tasks.append((self, f)) + await self.suspend(f) + self.inst.num_tasks += 1 + if self.opts.sync: + self.inst.sync_call = True -The key method of `Task`, used by `enter`, `wait` and `yield_`, is `suspend`, -which takes an `asyncio.Future` to `await` on. When suspending, there are two -cases to consider: -* This is the first time the current `Task` has blocked and thus there may be - an `async`-lowered caller waiting to find out that the callee blocked (which - is signalled by calling the `on_block` handler that the caller passed to - `canon_lift`). -* This task has already blocked in the past (signalled by `on_block` being - `None`) and thus there is no `async`-lowered caller to switch to and so we - let Python's `asyncio` scheduler non-deterministically pick some other task - that is ready to go, waiting to `acquire` the `current_task` lock. + def may_enter(self, pending_task): + return self.inst.interruptible.is_set() and \ + not self.inst.backpressure and \ + not self.inst.sync_call and \ + not (pending_task.opts.sync and self.inst.num_tasks > 0) -In either case, once the given future is resolved, this `Task` has to -re`acquire` the `current_stack` lock to run again. + def maybe_start_pending_task(self): + if self.inst.pending_tasks: + pending_task, future = self.inst.pending_tasks[0] + if self.may_enter(pending_task): + self.inst.pending_tasks.pop(0) + future.set_result(None) +``` +The rules shown above also ensure that synchronously-lifted exports only +execute when no other (sync or async) tasks are executing concurrently. 
+ +The `suspend` method, called by `enter`, `wait` and `yield_`, takes a future +to `await` and allows other tasks to make progress in the meantime. Since the +`current_task` lock is shared by all linked component instances, releasing +then acquiring it allows switching to any task in the same or different +component instance. The final loop ensures that, when a component instance +makes a `sync`-lowered import call, which is signalled by clearing +`interruptible` for the duration of the call, no other tasks *in the same +component instance* can execute (which would otherwise break the appearance of +a *synchronous* call from the perspective of that component instance). ```python async def suspend(self, future): - assert(current_task.locked()) - if self.on_block: - self.on_block() - self.on_block = None + if self.on_block and not self.on_block.done(): + self.on_block.set_result(OnBlockResult.BLOCKED) else: current_task.release() - r = await future + v = await future await current_task.acquire() - return r + while not self.inst.interruptible.is_set(): + current_task.release() + await self.inst.interruptible.wait() + await current_task.acquire() + return v ``` -As a side note: the `suspend` method is so named because it could be -reimplemented using the [`suspend`] instruction of the [typed continuations] -proposal, removing the need for `on_block` and the subtle calling contract -between `Task.suspend` and `canon_lift`. +When there is an `async`-lowered caller waiting on the stack, the `on_block` +field will point to an unresolved future. In this case, `suspend` sets the +result of `on_block` and leaves `current_task` locked so that control flow +transfers deterministically to `async`-lowered caller (in `canon_lower`, +defined below). The `suspend` method is so named because this delicate use of +Python's async functionality is essentially emulating the `suspend`/`resume` +instructions of the [stack-switching] proposal. 
Thus, once stack-switching is +implemented, a valid implementation technique would be to compile Canonical ABI +adapters to Core WebAssembly using `suspend` and `resume`. While a task is running, it may call `wait` (via `canon task.wait` or, when a `callback` is present, by returning to the event loop) to block until there is -progress on one of the task's async subtasks. Although the Python code uses an -`asyncio.Queue` to coordinate async events, an optimized implementation should -not have to create an actual queue; instead it should be possible to embed a -"next ready" linked list in the elements of the `async_subtasks` table (noting -the `enqueued` guard above ensures that a subtask can be enqueued at most -once). +progress on one of the task's async subtasks. Alternatively, the current task +can call `poll` (via `canon task.poll`), which does not block and does not +allow the runtime to switch to another task. Lastly, a task may cooperatively +yield (via `canon task.yield`), allowing switching but without waiting for +any external events (as emulated in the Python code by awaiting `sleep(0)`). 
```python async def wait(self): self.maybe_start_pending_task() subtask = await self.suspend(self.events.get()) return self.process_event(subtask) - def maybe_start_pending_task(self): - if self.inst.pending_tasks and self.may_enter(): - self.inst.pending_tasks.pop(0).set_result(None) - - def process_event(self, subtask): - assert(subtask.supertask is self) - subtask.enqueued = False - return (EventCode(subtask.state), subtask.index) -``` -The `pending_tasks` queue (appended to by `enter` above) is emptied by `wait` -(and `yield_` and `exit` below) one at a time once backpressure is turned back -off, ensuring that each popped tasks gets a chance to start and possibly -re-enable backpressure before the next pending task is started: - -Instead of `wait`ing for a subtask to make progress, the current task can also -call `poll` (via `canon task.poll`, defined below), which does not block and -does not allow the runtime to switch to another task: -```python def poll(self): if self.events.empty(): return None return self.process_event(self.events.get_nowait()) -``` -A task may also cooperatively yield the current thread, explicitly allowing -the runtime to switch to another ready task, but without blocking on I/O (as -emulated in the Python code here by awaiting a `sleep(0)`). -```python async def yield_(self): self.maybe_start_pending_task() await self.suspend(asyncio.sleep(0)) ``` +Although this Python code uses an `asyncio.Queue` to coordinate async events, +an optimized implementation should not have to create an actual queue; instead +it should be possible to embed a "next ready" linked list in the elements of +the `async_subtasks` table. All `Task`s (whether lifted `async` or not) are allowed to call `async`-lowered imports. Calling an `async`-lowered import creates an `AsyncSubtask` (defined @@ -664,7 +635,17 @@ guarded to be `0` in `Task.exit` (below) to ensure [structured concurrency]. 
return subtask.enqueued = True self.events.put_nowait(subtask) + + def process_event(self, subtask): + assert(subtask.supertask is self) + subtask.enqueued = False + return (EventCode(subtask.state), subtask.index) ``` +Instead of simply enqueuing an `EventCode` when `async_subtask_made_progress`, +the above code achieves the effect of collapsing multiple events into one, +delivering only the newest state, by enqueing the mutable `AsyncSubtask` itself +at most once and only reading its `state` when the event is being *popped* from +the queue. The `borrow_count` field is used by the following methods to track the number of borrowed handles that were passed as parameters to the export that have not @@ -680,24 +661,19 @@ after this export call finishes): ``` Lastly, when a task exits, the runtime enforces the guard conditions mentioned -above and allows other tasks to start or make progress. +above and allows pending tasks to start. ```python def exit(self): assert(current_task.locked()) assert(self.events.empty()) assert(self.inst.num_tasks >= 1) - trap_if(self.inst.backpressure and self.inst.num_tasks == 1) trap_if(self.borrow_count != 0) trap_if(self.num_async_subtasks != 0) self.inst.num_tasks -= 1 + if self.opts.sync: + self.inst.sync_call = False self.maybe_start_pending_task() - if not self.on_block: - current_task.release() ``` -If this `Task` has not yet blocked, there is an active `async`-lowered caller -on the stack, so we don't release the `current_task` lock; instead we just let -the `Task`'s Python coroutine return directly to the `await`ing caller without -a non-deterministic task switch. While `canon_lift` creates `Task`s, `canon_lower` creates `Subtask` objects: ```python @@ -724,37 +700,9 @@ list usually has a fixed size (in all cases except when a function signature has `borrow`s in `list`s) and thus can be stored inline in the native stack frame. 
-The following `SyncTask`/`AsyncTask`/`AsyncSubtask` classes extend the -preceding `Task`/`Subtask` classes with additional state and methods that apply -only to the sync or async case. - -The `SyncTask` classes overrides the `enter` and `exit` methods to additionally -enforce the rule that there only ever at most one synchronous task running in a -given component instance at a given time. -```python -class SyncTask(Task): - async def enter(self): - await super().enter() - if self.inst.active_sync_task: - f = asyncio.Future() - self.inst.pending_sync_tasks.append(f) - await self.suspend(f) - assert(not self.inst.active_sync_task) - self.inst.active_sync_task = True - - def exit(self): - assert(self.inst.active_sync_task) - self.inst.active_sync_task = False - if self.inst.pending_sync_tasks: - self.inst.pending_sync_tasks.pop(0).set_result(None) - super().exit() -``` -Thus, after one sync task starts running, any subsequent attempts to call into -the same component instance before the first sync task finishes will wait in a -LIFO queue until the sync task ahead of them in line completes. An optimized -implementation should be able to avoid separately allocating -`pending_sync_tasks` by instead embedding a "next pending" linked list in the -`Subtask` table element of the caller. +The following `AsyncTask`/`AsyncSubtask` classes extend the preceding +`Task`/`Subtask` classes with additional state and methods that apply only to +the async case. 
The first 3 fields of `AsyncTask` are simply immutable copies of arguments/immediates passed to `canon_lift` and are used by the `task.start` @@ -1883,7 +1831,6 @@ storage with `realloc` or accepting a caller-allocated buffer as an out-param: ```python def lower_flat_values(cx, max_flat, vs, ts, out_param = None): - assert(cx.inst.may_leave) cx.inst.may_leave = False flat_types = flatten_types(ts) if len(flat_types) > max_flat: @@ -1940,8 +1887,8 @@ When instantiating component instance `$inst`: The resulting function `$f` takes 4 runtime arguments: * `caller`: the caller's `Task` or, if this lifted function is being called by the host, `None` -* `on_block`: a nullary function that must be called at most once by the callee - before blocking the first time +* `on_block`: an optional `asyncio.Future` that must be resolved with + `OnBlockResult.BLOCKED` if the callee blocks on I/O * `on_start`: a nullary function that must be called to return the caller's arguments as a list of component-level values * `on_return`: a unary function that must be called after `on_start`, @@ -1968,7 +1915,7 @@ Based on this, `canon_lift` is defined: ```python async def canon_lift(opts, inst, callee, ft, caller, on_block, on_start, on_return): if opts.sync: - task = SyncTask(opts, inst, caller, on_block) + task = Task(opts, inst, caller, on_block) await task.enter() flat_args = lower_flat_values(task, MAX_FLAT_PARAMS, on_start(), ft.param_types()) @@ -2055,31 +2002,26 @@ The resulting function `$f` takes 2 runtime arguments: Given this, `canon_lower` is defined: ```python async def canon_lower(opts, callee, ft, task, flat_args): + assert(current_task.locked()) trap_if(not task.inst.may_leave) flat_args = CoreValueIter(flat_args) flat_results = None if opts.sync: subtask = Subtask(opts, task.inst) - task.inst.calling_sync_import = True - def on_block(): - if task.on_block: - task.on_block() - task.on_block = None + task.inst.interruptible.clear() def on_start(): return 
lift_flat_values(subtask, MAX_FLAT_PARAMS, flat_args, ft.param_types()) def on_return(results): nonlocal flat_results flat_results = lower_flat_values(subtask, MAX_FLAT_RESULTS, results, ft.result_types(), flat_args) - await callee(task, on_block, on_start, on_return) - task.inst.calling_sync_import = False + await callee(task, task.on_block, on_start, on_return) + task.inst.interruptible.set() subtask.finish() else: subtask = AsyncSubtask(opts, task.inst) - eager_result = asyncio.Future() + on_block = asyncio.Future() async def do_call(): - def on_block(): - eager_result.set_result('block') def on_start(): subtask.start() return lift_flat_values(subtask, 1, flat_args, ft.param_types()) @@ -2088,41 +2030,41 @@ async def canon_lower(opts, callee, ft, task, flat_args): lower_flat_values(subtask, 0, results, ft.result_types(), flat_args) await callee(task, on_block, on_start, on_return) subtask.finish() - if not eager_result.done(): - eager_result.set_result('complete') + if on_block.done(): + current_task.release() + else: + on_block.set_result(OnBlockResult.COMPLETED) asyncio.create_task(do_call()) - match await eager_result: - case 'complete': - flat_results = [0] - case 'block': + match await on_block: + case OnBlockResult.BLOCKED: i = task.add_async_subtask(subtask) flat_results = [pack_async_result(i, subtask.state)] + case OnBlockResult.COMPLETED: + flat_results = [0] + assert(current_task.locked()) return flat_results ``` -In the synchronous case, the import call is bracketed by setting -`calling_sync_import` to prevent reentrance into the current component instance -if the `callee` blocks and the caller gets control flow (via `on_block`). Like -`Task.suspend` above, `canon_lift` clears the `on_block` handler after calling -to signal that the current `Task` has already released any waiting -`async`-lowered callers. 
- -In the asynchronous case, we finally see the whole point of `on_block` which is -to allow us to wait for one of two outcomes: the callee blocks or the callee -finishes without blocking. Whichever happens first resolves the `eager_result` -future. After calling `asyncio.create_task`, `canon_lift` immediately `await`s -`eager_result` so that there is no allowed interleaving between the caller and -callee's Python coroutines. This overall behavior resembles the [`resume`] -instruction of the [typed continuations] proposal (handling a `block` effect) -which could be used to more-directly implement the Python control flow here. +In the synchronous case, the import call is bracketed by clearing +`interruptible` for the duration of the call. Clearing `interruptible` both +prevents new tasks from starting (via `Task.may_enter`) and prevents suspended +tasks from resuming execution (via `Task.suspend`). Propagating the caller's +`on_block` future ensures that an `async`-lowered caller anywhere higher up on +the stack still gets control flow if `callee` blocks. + +In the asynchronous case, the `on_block` future allows the caller to `await` to +see if `callee` blocks on I/O before returning. Because `Task.suspend` (defined +above) does not release the `current_task` lock when it resolves `on_block` to +`BLOCKED`, control flow deterministically returns to the caller (without +executing other tasks) when `callee` blocks. This is analogous to how the +`resume` instruction of the [stack-switching] proposal would work if given +`(cont.new $callee)` and handling an `on_block` event. Whether or not the `callee` blocks, the `on_start` and `on_return` handlers -must be called before the `callee` completes (either by `canon_lift` in the -synchronous case or the `task.start`/`task.return` built-ins in the -asynchronous case). 
Note that, when `async`-lowering, lifting and lowering -can happen after `canon_lower` returns and thus the caller must `task.wait` -for `EventCode`s to know when the supplied linear memory pointers can be -reused. +will be called before the `callee` completes. Note that, in an `async`-lowered +call, `on_start` and `on_return` can happen after `canon_lower` returns and +thus the caller must `task.wait` for progress events to know when the supplied +linear memory pointers can be reclaimed by the caller. If an `async`-lowered call blocks, the `AsyncSubtask` is added to the component instance's `async_subtasks` table, and the index and state are returned to the @@ -2136,13 +2078,13 @@ def pack_async_result(i, state): The above definitions of sync/async `canon_lift`/`canon_lower` ensure that a sync-or-async `canon_lift` may call a sync-or-async `canon_lower`, with all -combinations working. This is why the `Task` base class (derived by `SyncTask` -and `AsyncTask`) contains the code for handling async-lowered subtasks. As -mentioned above, conservative syntactic analysis of all `canon` definitions in -a component can statically rule out combinations so that, e.g., a DAG of -all-sync components use a plain synchronous callstack and a DAG of all `async -callback` components use only an event loop without fibers. It's only when -`async` (without a `callback`) or various compositions of async and sync +combinations working. This is why the `Task` class, which is used for both sync +and async `canon_lift` calls, contains the code for handling async-lowered +subtasks. As mentioned above, conservative syntactic analysis of all `canon` +definitions in a component can statically rule out combinations so that, e.g., +a DAG of all-sync components use a plain synchronous callstack and a DAG of all +`async callback` components use only an event loop without fibers. 
It's only +when `async` (without a `callback`) or various compositions of async and sync components are used that fibers (or [Asyncify]) are required to implement the above async rules. @@ -2352,9 +2294,9 @@ through the event loop at the base of the stack. Note that `task.wait` will `suspend` the current `Task`, allowing other tasks to run. Note also that `task.wait` can be called from a synchronously-lifted export so that even synchronous code can make concurrent import calls. In these -synchronous cases, though, the automatic backpressure (applied by -`SyncTask.enter`) will ensure there is only ever at most once -synchronously-lifted task executing in a component instance at a time. +synchronous cases, though, the automatic backpressure (applied by `Task.enter`) +will ensure there is only ever at most once synchronously-lifted task executing +in a component instance at a time. ### 🔀 `canon task.poll` @@ -2507,9 +2449,7 @@ def canon_thread_hw_concurrency(): [Exceptions]: https://github.com/WebAssembly/exception-handling/blob/main/proposals/exception-handling/Exceptions.md [WASI]: https://github.com/webassembly/wasi [Deterministic Profile]: https://github.com/WebAssembly/profiles/blob/main/proposals/profiles/Overview.md -[Typed Continuations]: https://github.com/WebAssembly/stack-switching/blob/main/proposals/continuations/Explainer.md -[`suspend`]: https://github.com/WebAssembly/stack-switching/blob/main/proposals/continuations/Explainer.md#suspending-continuations -[`resume`]: https://github.com/WebAssembly/stack-switching/blob/main/proposals/continuations/Explainer.md#invoking-continuations +[stack-switching]: https://github.com/WebAssembly/stack-switching [Alignment]: https://en.wikipedia.org/wiki/Data_structure_alignment [UTF-8]: https://en.wikipedia.org/wiki/UTF-8 diff --git a/design/mvp/canonical-abi/definitions.py b/design/mvp/canonical-abi/definitions.py index e1087723..eec9417c 100644 --- 
a/design/mvp/canonical-abi/definitions.py +++ b/design/mvp/canonical-abi/definitions.py @@ -293,27 +293,25 @@ class CanonicalOptions: ### Runtime State class ComponentInstance: - # core module instance state - may_leave: bool handles: HandleTables - num_tasks: int - backpressure: bool - calling_sync_import: bool - pending_tasks: list[asyncio.Future] - active_sync_task: bool - pending_sync_tasks: list[asyncio.Future] async_subtasks: Table[AsyncSubtask] + may_leave: bool + interruptible: asyncio.Event + backpressure: bool + sync_call: bool + num_tasks: int + pending_tasks: list[tuple[Task, asyncio.Future]] def __init__(self): - self.may_leave = True self.handles = HandleTables() - self.num_tasks = 0 + self.async_subtasks = Table[AsyncSubtask]() + self.may_leave = True + self.interruptible = asyncio.Event() + self.interruptible.set() self.backpressure = False - self.calling_sync_import = False + self.sync_call = False + self.num_tasks = 0 self.pending_tasks = [] - self.active_sync_task = False - self.pending_sync_tasks = [] - self.async_subtasks = Table[AsyncSubtask]() class HandleTables: rt_to_table: MutableMapping[ResourceType, Table[HandleElem]] @@ -401,68 +399,75 @@ class EventCode(IntEnum): CALL_DONE = AsyncCallState.DONE YIELDED = 4 +class OnBlockResult(IntEnum): + BLOCKED = 0 + COMPLETED = 1 + current_task = asyncio.Lock() class Task(CallContext): caller: Optional[Task] - on_block: Optional[Callable] + on_block: Optional[asyncio.Future[OnBlockResult]] borrow_count: int events: asyncio.Queue[AsyncSubtask] num_async_subtasks: int def __init__(self, opts, inst, caller, on_block): super().__init__(opts, inst) - assert(on_block is not None) self.caller = caller self.on_block = on_block self.borrow_count = 0 self.events = asyncio.Queue[AsyncSubtask]() self.num_async_subtasks = 0 + def trap_if_on_the_stack(self, inst): + c = self.caller + while c is not None: + trap_if(c.inst is inst) + c = c.caller + async def enter(self): assert(current_task.locked()) 
self.trap_if_on_the_stack(self.inst) - self.inst.num_tasks += 1 - if not self.may_enter() or self.inst.pending_tasks: + if not self.may_enter(self) or self.inst.pending_tasks: f = asyncio.Future() - self.inst.pending_tasks.append(f) + self.inst.pending_tasks.append((self, f)) await self.suspend(f) - assert(self.may_enter()) + self.inst.num_tasks += 1 + if self.opts.sync: + self.inst.sync_call = True - def trap_if_on_the_stack(self, inst): - c = self.caller - while c is not None: - trap_if(c.inst is inst) - c = c.caller + def may_enter(self, pending_task): + return self.inst.interruptible.is_set() and \ + not self.inst.backpressure and \ + not self.inst.sync_call and \ + not (pending_task.opts.sync and self.inst.num_tasks > 0) - def may_enter(self): - return not self.inst.backpressure and not self.inst.calling_sync_import + def maybe_start_pending_task(self): + if self.inst.pending_tasks: + pending_task, future = self.inst.pending_tasks[0] + if self.may_enter(pending_task): + self.inst.pending_tasks.pop(0) + future.set_result(None) async def suspend(self, future): - assert(current_task.locked()) - if self.on_block: - self.on_block() - self.on_block = None + if self.on_block and not self.on_block.done(): + self.on_block.set_result(OnBlockResult.BLOCKED) else: current_task.release() - r = await future + v = await future await current_task.acquire() - return r + while not self.inst.interruptible.is_set(): + current_task.release() + await self.inst.interruptible.wait() + await current_task.acquire() + return v async def wait(self): self.maybe_start_pending_task() subtask = await self.suspend(self.events.get()) return self.process_event(subtask) - def maybe_start_pending_task(self): - if self.inst.pending_tasks and self.may_enter(): - self.inst.pending_tasks.pop(0).set_result(None) - - def process_event(self, subtask): - assert(subtask.supertask is self) - subtask.enqueued = False - return (EventCode(subtask.state), subtask.index) - def poll(self): if 
self.events.empty(): return None @@ -486,6 +491,11 @@ def async_subtask_made_progress(self, subtask): subtask.enqueued = True self.events.put_nowait(subtask) + def process_event(self, subtask): + assert(subtask.supertask is self) + subtask.enqueued = False + return (EventCode(subtask.state), subtask.index) + def create_borrow(self): self.borrow_count += 1 @@ -497,13 +507,12 @@ def exit(self): assert(current_task.locked()) assert(self.events.empty()) assert(self.inst.num_tasks >= 1) - trap_if(self.inst.backpressure and self.inst.num_tasks == 1) trap_if(self.borrow_count != 0) trap_if(self.num_async_subtasks != 0) self.inst.num_tasks -= 1 + if self.opts.sync: + self.inst.sync_call = False self.maybe_start_pending_task() - if not self.on_block: - current_task.release() class Subtask(CallContext): lenders: list[HandleElem] @@ -521,23 +530,6 @@ def finish(self): for h in self.lenders: h.lend_count -= 1 -class SyncTask(Task): - async def enter(self): - await super().enter() - if self.inst.active_sync_task: - f = asyncio.Future() - self.inst.pending_sync_tasks.append(f) - await self.suspend(f) - assert(not self.inst.active_sync_task) - self.inst.active_sync_task = True - - def exit(self): - assert(self.inst.active_sync_task) - self.inst.active_sync_task = False - if self.inst.pending_sync_tasks: - self.inst.pending_sync_tasks.pop(0).set_result(None) - super().exit() - class AsyncTask(Task): ft: FuncType on_start: Callable @@ -1307,7 +1299,6 @@ def lift_heap_values(cx, vi, ts): return list(load(cx, ptr, tuple_type).values()) def lower_flat_values(cx, max_flat, vs, ts, out_param = None): - assert(cx.inst.may_leave) cx.inst.may_leave = False flat_types = flatten_types(ts) if len(flat_types) > max_flat: @@ -1337,7 +1328,7 @@ def lower_heap_values(cx, vs, ts, out_param): async def canon_lift(opts, inst, callee, ft, caller, on_block, on_start, on_return): if opts.sync: - task = SyncTask(opts, inst, caller, on_block) + task = Task(opts, inst, caller, on_block) await task.enter() 
flat_args = lower_flat_values(task, MAX_FLAT_PARAMS, on_start(), ft.param_types()) @@ -1378,31 +1369,26 @@ async def call_and_trap_on_throw(callee, task, args): ### `canon lower` async def canon_lower(opts, callee, ft, task, flat_args): + assert(current_task.locked()) trap_if(not task.inst.may_leave) flat_args = CoreValueIter(flat_args) flat_results = None if opts.sync: subtask = Subtask(opts, task.inst) - task.inst.calling_sync_import = True - def on_block(): - if task.on_block: - task.on_block() - task.on_block = None + task.inst.interruptible.clear() def on_start(): return lift_flat_values(subtask, MAX_FLAT_PARAMS, flat_args, ft.param_types()) def on_return(results): nonlocal flat_results flat_results = lower_flat_values(subtask, MAX_FLAT_RESULTS, results, ft.result_types(), flat_args) - await callee(task, on_block, on_start, on_return) - task.inst.calling_sync_import = False + await callee(task, task.on_block, on_start, on_return) + task.inst.interruptible.set() subtask.finish() else: subtask = AsyncSubtask(opts, task.inst) - eager_result = asyncio.Future() + on_block = asyncio.Future() async def do_call(): - def on_block(): - eager_result.set_result('block') def on_start(): subtask.start() return lift_flat_values(subtask, 1, flat_args, ft.param_types()) @@ -1411,16 +1397,19 @@ def on_return(results): lower_flat_values(subtask, 0, results, ft.result_types(), flat_args) await callee(task, on_block, on_start, on_return) subtask.finish() - if not eager_result.done(): - eager_result.set_result('complete') + if on_block.done(): + current_task.release() + else: + on_block.set_result(OnBlockResult.COMPLETED) asyncio.create_task(do_call()) - match await eager_result: - case 'complete': - flat_results = [0] - case 'block': + match await on_block: + case OnBlockResult.BLOCKED: i = task.add_async_subtask(subtask) flat_results = [pack_async_result(i, subtask.state)] + case OnBlockResult.COMPLETED: + flat_results = [0] + assert(current_task.locked()) return flat_results def 
pack_async_result(i, state): diff --git a/design/mvp/canonical-abi/run_tests.py b/design/mvp/canonical-abi/run_tests.py index 36cec90f..bd260d8a 100644 --- a/design/mvp/canonical-abi/run_tests.py +++ b/design/mvp/canonical-abi/run_tests.py @@ -3,9 +3,6 @@ asyncio.run(definitions.current_task.acquire()) -def unlock_on_block(): - definitions.current_task.release() - def equal_modulo_string_encoding(s, t): if s is None and t is None: return True @@ -351,7 +348,7 @@ async def callee(task, x): caller_heap = Heap(1000) caller_opts = mk_opts(caller_heap.memory, 'utf8', caller_heap.realloc) caller_inst = ComponentInstance() - caller_task = Task(caller_opts, caller_inst, None, lambda:()) + caller_task = Task(caller_opts, caller_inst, None, None) return_in_heap = len(flatten_types([t])) > definitions.MAX_FLAT_RESULTS @@ -590,8 +587,7 @@ def on_return(results): got = results consumer_inst = ComponentInstance() - await canon_lift(consumer_opts, consumer_inst, consumer, ft, None, unlock_on_block, on_start, on_return) - await current_task.acquire() + await canon_lift(consumer_opts, consumer_inst, consumer, ft, None, None, on_start, on_return) assert(len(got) == 1) assert(got[0] == 42) @@ -662,8 +658,7 @@ def on_return(results): opts.sync = False opts.callback = callback - await canon_lift(opts, consumer_inst, consumer, consumer_ft, None, unlock_on_block, on_start, on_return) - await current_task.acquire() + await canon_lift(opts, consumer_inst, consumer, consumer_ft, None, None, on_start, on_return) assert(got[0] == 83) asyncio.run(test_async_callback()) @@ -739,8 +734,7 @@ def on_return(results): nonlocal got got = results - await canon_lift(consumer_opts, consumer_inst, consumer, consumer_ft, None, unlock_on_block, on_start, on_return) - await current_task.acquire() + await canon_lift(consumer_opts, consumer_inst, consumer, consumer_ft, None, None, on_start, on_return) assert(got[0] == 83) asyncio.run(test_async_to_sync()) @@ -820,8 +814,7 @@ def on_return(results): nonlocal 
got got = results - await canon_lift(consumer_opts, consumer_inst, consumer, consumer_ft, None, unlock_on_block, on_start, on_return) - await current_task.acquire() + await canon_lift(consumer_opts, consumer_inst, consumer, consumer_ft, None, None, on_start, on_return) assert(got[0] == 84) asyncio.run(test_async_backpressure()) @@ -870,8 +863,7 @@ async def core_func(task, args): inst = ComponentInstance() def on_start(): return [] def on_return(results): pass - await canon_lift(mk_opts(), inst, core_func, ft, None, unlock_on_block, on_start, on_return) - await current_task.acquire() + await canon_lift(mk_opts(), inst, core_func, ft, None, None, on_start, on_return) asyncio.run(test_sync_using_wait())