Skip to content

Commit

Permalink
Always close BatchedSend write coroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
gjoseph92 committed Aug 10, 2022
1 parent e38d3a9 commit 9d302d7
Showing 1 changed file with 26 additions and 6 deletions.
32 changes: 26 additions & 6 deletions distributed/batched.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import contextlib
import logging
from collections import deque
from typing import Any
Expand Down Expand Up @@ -94,9 +95,24 @@ def _background_send(self):
self.batch_count += 1
self.next_deadline = time() + self.interval
try:
nbytes = yield self.comm.write(
payload, serializers=self.serializers, on_error="raise"
)
# NOTE: Since `BatchedSend` doesn't have a handle on the running
# `_background_send` coroutine, the only thing with a reference to this
# coroutine is the event loop itself. If the event loop stops while
# we're waiting on a `write`, the `_background_send` coroutine object
# may be garbage collected. If that happens, the `yield coro` will raise
# `GeneratorExit`. But because this is an old-school `gen.coroutine`,
# and we're using `yield` and not `await`, the `write` coroutine object
# will not actually have been awaited, and it will remain sitting around
# for someone to retrieve it. At interpreter exit, this will warn
# sommething like `RuntimeWarning: coroutine 'TCP.write' was never
# awaited`. By using the `closing` contextmanager, the `write` coroutine
# object is always cleaned up, even if `yield` raises `GeneratorExit`.
with contextlib.closing(
self.comm.write(
payload, serializers=self.serializers, on_error="raise"
)
) as coro:
nbytes = yield coro
if nbytes < 1e6:
self.recent_message_log.append(payload)
else:
Expand Down Expand Up @@ -160,9 +176,13 @@ def close(self, timeout=None):
try:
if self.buffer:
self.buffer, payload = [], self.buffer
yield self.comm.write(
payload, serializers=self.serializers, on_error="raise"
)
# See note in `_background_send` for explanation of `closing`.
with contextlib.closing(
self.comm.write(
payload, serializers=self.serializers, on_error="raise"
)
) as coro:
yield coro
except CommClosedError:
pass
yield self.comm.close()
Expand Down

0 comments on commit 9d302d7

Please sign in to comment.