Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ipc failure while streaming #346

Merged
merged 23 commits into from
Jan 29, 2023
Merged

Ipc failure while streaming #346

merged 23 commits into from
Jan 29, 2023

Conversation

goodboy
Copy link
Owner

@goodboy goodboy commented Dec 12, 2022

Variety of streaming related teardown fixes for when IPC goes down before streams are terminated gracefully. Mostly discovered during dev of initial draft of pikers/piker#420.


TODO:

@goodboy goodboy force-pushed the ipc_failure_while_streaming branch from c9f2d96 to 2aed7dd Compare December 12, 2022 20:17
@goodboy
Copy link
Owner Author

goodboy commented Dec 12, 2022

Lol.. ok so its 5dc07f0..

No idea why yet, but dropping that commit for this history and gonna rebase all dependents.

@goodboy
Copy link
Owner Author

goodboy commented Dec 12, 2022

Lul, and turns out ba6a2b1 causes the test_context_streaming_semtantics suite to hang 😂

So gonna drop that one for now as well..

@goodboy goodboy force-pushed the ipc_failure_while_streaming branch from f7cc993 to 6d34893 Compare December 12, 2022 23:59
@goodboy goodboy force-pushed the ipc_failure_while_streaming branch from 6895172 to b722238 Compare January 26, 2023 21:05
@@ -319,7 +319,7 @@ async def _invoke(
BrokenPipeError,
):
# if we can't propagate the error that's a big boo boo
log.error(
log.exception(
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is important to show what traceback wasn't able to be propagated to the far end actor's caller task.

@@ -826,7 +829,12 @@ async def _push_result(

if ctx._backpressure:
log.warning(text)
await send_chan.send(msg)
try:
await send_chan.send(msg)
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If using backpressure on a stream we need to be sure we don't crash when the local feed memchan has been broken.

Need a test for this, though I can't remember if this was the original issue that caused the ipc_failure_during_stream.py example to inf hang before?

@@ -609,7 +610,14 @@ async def open_stream(
# XXX: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the
# far end.
await self.send_stop()
try:
await self.send_stop()
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also need a test for this:

  • one end of a stream is closed
  • the other end trying to close before any other stream methods .receive()/.send() are called (thus detecting the closure before trying to send a stop msg)

finally:
# NOTE: this is ABSOLUTELY REQUIRED to avoid
# the following wacky bug:
# <tractorbugurlhere>
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I can remember the case that caused this unfortunately 😂 though pretty sure it was in piker.

Unfortunately this may just go in without a false positive case ..

@goodboy goodboy mentioned this pull request Jan 28, 2023
@goodboy goodboy added bug Something isn't working IPC and transport streaming supervision cancellation SC teardown semantics and anti-zombie semantics examples labels Jan 28, 2023
@goodboy goodboy requested a review from guilledk January 28, 2023 23:05
When backpressure is used and a feeder mem chan breaks during msg
delivery (usually because the IPC allocating task already terminated)
instead of raising we simply warn as we do for the non-backpressure
case.

Also, add a proper `Actor.is_arbiter` test inside `._invoke()` to avoid
doing an arbiter-registry lookup if the current actor **is** the
registrar.
Use a task nursery in the subactor to spawn tasks which cancel the IPC
channel mid stream to simulate the most concurrent case we're likely to
see. Make `main()` accept a `debug_mode: bool` for parametrization. Fill
out detailed comments/docs on this example.
With the new fancy `_pytest.pathlib.import_path()` we can do real
parametrization of the example-script-module code and thus configure
whether the child, parent, or both silently break the IPC connection.

Parametrize the test for all the above mentioned cases as well as the
case where the IPC never breaks but we still simulate the user hammering
ctl-c / SIGINT to terminate the actor tree. Adjust expected errors based
on each case and heavily document each of these.
We weren't doing this originally I *think* just because of the path
dependent nature of the way the code was developed (originally being
mega pedantic about one-way vs. bidirectional streams) but, it doesn't
seem like there's any issue just calling the stream's `.aclose()`; also
have the benefit of just being less code and logic checks B)
@goodboy goodboy force-pushed the ipc_failure_while_streaming branch from 710dee0 to 13c9ead Compare January 29, 2023 19:55
@goodboy goodboy merged commit a777217 into master Jan 29, 2023
@goodboy goodboy deleted the ipc_failure_while_streaming branch January 29, 2023 20:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working cancellation SC teardown semantics and anti-zombie semantics examples IPC and transport streaming supervision
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants