CABI: simplify how async lowering works
lukewagner committed Sep 6, 2024
1 parent ab4ca42 commit 5cb57ad
Showing 3 changed files with 94 additions and 127 deletions.
156 changes: 67 additions & 89 deletions design/mvp/CanonicalABI.md
@@ -444,7 +444,7 @@ created by `canon_lift` and `Subtask`, which is created by `canon_lower`.
Additional sync-/async-specialized mutable state is added by the `AsyncTask`
and `AsyncSubtask` subclasses.

The `Task` class and its subclasses depend on the following two enums:
The `Task` class and its subclasses depend on the following three enums:
```python
class AsyncCallState(IntEnum):
STARTING = 0
@@ -458,12 +458,18 @@ class EventCode(IntEnum):
CALL_RETURNED = AsyncCallState.RETURNED
CALL_DONE = AsyncCallState.DONE
YIELDED = 4

class OnBlockResult(IntEnum):
BLOCKED = 0
COMPLETED = 1
```
The `AsyncCallState` enum describes the linear sequence of states that an async
call necessarily transitions through: [`STARTING`](Async.md#starting),
`STARTED`, [`RETURNED`](Async.md#returning) and `DONE`. The `EventCode` enum
shares common code values with `AsyncCallState` to define the set of integer
event codes that are delivered to [waiting](Async.md#waiting) or polling tasks.
The `OnBlockResult` enum conveys the two possible results of the `on_block`
future used to tell callers whether or not the callee blocked.
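
As a small illustration of this value-sharing (a standalone sketch, not spec
code; the enum members elided by the diff above are filled in assuming they
follow the same pattern), a subtask's `AsyncCallState` converts directly into
the `EventCode` delivered to a waiting task:
```python
from enum import IntEnum

class AsyncCallState(IntEnum):
    STARTING = 0
    STARTED = 1
    RETURNED = 2
    DONE = 3

class EventCode(IntEnum):
    CALL_STARTING = AsyncCallState.STARTING
    CALL_STARTED = AsyncCallState.STARTED
    CALL_RETURNED = AsyncCallState.RETURNED
    CALL_DONE = AsyncCallState.DONE
    YIELDED = 4

# Because the integer values are shared, no translation table is needed:
assert EventCode(AsyncCallState.RETURNED) == EventCode.CALL_RETURNED
assert EventCode(AsyncCallState.DONE) == EventCode.CALL_DONE
```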

The `current_task` global holds an `asyncio.Lock` that is used to prevent the
Python runtime from arbitrarily switching between Python coroutines (`async
@@ -481,27 +487,24 @@ current_task = asyncio.Lock()

A `Task` object is created for each call to `canon_lift` and is implicitly
threaded through all core function calls. This implicit `Task` parameter
specifies a concept of [the current task](Async.md#current-task) and inherently
scopes execution of all core wasm (including `canon`-defined core functions) to
a `Task`.
represents "[the current task](Async.md#current-task)".
```python
class Task(CallContext):
caller: Optional[Task]
on_block: Optional[Callable]
on_block: Optional[asyncio.Future[OnBlockResult]]
borrow_count: int
events: asyncio.Queue[AsyncSubtask]
num_async_subtasks: int

def __init__(self, opts, inst, caller, on_block):
super().__init__(opts, inst)
assert(on_block is not None)
self.caller = caller
self.on_block = on_block
self.borrow_count = 0
self.events = asyncio.Queue[AsyncSubtask]()
self.num_async_subtasks = 0
```
The fields of `Task` are introduced in groups of related `Task`-methods next.
The fields of `Task` are introduced in groups of related `Task` methods next.
Using a conservative syntactic analysis of the component-level definitions of a
linked component DAG, an optimizing implementation can statically eliminate
these fields when the particular feature (`borrow` handles, `async` imports) is
@@ -531,16 +534,15 @@ O(n) loop in `trap_if_on_the_stack`:
instance a static bit position) that is passed by copy from caller to callee.

The `enter` method is called immediately after constructing a `Task` and, along
with the `may_enter` and `may_start_pending_task` helper functions, implements
backpressure. If a `Task` tries to `enter` when `may_enter` is false, the
`Task` suspends itself (via `suspend`, shown next) and goes into a
`pending_tasks` queue, waiting to be unblocked when `may_enter` is true by
another task calling `maybe_start_pending_task`. One key property of this
backpressure scheme is that `pending_tasks` are only dequeued one at a time,
ensuring that if an overloaded component instance enables backpressure (via
`task.backpressure`) and then disables it, there will not be an unstoppable
thundering herd of pending tasks started all at once that OOM the component
before it can re-enable backpressure.
with `may_enter` and `may_start_pending_task`, implements backpressure. If a
`Task` tries to `enter` when `may_enter` is false, the `Task` suspends itself
(via `suspend`, shown next) and goes into a `pending_tasks` queue, waiting to
be unblocked by another task calling `maybe_start_pending_task`. One key
property of this backpressure scheme is that `pending_tasks` are only dequeued
one at a time, ensuring that if an overloaded component instance enables
backpressure (via `task.backpressure`) and then disables it, there will not be
an unstoppable thundering herd of pending tasks started all at once that OOM
the component before it can re-enable backpressure.
```python
async def enter(self):
assert(current_task.locked())
Expand Down Expand Up @@ -569,36 +571,29 @@ before it can re-enable backpressure.
The rules shown above also ensure that synchronously-lifted exports only
execute when no other (sync or async) tasks are executing concurrently.
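
Below is a simplified, standalone sketch of this backpressure scheme (not the
spec code; the class and field names are illustrative only): tasks that cannot
enter park on a future in a `pending_tasks` queue, and
`maybe_start_pending_task` wakes at most one of them per call, so a backlog
drains one task at a time rather than all at once.
```python
import asyncio

class InstanceSketch:
    def __init__(self):
        self.backpressure = False    # toggled by the guest via `task.backpressure`
        self.pending_tasks = []      # futures of parked tasks, in FIFO order

    def may_enter(self):
        # The real rule also accounts for sync-export exclusivity.
        return not self.backpressure

    def maybe_start_pending_task(self):
        # Wake exactly one parked task, and only if entry is currently allowed.
        if self.pending_tasks and self.may_enter():
            self.pending_tasks.pop(0).set_result(None)

    async def enter(self):
        while not self.may_enter():
            f = asyncio.get_running_loop().create_future()
            self.pending_tasks.append(f)
            await f                  # park until another task wakes us up
```
In the definitions below, `maybe_start_pending_task` is called from `wait` and
`exit`, which is what gives each running task the chance to admit the next
pending one.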

The `suspend` method, used by `enter`, `wait` and `yield_`, takes an
`asyncio.Future` to `await` and allows other tasks to make progress in the
meantime. When suspending, there are two cases to consider:
* This is the first time the current `Task` has blocked and thus there may be
an `async`-lowered caller waiting to find out that the callee blocked (which
is signalled by calling the `on_block` handler that the caller passed to
`canon_lift`).
* This task has already blocked in the past (signalled by `on_block` being
`None`) and thus there is no `async`-lowered caller to switch to and so we
let Python's `asyncio` scheduler non-deterministically pick some other task
that is ready to go by releasing the `current_task` lock.

In either case, once the given future is resolved, this `Task` has to
re-`acquire` the `current_stack` lock to run again.
The `suspend` method, called by `enter`, `wait` and `yield_`, takes a future to
`await` and allows other tasks to make progress in the meantime. Once this
future is resolved, the current task must reacquire the `current_task` lock to
wait for any other task that is currently executing to suspend or exit.
```python
async def suspend(self, future):
assert(current_task.locked())
if self.on_block:
self.on_block()
self.on_block = None
if self.on_block and not self.on_block.done():
self.on_block.set_result(OnBlockResult.BLOCKED)
else:
current_task.release()
r = await future
v = await future
await current_task.acquire()
return r
return v
```
As a side note: the `suspend` method is so named because it could be
reimplemented using the [`suspend`] instruction of the [typed continuations]
proposal, removing the need for `on_block` and the subtle calling contract
between `suspend` and `canon_lift`.
When there is an `async`-lowered caller waiting on the stack, the `on_block`
field will point to an unresolved future. In this case, `suspend` sets the
result of `on_block` and leaves `current_task` locked so that control flow
transfers deterministically to the `async`-lowered caller (in `canon_lower`,
defined below). The `suspend` method is so named because this delicate use of
Python's async functionality is essentially emulating the `suspend`/`resume`
instructions of the [stack-switching] proposal. Thus, once stack-switching is
implemented, a valid implementation technique would be to compile Canonical ABI
adapters to Core WebAssembly using `suspend` and `resume`.
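
The handoff described here can be hard to see from the spec code alone, so the
following self-contained sketch (illustrative only; the `caller`/`callee`
coroutines and the `io_ready` future are local to this example) shows how
resolving `on_block` while keeping `current_task` locked passes control
straight back to the caller:
```python
import asyncio
from enum import IntEnum

class OnBlockResult(IntEnum):
    BLOCKED = 0
    COMPLETED = 1

current_task = asyncio.Lock()

async def callee(on_block, io_ready):
    # The callee hits I/O that is not ready: report BLOCKED but keep the lock.
    on_block.set_result(OnBlockResult.BLOCKED)
    v = await io_ready                # suspend until the "I/O" completes
    await current_task.acquire()      # re-acquire before running again
    current_task.release()            # (models the callee task exiting)
    return v

async def caller():
    loop = asyncio.get_running_loop()
    await current_task.acquire()
    on_block, io_ready = loop.create_future(), loop.create_future()
    callee_task = asyncio.create_task(callee(on_block, io_ready))
    assert await on_block == OnBlockResult.BLOCKED
    assert current_task.locked()      # control came back without a lock release
    current_task.release()
    io_ready.set_result('io done')    # later, the "I/O" completes
    print(await callee_task)

asyncio.run(caller())
```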

While a task is running, it may call `wait` (via `canon task.wait` or, when a
`callback` is present, by returning to the event loop) to block until there is
@@ -672,13 +667,7 @@ after this export call finishes):
```

Lastly, when a task exits, the runtime enforces the guard conditions mentioned
above and allows other tasks to start or make progress. If the exiting `Task`
has not yet blocked, there is an active `async`-lowered caller on the stack, so
we don't release the `current_task` lock and instead just let the `Task`'s
Python coroutine return directly to the `await`ing caller without any possible
task switch. The net effect is that when a cross-component async starts and
finishes without blocking, there doesn't need to be stack switching or async
resource allocation.
above and allows pending tasks to start.
```python
def exit(self):
assert(current_task.locked())
@@ -690,8 +679,6 @@ resource allocation.
if self.opts.sync:
self.inst.may_not_enter_bc_sync_export = False
self.maybe_start_pending_task()
if not self.on_block:
current_task.release()
```

While `canon_lift` creates `Task`s, `canon_lower` creates `Subtask` objects:
@@ -1906,8 +1893,8 @@ When instantiating component instance `$inst`:
The resulting function `$f` takes 4 runtime arguments:
* `caller`: the caller's `Task` or, if this lifted function is being called by
the host, `None`
* `on_block`: a nullary function that must be called at most once by the callee
before blocking the first time
* `on_block`: an optional `asyncio.Future` that must be resolved with
`OnBlockResult.BLOCKED` if the callee blocks on I/O
* `on_start`: a nullary function that must be called to return the caller's
arguments as a list of component-level values
* `on_return`: a unary function that must be called after `on_start`,
Expand Down Expand Up @@ -2021,31 +2008,26 @@ The resulting function `$f` takes 2 runtime arguments:
Given this, `canon_lower` is defined:
```python
async def canon_lower(opts, callee, ft, task, flat_args):
assert(current_task.locked())
trap_if(task.inst.may_not_leave)

flat_args = CoreValueIter(flat_args)
flat_results = None
if opts.sync:
subtask = Subtask(opts, task.inst)
task.inst.may_not_enter_bc_sync_import = True
def on_block():
if task.on_block:
task.on_block()
task.on_block = None
def on_start():
return lift_flat_values(subtask, MAX_FLAT_PARAMS, flat_args, ft.param_types())
def on_return(results):
nonlocal flat_results
flat_results = lower_flat_values(subtask, MAX_FLAT_RESULTS, results, ft.result_types(), flat_args)
await callee(task, on_block, on_start, on_return)
await callee(task, task.on_block, on_start, on_return)
task.inst.may_not_enter_bc_sync_import = False
subtask.finish()
else:
subtask = AsyncSubtask(opts, task.inst)
eager_result = asyncio.Future()
on_block = asyncio.Future()
async def do_call():
def on_block():
eager_result.set_result('block')
def on_start():
subtask.start()
return lift_flat_values(subtask, 1, flat_args, ft.param_types())
@@ -2054,41 +2036,39 @@ async def canon_lower(opts, callee, ft, task, flat_args):
lower_flat_values(subtask, 0, results, ft.result_types(), flat_args)
await callee(task, on_block, on_start, on_return)
subtask.finish()
if not eager_result.done():
eager_result.set_result('complete')
if on_block.done():
current_task.release()
else:
on_block.set_result(OnBlockResult.COMPLETED)
asyncio.create_task(do_call())
match await eager_result:
case 'complete':
flat_results = [0]
case 'block':
match await on_block:
case OnBlockResult.BLOCKED:
i = task.add_async_subtask(subtask)
flat_results = [pack_async_result(i, subtask.state)]
case OnBlockResult.COMPLETED:
flat_results = [0]

assert(current_task.locked())
return flat_results
```
In the synchronous case, the import call is bracketed by setting
`calling_sync_import` to prevent reentrance into the current component instance
if the `callee` blocks and the caller gets control flow (via `on_block`). Like
`Task.suspend` above, `canon_lift` clears the `on_block` handler after calling
to signal that the current `Task` has already released any waiting
`async`-lowered callers.

In the asynchronous case, we finally see the whole point of `on_block` which is
to allow us to wait for one of two outcomes: the callee blocks or the callee
finishes without blocking. Whichever happens first resolves the `eager_result`
future. After calling `asyncio.create_task`, `canon_lift` immediately `await`s
`eager_result` so that there is no allowed interleaving between the caller and
callee's Python coroutines. This overall behavior resembles the [`resume`]
instruction of the [typed continuations] proposal (handling a `block` effect)
which could be used to more-directly implement the Python control flow here.
`may_not_enter_bc_sync_import` to prevent reentrance into the current component
instance if the `callee` blocks and the caller gets control flow (via
`on_block`).

In the asynchronous case, the `on_block` future allows the caller to `await` to
see if `callee` blocks on I/O before returning. Because `Task.suspend` (defined
above) does not release the `current_task` lock when it resolves `on_block` to
`BLOCKED`, control flow deterministically returns to the caller (without
executing other tasks) when `callee` blocks. This is analogous to how the
`resume` instruction of the [stack-switching] proposal would work if given
`(cont.new $callee)` and handling an `on_block` event.

Whether or not the `callee` blocks, the `on_start` and `on_return` handlers
must be called before the `callee` completes (either by `canon_lift` in the
synchronous case or the `task.start`/`task.return` built-ins in the
asynchronous case). Note that, when `async`-lowering, lifting and lowering
can happen after `canon_lower` returns and thus the caller must `task.wait`
for `EventCode`s to know when the supplied linear memory pointers can be
reused.
will be called before the `callee` completes. Note that, in an `async`-lowered
call, `on_start` and `on_return` can happen after `canon_lower` returns and
thus the caller must `task.wait` for progress events to know when the supplied
linear memory pointers can be reclaimed by the caller.
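
As a hypothetical caller-side sketch (not spec code; the `(event, index)`
shape of the `task.wait` result is assumed here for illustration), the guest's
logic after an `async`-lowered import blocks might look like:
```python
async def drive_blocked_import(task, subtask_index):
    # Keep the argument/result buffers alive until the subtask has finished
    # all of its lifting and lowering.
    while True:
        event, index = await task.wait()
        if index != subtask_index:
            continue                   # progress event for another subtask
        if event == EventCode.CALL_RETURNED:
            pass                       # results have been lowered into memory
        elif event == EventCode.CALL_DONE:
            return                     # buffers may now be reclaimed or reused
```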

If an `async`-lowered call blocks, the `AsyncSubtask` is added to the component
instance's `async_subtasks` table, and the index and state are returned to the
@@ -2473,9 +2453,7 @@ def canon_thread_hw_concurrency():
[Exceptions]: https://github.com/WebAssembly/exception-handling/blob/main/proposals/exception-handling/Exceptions.md
[WASI]: https://github.com/webassembly/wasi
[Deterministic Profile]: https://github.com/WebAssembly/profiles/blob/main/proposals/profiles/Overview.md
[Typed Continuations]: https://github.com/WebAssembly/stack-switching/blob/main/proposals/continuations/Explainer.md
[`suspend`]: https://github.com/WebAssembly/stack-switching/blob/main/proposals/continuations/Explainer.md#suspending-continuations
[`resume`]: https://github.com/WebAssembly/stack-switching/blob/main/proposals/continuations/Explainer.md#invoking-continuations
[stack-switching]: https://github.com/WebAssembly/stack-switching

[Alignment]: https://en.wikipedia.org/wiki/Data_structure_alignment
[UTF-8]: https://en.wikipedia.org/wiki/UTF-8
45 changes: 21 additions & 24 deletions design/mvp/canonical-abi/definitions.py
@@ -398,18 +398,21 @@ class EventCode(IntEnum):
CALL_DONE = AsyncCallState.DONE
YIELDED = 4

class OnBlockResult(IntEnum):
BLOCKED = 0
COMPLETED = 1

current_task = asyncio.Lock()

class Task(CallContext):
caller: Optional[Task]
on_block: Optional[Callable]
on_block: Optional[asyncio.Future[OnBlockResult]]
borrow_count: int
events: asyncio.Queue[AsyncSubtask]
num_async_subtasks: int

def __init__(self, opts, inst, caller, on_block):
super().__init__(opts, inst)
assert(on_block is not None)
self.caller = caller
self.on_block = on_block
self.borrow_count = 0
@@ -447,15 +450,13 @@ def maybe_start_pending_task(self):
future.set_result(None)

async def suspend(self, future):
assert(current_task.locked())
if self.on_block:
self.on_block()
self.on_block = None
if self.on_block and not self.on_block.done():
self.on_block.set_result(OnBlockResult.BLOCKED)
else:
current_task.release()
r = await future
v = await future
await current_task.acquire()
return r
return v

async def wait(self):
self.maybe_start_pending_task()
@@ -507,8 +508,6 @@ def exit(self):
if self.opts.sync:
self.inst.may_not_enter_bc_sync_export = False
self.maybe_start_pending_task()
if not self.on_block:
current_task.release()

class Subtask(CallContext):
lenders: list[HandleElem]
@@ -1365,31 +1364,26 @@ async def call_and_trap_on_throw(callee, task, args):
### `canon lower`

async def canon_lower(opts, callee, ft, task, flat_args):
assert(current_task.locked())
trap_if(task.inst.may_not_leave)

flat_args = CoreValueIter(flat_args)
flat_results = None
if opts.sync:
subtask = Subtask(opts, task.inst)
task.inst.may_not_enter_bc_sync_import = True
def on_block():
if task.on_block:
task.on_block()
task.on_block = None
def on_start():
return lift_flat_values(subtask, MAX_FLAT_PARAMS, flat_args, ft.param_types())
def on_return(results):
nonlocal flat_results
flat_results = lower_flat_values(subtask, MAX_FLAT_RESULTS, results, ft.result_types(), flat_args)
await callee(task, on_block, on_start, on_return)
await callee(task, task.on_block, on_start, on_return)
task.inst.may_not_enter_bc_sync_import = False
subtask.finish()
else:
subtask = AsyncSubtask(opts, task.inst)
eager_result = asyncio.Future()
on_block = asyncio.Future()
async def do_call():
def on_block():
eager_result.set_result('block')
def on_start():
subtask.start()
return lift_flat_values(subtask, 1, flat_args, ft.param_types())
@@ -1398,16 +1392,19 @@ def on_return(results):
lower_flat_values(subtask, 0, results, ft.result_types(), flat_args)
await callee(task, on_block, on_start, on_return)
subtask.finish()
if not eager_result.done():
eager_result.set_result('complete')
if on_block.done():
current_task.release()
else:
on_block.set_result(OnBlockResult.COMPLETED)
asyncio.create_task(do_call())
match await eager_result:
case 'complete':
flat_results = [0]
case 'block':
match await on_block:
case OnBlockResult.BLOCKED:
i = task.add_async_subtask(subtask)
flat_results = [pack_async_result(i, subtask.state)]
case OnBlockResult.COMPLETED:
flat_results = [0]

assert(current_task.locked())
return flat_results

def pack_async_result(i, state):