From 7b7024f6ec0ca5d3390064256b2b10ade253efa7 Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Wed, 8 Jun 2022 17:31:05 +0100
Subject: [PATCH] Worker State Machine breaks on fetch->missing +
 _ensure_communicating

---
 .../tests/test_worker_state_machine.py | 82 +++++++++++++++++++
 distributed/worker.py                  | 18 +++-
 2 files changed, 98 insertions(+), 2 deletions(-)

diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py
index 324d9b47999..5d3faa28da0 100644
--- a/distributed/tests/test_worker_state_machine.py
+++ b/distributed/tests/test_worker_state_machine.py
@@ -23,6 +23,7 @@
     ExecuteSuccessEvent,
     Instruction,
     RecommendationsConflict,
+    RefreshWhoHasEvent,
     ReleaseWorkerDataMsg,
     RescheduleEvent,
     RescheduleMsg,
@@ -603,3 +604,84 @@ async def test_missing_to_waiting(c, s, w1, w2, w3):
     await w1.close()
 
     await f1
+
+
+@gen_cluster(client=True, nthreads=[("", 1)] * 3)
+async def test_fetch_to_missing_on_refresh_who_has(c, s, w1, w2, w3):
+    """
+    1. Two tasks, x and y, are only available on a busy worker.
+       The worker sends request-refresh-who-has to the scheduler.
+    2. The scheduler responds that x has become missing, while y has gained an
+       additional replica.
+    3. The handler for RefreshWhoHasEvent empties x.who_has and recommends a
+       transition to missing.
+    4. Before the recommendation can be enacted, the same event invokes
+       _ensure_communicating to allow y to transition to flight. This in turn
+       pops x from data_needed - but x has an empty who_has, which is an
+       exceptional situation.
+    5. The transition fetch->missing is executed, but x is no longer in
+       data_needed - another exceptional situation.
+    """
+    x = c.submit(inc, 1, key="x", workers=[w1.address])
+    y = c.submit(inc, 2, key="y", workers=[w1.address])
+    await wait([x, y])
+    w1.total_in_connections = 0
+    s.request_acquire_replicas(w3.address, ["x", "y"], stimulus_id="test1")
+
+    # The tasks will now flip-flop between fetch and flight every 150ms
+    # (see Worker.retry_busy_worker_later)
+    await wait_for_state("x", "fetch", w3)
+    await wait_for_state("y", "fetch", w3)
+    assert w1.address in w3.busy_workers
+    # w3 sent {op: request-refresh-who-has, keys: [x, y]}
+    # There may also have been enough time for a refresh-who-has message to come
+    # back, which reiterated what w3 already knew:
+    # {op: refresh-who-has, who_has={x: [w1.address], y: [w1.address]}}
+
+    # Let's instead simulate that, while request-refresh-who-has was in transit,
+    # w2 gained a replica of y and then w1 subsequently shut down.
+    # When request-refresh-who-has lands, the scheduler will respond:
+    # {op: refresh-who-has, who_has={x: [], y: [w2.address]}}
+    w3.handle_stimulus(
+        RefreshWhoHasEvent(who_has={"x": set(), "y": {w2.address}}, stimulus_id="test3")
+    )
+    assert w3.tasks["x"].state == "missing"
+    assert w3.tasks["y"].state == "flight"
+    assert w3.tasks["y"].who_has == {w2.address}
+
+
+@gen_cluster(client=True, nthreads=[("", 1)])
+async def test_fetch_to_missing_on_network_failure(c, s, a):
+    """
+    1. Multiple tasks can be fetched from the same worker.
+    2. Only some transition to flight, while the others remain in fetch state,
+       e.g. because of excessive size.
+    3. gather_dep returns GatherDepNetworkFailureEvent.
+    4. The event empties has_what. This impacts both the tasks in fetch and those
+       in flight. The event recommends a transition to missing for all tasks with
+       an empty who_has.
+    5. Before the recommendation can be enacted, the same event invokes
+       _ensure_communicating, which pops the tasks in fetch from data_needed -
+       but they have an empty who_has, which is an exceptional situation.
+    6. The transition fetch->missing is executed, but the tasks are no longer in
+       data_needed - another exceptional situation.
+    """
+    block_get_data = asyncio.Event()
+
+    class BlockedBreakingWorker(Worker):
+        async def get_data(self, comm, *args, **kwargs):
+            await block_get_data.wait()
+            raise OSError("fake error")
+
+    async with BlockedBreakingWorker(s.address) as b:
+        x = c.submit(inc, 1, key="x", workers=[b.address])
+        y = c.submit(inc, 2, key="y", workers=[b.address])
+        await wait([x, y])
+        s.request_acquire_replicas(a.address, ["x"], stimulus_id="test_x")
+        await wait_for_state("x", "flight", a)
+        s.request_acquire_replicas(a.address, ["y"], stimulus_id="test_y")
+        await wait_for_state("y", "fetch", a)
+
+        block_get_data.set()
+
+        await wait_for_state("x", "missing", a)
+        await wait_for_state("y", "missing", a)

diff --git a/distributed/worker.py b/distributed/worker.py
index 372158cb201..cb6a804f869 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -2168,7 +2168,14 @@ def transition_fetch_flight(
     def transition_fetch_missing(
         self, ts: TaskState, *, stimulus_id: str
     ) -> RecsInstrs:
-        self.data_needed.remove(ts)
+        # There's a use case where ts won't be found in self.data_needed, so
+        # `self.data_needed.remove(ts)` would crash:
+        # 1. An event handler empties who_has and pushes a recommendation to missing
+        # 2. The same event handler calls _ensure_communicating, which pops the task
+        #    from data_needed
+        # 3. The recommendation is enacted
+        # See matching code in _ensure_communicating.
+        self.data_needed.discard(ts)
         return self.transition_generic_missing(ts, stimulus_id=stimulus_id)
 
     def transition_memory_released(
@@ -2990,9 +2997,16 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs:
 
             if self.validate:
                 assert ts.state == "fetch"
-                assert ts.who_has
                 assert self.address not in ts.who_has
 
+            if not ts.who_has:
+                # An event handler just emptied who_has and recommended a
+                # fetch->missing transition. Then, the same handler called
+                # _ensure_communicating. The transition hasn't been enacted yet,
+                # so the task is still in fetch state and in data_needed.
+                # See matching code in transition_fetch_missing.
+                continue
+
             workers = [
                 w for w in ts.who_has
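
Reviewer note: the race is easier to follow in isolation. Below is a minimal,
self-contained Python sketch of the control flow being fixed. It is NOT
distributed's real API: ToyTask, ToyWorkerState, handle_refresh_who_has and
ensure_communicating are hypothetical stand-ins that only mirror the ordering
of events described in the test docstrings (empty who_has -> recommend missing
-> _ensure_communicating pops data_needed -> recommendation enacted).

    # Hypothetical toy model of the race; not distributed's real classes.
    class ToyTask:
        def __init__(self, key):
            self.key = key
            self.state = "fetch"
            self.who_has = set()


    class ToyWorkerState:
        def __init__(self):
            # Tasks in fetch state, waiting for a transfer slot
            self.data_needed = set()

        def handle_refresh_who_has(self, tasks, who_has):
            recommendations = []
            for ts in tasks:
                ts.who_has = set(who_has.get(ts.key, ()))
                if not ts.who_has:
                    recommendations.append(ts)  # recommend fetch->missing
            # The same handler starts transfers *before* the recommendations
            # are enacted - this ordering is the crux of the bug.
            self.ensure_communicating()
            for ts in recommendations:
                self.transition_fetch_missing(ts)

        def ensure_communicating(self):
            for ts in list(self.data_needed):
                if not ts.who_has:
                    # Guard from the patch: there is nowhere to fetch this task
                    # from; leave it for the pending fetch->missing
                    # recommendation instead of sending it to flight.
                    continue
                self.data_needed.remove(ts)
                ts.state = "flight"

        def transition_fetch_missing(self, ts):
            # discard() instead of remove(): on other code paths ts may already
            # have been popped by ensure_communicating, and remove() would
            # raise KeyError here.
            self.data_needed.discard(ts)
            ts.state = "missing"


    x, y = ToyTask("x"), ToyTask("y")
    ws = ToyWorkerState()
    ws.data_needed |= {x, y}
    # Scheduler's refresh-who-has: x has no replicas left, y is now on "w2".
    ws.handle_refresh_who_has([x, y], {"y": {"w2"}})
    assert x.state == "missing" and y.state == "flight"

Running the sketch without the who_has guard strands x in data_needed with
nowhere to fetch it from; running it with remove() instead of discard() shows
the KeyError that this patch turns into a no-op.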