Do not drop BatchedSend payload if worker reconnects #5457

Closed
Changes from 1 commit
13 changes: 9 additions & 4 deletions distributed/batched.py
@@ -58,6 +58,7 @@ def __init__(self, interval, loop=None, serializers=None):

    def start(self, comm):
        self.comm = comm
        self.please_stop = False
Collaborator:

Suggested change (replacing the line self.please_stop = False):

    self.please_stop = False
    self.stopped.clear()
    self.waker.clear()

Collaborator:

What I mean by a "clear interface" is that I'd like a docstring for start saying the contract for what it can/can't do. For example, that start on a closed BatchedSend will restart it with a new comm. And that start on a currently-running BatchedSend raises an error, etc. What are the invariants for this internal state? How do all the methods ensure they are upheld?
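A minimal sketch of what such a start() contract could look like (illustrative only; the RuntimeError guard and the wording are assumptions, not code from this PR):

    def start(self, comm):
        """Attach a new comm and (re)start the background send loop.

        Sketch of a contract:
        - start on a closed BatchedSend restarts it with the new comm; any
          messages still in self.buffer are retained and sent over that comm.
        - start on a currently running BatchedSend is an error.
        """
        if not self.closed():
            raise RuntimeError("BatchedSend is already running")
        self.comm = comm
        self.please_stop = False
        self.loop.add_callback(self._background_send)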

        self.loop.add_callback(self._background_send)
Collaborator:

One downside of using Tornado here is that we have no handle on this coroutine. If we did asyncio.create_task or similar, we could keep a Task object here.

  1. That might let us simplify the please_stop/stopped/abort logic by just letting us do task.cancel() and handling the CancelledError within _background_send. We would probably want to shield the comm.write call from cancellation if we did so.
  2. We could assert in send and elsewhere that the _background_send Task is not done(). If we're also checking please_stop, etc. this may be overkill, but it's a nice sanity check that something hasn't gone horribly wrong. The fact that the coroutine wasn't running was the cause of Properly support restarting BatchedSend #5481; a check for this would have made the bug much easier to find.
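A standalone sketch of the Task-based approach described in this comment (the class and attribute names are illustrative, not distributed's API):

    import asyncio

    class TaskBackedBatchedSend:
        """Sketch: hold an asyncio.Task instead of a fire-and-forget callback."""

        def __init__(self, interval):
            self.interval = interval
            self.buffer = []
            self.comm = None
            self._send_task = None  # handle on the background coroutine

        def start(self, comm):
            self.comm = comm
            self._send_task = asyncio.ensure_future(self._background_send())

        async def _background_send(self):
            try:
                while True:
                    await asyncio.sleep(self.interval)
                    if not self.buffer:
                        continue
                    payload, self.buffer = self.buffer, []
                    # Shield the write so a cancellation cannot interrupt it
                    # mid-frame; the shielded write runs to completion.
                    await asyncio.shield(self.comm.write(payload))
            except asyncio.CancelledError:
                pass  # graceful shutdown requested via close()

        def send(self, msg):
            # Sanity check from point 2: the background task must be alive.
            assert self._send_task is not None and not self._send_task.done()
            self.buffer.append(msg)

        async def close(self):
            if self._send_task is not None:
                self._send_task.cancel()
                await asyncio.gather(self._send_task, return_exceptions=True)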


    def closed(self):
Collaborator:

please_stop and stopped should probably factor into this as well

Collaborator:

I still think this check is insufficient for telling whether we're closed or not. While we're in the process of closing (please_stop is True, or self.stopped is set, or _background_send is no longer running, etc.), it may still return True.

From the way this is used though, in both start and write, we probably want to treat "in the process of closing" as closed, not as running. Restarting a BatchedSend that's closing should be an error. If writing to a closed BatchedSend is an error, then so should be writing to one that's in the process of closing.
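A sketch of the stricter check described here, reusing attributes that already exist on BatchedSend (the exact combination is an assumption, not the PR's implementation):

    def closed(self):
        """Treat "in the process of closing" as closed."""
        return (
            self.please_stop
            or self.stopped.is_set()
            or self.comm is None
            or self.comm.closed()
        )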

@@ -98,6 +99,7 @@ def _background_send(self):
                else:
                    self.recent_message_log.append("large-message")
                self.byte_count += nbytes
                payload = []  # lose ref
            except CommClosedError:
                logger.info("Batched Comm Closed %r", self.comm, exc_info=True)
                break
@@ -111,7 +113,9 @@
                logger.exception("Error in batched write")
                break
            finally:
                payload = None  # lose ref
                # If anything failed we should not lose the payload. If a new comm
                # is provided we can still resubmit messages
                self.buffer = payload + self.buffer
Collaborator:

Are you sure we should retain payload? Couldn't those messages have been successfully sent, and the error occurred after? Then we might be duplicating them when we reconnect.

Though maybe we'd rather duplicate messages than drop them. In that case, let's add a note saying so.

Member Author:

At this point I am assuming a certain behaviour of the comm: either the comm writes everything or nothing. That's likely not always true, but I believe we cannot do much about it at this level of abstraction. IMHO, that guarantee should be implemented by our protocol and/or the Comm interface.

Either way, I'm happy to take any suggestions for improving this.

Collaborator:

After reading through TCP comm code, I don't think there's anywhere for an exception to happen after all of the payload has been sent. An exception could still happen when part has been sent and part hasn't, but either way, since we have to assume something here, I think it's more useful to assume that the payload hasn't been sent.

@jcrist and I were discussing this, and given the way the BatchedSend interface works, it actually needs to implement some sort of protocol with an ack from the receiver for each sent batch to guarantee messages can't be dropped. Since send on a BatchedSend is nonblocking, the caller is basically handing off full responsibility for the message to BatchedSend. If the message fails to send, it's too late to raise an error and let the caller figure out what to do about it—once we've been given a message, we have to ensure it's delivered. So the logical thing to do would be for the receiving side to ack each message, and only when ack'd does the sender drop the payload (with some deduplication of course).

OTOH there are lots of protocols out there for doing things like this, more performantly, robustly, and with better testing than we'll ever have. As with other things (framing in serialization), maybe the better solution is to stop duplicating functionality at the application level that should be the transport layer's job.

Could we get rid of BatchedSend entirely with some well-tuned TCP buffering settings + a serialization scheme that was more efficient for many small messages?
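For illustration, a hypothetical ack-based scheme along these lines (none of these names exist in distributed; it only shows the retain-until-acked and dedup-by-sequence-number idea):

    class AckingSender:
        def __init__(self, comm):
            self.comm = comm
            self.seq = 0
            self.unacked = {}  # seq -> payload, retained until acknowledged

        async def send_batch(self, payload):
            self.seq += 1
            self.unacked[self.seq] = payload
            await self.comm.write({"seq": self.seq, "msgs": payload})

        def handle_ack(self, seq):
            # The receiver confirmed delivery; only now is the payload dropped.
            self.unacked.pop(seq, None)

        def pending(self):
            # On reconnect, resend everything not yet acknowledged.
            return [self.unacked[s] for s in sorted(self.unacked)]

    class DedupingReceiver:
        def __init__(self):
            self.last_seen = 0

        def receive(self, batch):
            if batch["seq"] <= self.last_seen:
                return []  # duplicate after a resend; already processed
            self.last_seen = batch["seq"]
            return batch["msgs"]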

        else:
            # nobreak. We've been gracefully closed.
            self.stopped.set()
@@ -121,7 +125,6 @@ def _background_send(self):
        # there was an exception when using `comm`.
        # We can't close gracefully via `.close()` since we can't send messages.
        # So we just abort.
        # This means that any messages in our buffer our lost.
        # To propagate exceptions, we rely on subsequent `BatchedSend.send`
        # calls to raise CommClosedErrors.
        self.stopped.set()
Collaborator:

Can't comment there, but this is a comment for send:

        if self.comm is not None and self.comm.closed():
            raise CommClosedError(f"Comm {self.comm!r} already closed.")

This check feels insufficient. A more thorough check would have caught #5481. Should we also check not self.closed()? not self.please_stop? That the _background_send coroutine is still running? Things like that? It seems inconsistent to prohibit sends when the underlying comm is closed, but still allow them if the BatchedSend itself is effectively closed.

On the other hand, we may be relying on the fact that messages can be enqueued to a BatchedSend even when it's closed. If we expect the BatchedSend to be restarted and reused soon, perhaps there should be no restrictions on when send can be called.

I'd like to:

  1. Figure out exactly what the behavior is we actually want
  2. Write a docstring explaining the contract for when send can and cannot be called, how it behaves in the case of an open vs a closed underlying comm, an open vs closed BatchedSend, etc.
  3. Make the code match this contract
  4. Test that it does so
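A sketch of what such a stricter send() could look like (it reuses attributes that already exist in batched.py; the closed()-based guard expresses the contract proposed above and is not the PR's code):

    def send(self, *msgs):
        """Schedule messages for sending to the other side.

        Sketch of a contract: enqueueing is only allowed while the
        BatchedSend is running; a closed or closing instance rejects
        messages, since nothing would ever deliver them.
        """
        if self.closed():  # would cover comm closed, please_stop, stopped
            raise CommClosedError(f"Comm {self.comm!r} already closed.")
        self.message_count += len(msgs)
        self.buffer.extend(msgs)
        if self.next_deadline is None:
            self.waker.set()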

@@ -152,6 +155,7 @@ def close(self, timeout=None):
        self.please_stop = True
        self.waker.set()
        yield self.stopped.wait(timeout=timeout)
        payload = []
Collaborator:

could move this below the if not self.comm.closed():
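The reordering being suggested would look roughly like this (the buffer swap line belongs to the existing close() and is collapsed in the view above):

    if not self.comm.closed():
        payload = []  # only needed on this branch
        try:
            if self.buffer:
                payload, self.buffer = self.buffer, []
                yield self.comm.write(
                    payload, serializers=self.serializers, on_error="raise"
                )
        except CommClosedError:
            logger.error("Lost %i payload messages.", len(payload))
        yield self.comm.close()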

        if not self.comm.closed():
            try:
                if self.buffer:
@@ -160,14 +164,15 @@
                        payload, serializers=self.serializers, on_error="raise"
                    )
            except CommClosedError:
                pass
                # If we're closing and there is an error, there is little we
                # can do to recover.
                logger.error("Lost %i payload messages.", len(payload))
            yield self.comm.close()

    def abort(self):
Collaborator:

Should abort be part of the public interface? When/how should callers use abort vs close? What can and cannot be done with a BatchedSend after abort (or close) has been called? These are all things I'd like to see documented.
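A sketch of the kind of documentation being asked for (the semantics described here are inferred from this diff, not an authoritative statement of the API):

    def abort(self):
        """Stop immediately and drop any buffered messages.

        Sketch: close() tries to flush the remaining buffer over the comm and
        then closes the comm, and the BatchedSend may later be restarted via
        start(); abort() gives up on delivery, discards the buffer and aborts
        the comm without flushing. After either call, start() with a fresh
        comm is required before the instance can be used again.
        """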

        if self.comm is None:
            return
        self.please_stop = True
        self.buffer = []
        self.waker.set()
        if not self.comm.closed():
            self.comm.abort()
Collaborator:

Comment for both here and close(): why should the BatchedSend close/abort the comm when it's closed? Does the caller expect that, by calling start, it has handed off lifecycle responsibility for the comm to the BatchedSend? If so, that should be documented ("once you call start, you must not directly use the comm object").

22 changes: 22 additions & 0 deletions distributed/tests/test_batched.py
@@ -249,3 +249,25 @@ async def test_serializers():
assert "function" in value

assert comm.closed()


@pytest.mark.asyncio
async def test_retain_buffer_commclosed():
async with EchoServer() as e:
with captured_logger("distributed.batched") as caplog:
comm = await connect(e.address)

b = BatchedSend(interval="1s", serializers=["msgpack"])
b.start(comm)
b.send("foo")
assert b.buffer
await comm.close()
await asyncio.sleep(1)

assert "Batched Comm Closed" in caplog.getvalue()
assert b.buffer

new_comm = await connect(e.address)
b.start(new_comm)
assert await new_comm.read() == ("foo",)
assert not b.buffer
43 changes: 38 additions & 5 deletions distributed/tests/test_worker.py
@@ -2568,11 +2568,9 @@ def fast_on_a(lock):

assert "Unexpected worker completed task" in s_logs.getvalue()

# Ensure that all in-memory tasks on A have been restored on the
# scheduler after reconnect
for ts in a.tasks.values():
if ts.state == "memory":
assert a.address in {ws.address for ws in s.tasks[ts.key].who_has}
sts = s.tasks[f3.key]
assert sts.state == "memory"
assert s.workers[a.address] in sts.who_has

del f1, f2, f3
while any(w.tasks for w in [a, b]):
@@ -3196,3 +3194,38 @@ async def test_deadlock_cancelled_after_inflight_before_gather_from_worker(
    args, kwargs = mocked_gather.call_args
    await Worker.gather_dep(b, *args, **kwargs)
    await fut3


@gen_cluster(nthreads=[("", 1)])
async def test_dont_loose_payload_reconnect(s, w):
    """Ensure that payload of a BatchedSend is not lost if a worker reconnects"""
    s.count = 0

    def receive(worker, msg):
        s.count += 1

    s.stream_handlers["receive-msg"] = receive
    w.batched_stream.next_deadline = w.loop.time() + 10_000

    for x in range(100):
        w.batched_stream.send({"op": "receive-msg", "msg": x})

    await s.stream_comms[w.address].comm.close()
    while not w.batched_stream.comm.closed():
        await asyncio.sleep(0.1)
    before = w.batched_stream.buffer.copy()
    w.batched_stream.next_deadline = w.loop.time()
    assert len(w.batched_stream.buffer) == 100
    with captured_logger("distributed.batched") as caplog:
        await w.batched_stream._background_send()

    assert "Batched Comm Closed" in caplog.getvalue()
    after = w.batched_stream.buffer.copy()

    # Payload that couldn't be submitted is prepended
    assert len(after) >= len(before)
    assert after[: len(before)] == before
Comment on lines +3261 to +3263

Collaborator (@crusaderky, Oct 29, 2021):
If everything went through before the close, then you're not testing anything useful.
From what I can read here it might as well be what happens every time.
The opposite might also be true - everything goes through AFTER the close.

Could you change it to

    assert before
    assert len(after) > len(before)

while possibly drastically increasing the number of messages in the pipeline, to avoid flakiness?

Member Author (@fjetter, Oct 29, 2021):
  1. assert before is already asserted, as assert len(w.batched_stream.buffer) == 100 in L3218.
  2. Increasing the number of messages sent should not affect anything, since the batched send is implemented as all or nothing.
  3. Actually, the most probable scenario is after == before, since you only get a difference if something in the scheduler sent another message while the background send is awaiting the write to the closed comm, which is an incredibly tiny time window. At first I had the equality assert only, but that caused this test to fail every now and then; I added the greater-or-equal to avoid flakiness. Equal is fine. after > before is fine iff the first len(before) messages are the same as before.

If the payload had been submitted before closing, the buffer would be empty before I try to send.

If we absolutely want >, the only way I see is to schedule futures that try to send something concurrently with the await of background_send in L3220, but that would make our buffer assert impossible, since we could no longer say for sure what the before state actually was.

Collaborator:

Can you explicitly call batched_stream.send(..) after the comm has been closed, and test that it has the correct behavior? (As I mentioned in another comment I don't know what this behavior should be; raise an error or just add to the buffer?)

Member Author:

That's been done as part of test_send_before_close. I didn't change the behaviour but extended the test a bit. The behaviour is as follows:

  1. If a send is successful, the message is appended to the buffer. The BatchedSend is now responsible for delivery. The responsibility of the server is to check periodically and restart by providing a new comm if necessary.
  2. If an exception is raised, the code should raise immediately and not append to the buffer. Whatever logic was trying to send a message can then decide how the failure should be handled.

To me, that's sensible behaviour.
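A caller-side sketch of that contract (the helper name, logger, and fallback handling are hypothetical):

    def notify_scheduler(batched_send, msg):
        try:
            # On success the message lands in the buffer and the BatchedSend
            # now owns delivery, including across a reconnect.
            batched_send.send(msg)
        except CommClosedError:
            # send() failed fast and did not buffer; the caller decides what
            # to do, e.g. retry later or drop the notification.
            logger.warning("Could not enqueue %r, stream is closed", msg)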


await w.heartbeat()
Collaborator:

I assume this is to make the worker reconnect to the scheduler. Is there a way we could wait for that to happen naturally, to test that it does so properly? Or do we have other good tests around worker reconnection that we're confident in?

Member Author:

Yes, the heartbeat triggers a reconnect. There are test_worker_reconnects_mid_compute and test_worker_reconnects_mid_compute_multiple_states_on_scheduler, which test that the worker reconnects and corrects the scheduler state accordingly if any tasks are on the cluster. I consider testing the reconnect itself out of scope for this.

    while not s.count == 100:
        await asyncio.sleep(0.1)
1 change: 1 addition & 0 deletions distributed/worker.py
@@ -1238,6 +1238,7 @@ async def handle_scheduler(self, comm):
                comm, every_cycle=[self.ensure_communicating, self.ensure_computing]
            )
        except Exception as e:
            self.batched_stream.please_stop = True
Collaborator:

Could we not mutate internal state of BatchedSend? This feels brittle. I see this is going to do something slightly different from batched_stream.close(). If we need the ability to do both things, let's have a method for doing whatever this is as well (request_stop?).

Though I'd prefer having just one close() function. If the behavior of the current close isn't useful to calling code, perhaps we should change its behavior to something that is useful.
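A sketch of what such a method could look like (the name request_stop comes from this comment; waking the loop is an added assumption, not something the inline version above does):

    def request_stop(self):
        """Ask the background send loop to wind down without flushing or
        closing the comm."""
        self.please_stop = True
        self.waker.set()  # wake _background_send so it notices promptly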

Member Author:

That was actually not even necessary: the BatchedSend automatically infers that the Comm is closed, and there is no need to close anything explicitly.

            logger.exception(e)
            raise
        finally: