diff --git a/distributed/batched.py b/distributed/batched.py index aa001322b2..9050516783 100644 --- a/distributed/batched.py +++ b/distributed/batched.py @@ -1,5 +1,6 @@ from __future__ import annotations +import contextlib import logging from collections import deque from typing import Any @@ -94,9 +95,24 @@ def _background_send(self): self.batch_count += 1 self.next_deadline = time() + self.interval try: - nbytes = yield self.comm.write( - payload, serializers=self.serializers, on_error="raise" - ) + # NOTE: Since `BatchedSend` doesn't have a handle on the running + # `_background_send` coroutine, the only thing with a reference to this + # coroutine is the event loop itself. If the event loop stops while + # we're waiting on a `write`, the `_background_send` coroutine object + # may be garbage collected. If that happens, the `yield coro` will raise + # `GeneratorExit`. But because this is an old-school `gen.coroutine`, + # and we're using `yield` and not `await`, the `write` coroutine object + # will not actually have been awaited, and it will remain sitting around + # for someone to retrieve it. At interpreter exit, this will warn + # sommething like `RuntimeWarning: coroutine 'TCP.write' was never + # awaited`. By using the `closing` contextmanager, the `write` coroutine + # object is always cleaned up, even if `yield` raises `GeneratorExit`. + with contextlib.closing( + self.comm.write( + payload, serializers=self.serializers, on_error="raise" + ) + ) as coro: + nbytes = yield coro if nbytes < 1e6: self.recent_message_log.append(payload) else: @@ -160,9 +176,13 @@ def close(self, timeout=None): try: if self.buffer: self.buffer, payload = [], self.buffer - yield self.comm.write( - payload, serializers=self.serializers, on_error="raise" - ) + # See note in `_background_send` for explanation of `closing`. + with contextlib.closing( + self.comm.write( + payload, serializers=self.serializers, on_error="raise" + ) + ) as coro: + yield coro except CommClosedError: pass yield self.comm.close()