Always close BatchedSend write coroutines #6865

Merged · 1 commit · Aug 12, 2022
32 changes: 26 additions & 6 deletions distributed/batched.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import contextlib
import logging
from collections import deque
from typing import Any
@@ -94,9 +95,24 @@ def _background_send(self):
self.batch_count += 1
self.next_deadline = time() + self.interval
try:
nbytes = yield self.comm.write(
payload, serializers=self.serializers, on_error="raise"
)
# NOTE: Since `BatchedSend` doesn't have a handle on the running
# `_background_send` coroutine, the only thing with a reference to this
# coroutine is the event loop itself. If the event loop stops while
@graingert (Member) commented on Aug 12, 2022:

I think this comment is correct, but it's missing an edge case: the event loop doesn't necessarily even keep a reference to the coro.

The chain is loop._ready, loop._scheduled, loop._selector -> Handle -> Future.callback(self) -> Future._callbacks -> Task.__step(self) -> Task._coro.

Highly recommend building a fun picture in objgraph of this ^

When you call loop.close(), it wipes out all the Handles in ready, scheduled, and selector. However, you can also create a Future, wait on it in a Task, and then lose the Future: boom, the coro goes with it, running the finally block on whichever poor thread upset the garbage collector.

I don't think you should change the comment, but I think it's worth having this idea written down somewhere ^

Follow-up comment (Member):

Actually, on second thought, maybe it is worth a change here, because it might explain why you can't fix this with yield asyncio.create_task(self.comm.write(...)). Or maybe it just depends on whether the hammer falls before or after asyncio.get_running_loop() starts failing.

# we're waiting on a `write`, the `_background_send` coroutine object
# may be garbage collected. If that happens, the `yield coro` will raise
# `GeneratorExit`. But because this is an old-school `gen.coroutine`,
# and we're using `yield` and not `await`, the `write` coroutine object
# will not actually have been awaited, and it will remain sitting around
# for someone to retrieve it. At interpreter exit, this will warn
# something like `RuntimeWarning: coroutine 'TCP.write' was never
# awaited`. By using the `closing` contextmanager, the `write` coroutine
# object is always cleaned up, even if `yield` raises `GeneratorExit`.
with contextlib.closing(
self.comm.write(
payload, serializers=self.serializers, on_error="raise"
)
) as coro:
nbytes = yield coro
if nbytes < 1e6:
self.recent_message_log.append(payload)
else:
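
To make the mechanism described in the new code comment concrete, here is a minimal, hypothetical sketch (not part of the diff; the names fake_write and background_send are made up) of the same pattern: a generator-style coroutine yields a coroutine object for a driver to await, and contextlib.closing guarantees that object gets closed even when the generator is shut down at the yield with GeneratorExit.

import contextlib


async def fake_write(payload):
    # Stand-in for comm.write(); if this coroutine object is never awaited
    # and never closed, the interpreter warns at exit with
    # "RuntimeWarning: coroutine 'fake_write' was never awaited".
    return len(payload)


def background_send():
    # Generator-style coroutine, analogous to the gen.coroutine-decorated
    # _background_send: it yields the coroutine object for a driver to await.
    with contextlib.closing(fake_write(b"payload")) as coro:
        # If the driver goes away, generator.close() injects GeneratorExit
        # right here; contextlib.closing still calls coro.close() on the way
        # out, so no un-awaited coroutine object is left lying around.
        nbytes = yield coro
        return nbytes


gen = background_send()
pending = next(gen)  # the driver would normally await `pending` here
gen.close()          # simulate the event loop dropping the generator
# `pending` was closed by contextlib.closing, so no RuntimeWarning at exit.

Without the closing wrapper, `pending` would be left un-awaited after gen.close(), which mirrors the RuntimeWarning described in the comment above.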
@@ -160,9 +176,13 @@ def close(self, timeout=None):
try:
if self.buffer:
self.buffer, payload = [], self.buffer
yield self.comm.write(
payload, serializers=self.serializers, on_error="raise"
)
# See note in `_background_send` for explanation of `closing`.
with contextlib.closing(
self.comm.write(
payload, serializers=self.serializers, on_error="raise"
)
) as coro:
yield coro
except CommClosedError:
pass
yield self.comm.close()
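
Picking up @graingert's point above about losing the Future: the following is a rough, hypothetical sketch (not from the PR, and the exact behavior can vary between CPython/asyncio versions) of a Task whose only external reference is dropped while it is suspended. The garbage collector then collects the Task/Future/coroutine cycle and closes the coroutine, so its finally block runs during collection. objgraph.show_backrefs([task.get_coro()]) is one way to draw the reference chain he describes.

import asyncio
import gc


async def worker(flag):
    fut = asyncio.get_running_loop().create_future()
    try:
        await fut  # suspend forever; fut is referenced only via this Task
    finally:
        # Runs when the garbage collector closes the suspended coroutine.
        flag["finalized"] = True


async def main():
    flag = {"finalized": False}
    task = asyncio.create_task(worker(flag))
    await asyncio.sleep(0)  # let worker start and suspend on its Future
    del task                # drop the only external reference to the Task
    gc.collect()            # the Task/Future/coroutine cycle is collected;
                            # asyncio also logs "Task was destroyed but it is pending!"
    print(flag["finalized"])  # typically True: the finally block already ran


asyncio.run(main())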