Skip to content

Commit

Permalink
Merge pull request #343 from goodboy/breceiver_internals
Browse files Browse the repository at this point in the history
Avoid inf recursion in `BroadcastReceiver.receive()`
  • Loading branch information
goodboy authored Jan 30, 2023
2 parents a777217 + 203f956 commit 649c5e7
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 119 deletions.
19 changes: 19 additions & 0 deletions nooz/343.trivial.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Rework our ``.trionics.BroadcastReceiver`` internals to avoid method
recursion and approach a design and interface closer to ``trio``'s
``MemoryReceiveChannel``.

The details of the internal changes include:

- implementing a ``BroadcastReceiver.receive_nowait()`` and using it
within the async ``.receive()`` thus avoiding recursion from
``.receive()``.
- failing over to an internal ``._receive_from_underlying()`` when the
``_nowait()`` call raises ``trio.WouldBlock``
- adding ``BroadcastState.statistics()`` for debugging and testing both
internals and by users.
- add an internal ``BroadcastReceiver._raise_on_lag: bool`` which can be
set to avoid ``Lagged`` raising for possible use cases where a user
wants to choose between a [cheap or nasty
pattern](https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern)
the the particular stream (we use this in ``piker``'s dark clearing
engine to avoid fast feeds breaking during HFT periods).
2 changes: 1 addition & 1 deletion tests/test_advanced_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def is_win():
return platform.system() == 'Windows'


_registry: dict[str, set[tractor.ReceiveMsgStream]] = {
_registry: dict[str, set[tractor.MsgStream]] = {
'even': set(),
'odd': set(),
}
Expand Down
58 changes: 55 additions & 3 deletions tests/test_task_broadcasting.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
import trio
from trio.lowlevel import current_task
import tractor
from tractor.trionics import broadcast_receiver, Lagged
from tractor.trionics import (
broadcast_receiver,
Lagged,
)


@tractor.context
Expand All @@ -37,7 +40,7 @@ async def echo_sequences(

async def ensure_sequence(

stream: tractor.ReceiveMsgStream,
stream: tractor.MsgStream,
sequence: list,
delay: Optional[float] = None,

Expand Down Expand Up @@ -211,7 +214,8 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
arb_addr,
start_method,
):
'''Ensure that if a faster task consuming from a stream is cancelled
'''
Ensure that if a faster task consuming from a stream is cancelled
the slower task can continue to receive all expected values.
'''
Expand Down Expand Up @@ -460,3 +464,51 @@ async def cancel_and_send():
assert value == 1

trio.run(main)


def test_no_raise_on_lag():
'''
Run a simple 2-task broadcast where one task is slow but configured
so that it does not raise `Lagged` on overruns using
`raise_on_lasg=False` and verify that the task does not raise.
'''
size = 100
tx, rx = trio.open_memory_channel(size)
brx = broadcast_receiver(rx, size)

async def slow():
async with brx.subscribe(
raise_on_lag=False,
) as br:
async for msg in br:
print(f'slow task got: {msg}')
await trio.sleep(0.1)

async def fast():
async with brx.subscribe() as br:
async for msg in br:
print(f'fast task got: {msg}')

async def main():
async with (
tractor.open_root_actor(
# NOTE: so we see the warning msg emitted by the bcaster
# internals when the no raise flag is set.
loglevel='warning',
),
trio.open_nursery() as n,
):
n.start_soon(slow)
n.start_soon(fast)

for i in range(1000):
await tx.send(i)

# simulate user nailing ctl-c after realizing
# there's a lag in the slow task.
await trio.sleep(1)
raise KeyboardInterrupt

with pytest.raises(KeyboardInterrupt):
trio.run(main)
2 changes: 0 additions & 2 deletions tractor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from ._ipc import Channel
from ._streaming import (
Context,
ReceiveMsgStream,
MsgStream,
stream,
context,
Expand Down Expand Up @@ -64,7 +63,6 @@
'MsgStream',
'BaseExceptionGroup',
'Portal',
'ReceiveMsgStream',
'RemoteActorError',
'breakpoint',
'context',
Expand Down
11 changes: 7 additions & 4 deletions tractor/_portal.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@
NoResult,
ContextCancelled,
)
from ._streaming import Context, ReceiveMsgStream
from ._streaming import (
Context,
MsgStream,
)


log = get_logger(__name__)
Expand Down Expand Up @@ -101,7 +104,7 @@ def __init__(self, channel: Channel) -> None:
# it is expected that ``result()`` will be awaited at some
# point.
self._expect_result: Optional[Context] = None
self._streams: set[ReceiveMsgStream] = set()
self._streams: set[MsgStream] = set()
self.actor = current_actor()

async def _submit_for_result(
Expand Down Expand Up @@ -316,7 +319,7 @@ async def open_stream_from(
async_gen_func: Callable, # typing: ignore
**kwargs,

) -> AsyncGenerator[ReceiveMsgStream, None]:
) -> AsyncGenerator[MsgStream, None]:

if not inspect.isasyncgenfunction(async_gen_func):
if not (
Expand All @@ -341,7 +344,7 @@ async def open_stream_from(

try:
# deliver receive only stream
async with ReceiveMsgStream(
async with MsgStream(
ctx, ctx._recv_chan,
) as rchan:
self._streams.add(rchan)
Expand Down
23 changes: 12 additions & 11 deletions tractor/_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@
# - use __slots__ on ``Context``?


class ReceiveMsgStream(trio.abc.ReceiveChannel):
class MsgStream(trio.abc.Channel):
'''
A IPC message stream for receiving logically sequenced values over
an inter-actor ``Channel``. This is the type returned to a local
task which entered either ``Portal.open_stream_from()`` or
``Context.open_stream()``.
A bidirectional message stream for receiving logically sequenced
values over an inter-actor IPC ``Channel``.
This is the type returned to a local task which entered either
``Portal.open_stream_from()`` or ``Context.open_stream()``.
Termination rules:
Expand Down Expand Up @@ -317,15 +318,15 @@ async def subscribe(
async with self._broadcaster.subscribe() as bstream:
assert bstream.key != self._broadcaster.key
assert bstream._recv == self._broadcaster._recv
yield bstream

# NOTE: we patch on a `.send()` to the bcaster so that the
# caller can still conduct 2-way streaming using this
# ``bstream`` handle transparently as though it was the msg
# stream instance.
bstream.send = self.send # type: ignore

class MsgStream(ReceiveMsgStream, trio.abc.Channel):
'''
Bidirectional message stream for use within an inter-actor actor
``Context```.
yield bstream

'''
async def send(
self,
data: Any
Expand Down
Loading

0 comments on commit 649c5e7

Please sign in to comment.