Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid inf recursion in BroadcastReceiver.receive() #343

Merged
merged 11 commits into from
Jan 30, 2023
Merged

Conversation

goodboy
Copy link
Owner

@goodboy goodboy commented Nov 15, 2022

Originally driven by a bug found in piker where we'd get an inf recursion error due to BroadcastReceiver.receive() being recursively called when consumer tasks are awoken but no value is ready to nowait receive.

  • This new rework takes an approach closer to the interface and internals of trio.MemoryReceiveChannel particularly in terms of,
    • implementing a BroadcastReceiver.receive_nowait() and using it within the async .receive()
    • failing over to an internal ._receive_from_underlying() when the _nowait() call raises trio.WouldBlock
    • adding BroadcastState.statistics() for debugging and testing
    • dropping recursion from .receive()
  • adds an internal BroadcastReceiver._raise_on_lag: bool which can be set to avoid Lagged errors from being raised in the case where a user knows they want/need a cheap or nasty pattern

MsgStream as only stream type!

  • this fully drops the separation between MsgStream and ReceiveMsgStream merging the two types into the former; the only thing differing was .send() which should really be a choice not to use since (at least eventually) the underlying impl will always be a bidir stream.
    • this allows us to also monkey patch BroadcastReciever delivered from MsgStream.subscribe() with a .send() which delegates to the underlying stream making it very simple to support multi-task usage of any fan-out stream 🏄🏼

TODO:

  • drop the ipython WIP commit (aac8d34) from this history
  • add the above details to the commit msg for e3a8e45
  • hopefully a (set of) test(s) to demonstrate the original recursion error? => don't think it's worth it, if it shows up again then yes..
    • originally was with > 2 consumers and a set of tasks are cancelled simultaneously?

@goodboy goodboy added bug Something isn't working api streaming cancellation SC teardown semantics and anti-zombie semantics trionics labels Nov 15, 2022
@goodboy goodboy requested a review from guilledk November 15, 2022 21:21
@goodboy goodboy changed the base branch from master to piker_pin November 15, 2022 21:21
@goodboy goodboy changed the title Breceiver internals rework Avoid inf recursion in BroadcastReceiver.receive() Nov 15, 2022
@goodboy goodboy force-pushed the breceiver_internals branch 3 times, most recently from 1de4cce to 38b6b00 Compare December 13, 2022 20:21
@goodboy goodboy force-pushed the breceiver_internals branch from 3d926d2 to 565057b Compare January 26, 2023 21:05
@goodboy goodboy changed the base branch from piker_pin to ipc_failure_while_streaming January 26, 2023 21:28
@goodboy goodboy force-pushed the breceiver_internals branch 4 times, most recently from 770de3b to e631d67 Compare January 29, 2023 01:07
@goodboy goodboy force-pushed the ipc_failure_while_streaming branch from 710dee0 to 13c9ead Compare January 29, 2023 19:55
@goodboy goodboy force-pushed the breceiver_internals branch from ed24111 to dd44d77 Compare January 29, 2023 20:02
Base automatically changed from ipc_failure_while_streaming to master January 29, 2023 20:02
Driven by a bug found in `piker` where we'd get an inf recursion error
due to `BroadcastReceiver.receive()` being called when consumer tasks
are awoken but no value is ready to `.nowait_receive()`.

This new rework takes an approach closer to the interface and internals
of `trio.MemoryReceiveChannel` particularly in terms of,

- implementing a `BroadcastReceiver.receive_nowait()` and using it
  within the async `.receive()`.
- failing over to an internal `._receive_from_underlying()` when the
  `_nowait()` call raises `trio.WouldBlock`.
- adding `BroadcastState.statistics()` for debugging and testing
  dropping recursion from `.receive()`.
Since one-way streaming can be accomplished by just *not* sending on one
side (and/or thus wrapping such usage in a more restrictive API), we
just drop the recv-only parent type. The only method different was
`MsgStream.send()`, now merged in. Further in usage of `.subscribe()`
we monkey patch the underlying stream's `.send()` onto the delivered
broadcast receiver so that subscriber tasks can two-way stream as though
using the stream directly.

This allows us to more definitively drop `tractor.open_stream_from()` in
the longer run if we so choose as well; note currently this will
potentially create an issue if a caller tries to `.send()` on such a one
way stream.
Makes the broadcast test suite not hang xD, and is our expected default
behaviour. Also removes a ton of commented legacy cruft from before the
refactor to remove the `.receive()` recursion and fixes some typing.

Oh right, and in the case where there's only one subscriber left we warn
log about it since in theory we could actually entirely unwind the
bcaster back to the original underlying, though not sure if that's sane
or works for some use cases (like wanting to have some other subscriber
get added dynamically later).
@goodboy goodboy force-pushed the breceiver_internals branch from dd44d77 to 4ce2dcd Compare January 29, 2023 20:04
@goodboy goodboy merged commit 649c5e7 into master Jan 30, 2023
@goodboy goodboy deleted the breceiver_internals branch January 30, 2023 19:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api bug Something isn't working cancellation SC teardown semantics and anti-zombie semantics streaming trionics
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants