From 069c283bc7060602ce71ea0686e5d93abb03f429 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Fri, 1 Mar 2024 10:45:44 +0100
Subject: [PATCH 01/23] Fix

---
 distributed/scheduler.py            | 30 ++++++++++----
 distributed/tests/test_scheduler.py | 61 +++++++++++++++++++++++++++++
 2 files changed, 83 insertions(+), 8 deletions(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 320edc391a4..6794670cc9f 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -1964,6 +1964,7 @@ def _transition(
                 )

                 v = a_recs.get(key, finish)
+                # The inner rec has higher priority? Why is that?
                 func = self._TRANSITIONS_TABLE["released", v]
                 b_recs, b_cmsgs, b_wmsgs = func(self, key, stimulus_id)

@@ -2082,7 +2083,12 @@ def _transition_released_waiting(self, key: Key, stimulus_id: str) -> RecsMsgs:
             assert not ts.who_has
             assert not ts.processing_on
             for dts in ts.dependencies:
-                assert dts.state not in {"forgotten", "erred"}
+                assert dts.state not in {"forgotten", "erred"}, (
+                    key,
+                    dts.key,
+                    dts.state,
+                    self.transition_log,
+                )

         if ts.has_lost_dependencies:
             return {key: "forgotten"}, {}, {}
@@ -2480,10 +2486,13 @@ def _transition_memory_released(
             recommendations[key] = "forgotten"
         elif ts.has_lost_dependencies:
             recommendations[key] = "forgotten"
-        elif ts.who_wants or ts.waiters:
+        elif (ts.who_wants or ts.waiters) and not any(
+            dts.state in ("erred",) for dts in ts.dependencies
+        ):
             recommendations[key] = "waiting"

         for dts in ts.waiters or ():
+            # Why would a waiter be in processing?
             if dts.state in ("no-worker", "processing"):
                 recommendations[dts.key] = "waiting"
             elif dts.state == "waiting":
@@ -2505,14 +2514,14 @@ def _transition_released_erred(self, key: Key, stimulus_id: str) -> RecsMsgs:
             assert ts.exception_blame
             assert not ts.who_has
             assert not ts.waiting_on
-            assert not ts.waiters
+            # assert not ts.waiters

         failing_ts = ts.exception_blame
         assert failing_ts

         for dts in ts.dependents:
-            dts.exception_blame = failing_ts
             if not dts.who_has:
+                dts.exception_blame = failing_ts
                 recommendations[dts.key] = "erred"

         report_msg = {
@@ -2547,6 +2556,9 @@ def _transition_erred_released(self, key: Key, stimulus_id: str) -> RecsMsgs:

         for dts in ts.dependents:
             if dts.state == "erred":
+                # Does this make sense?
+                # This goes via released
+                # dts -> released -> waiting
                 recommendations[dts.key] = "waiting"

         w_msg = {
@@ -2621,8 +2633,8 @@ def _transition_processing_erred(
         self,
         key: Key,
         stimulus_id: str,
+        worker: str | None = None,
         *,
-        worker: str,
         cause: Key | None = None,
         exception: Serialized | None = None,
         traceback: Serialized | None = None,
@@ -2675,7 +2687,8 @@ def _transition_processing_erred(

         if not ts.erred_on:
             ts.erred_on = set()
-        ts.erred_on.add(worker)
+        if worker:
+            ts.erred_on.add(worker)
         if exception is not None:
             ts.exception = exception
             ts.exception_text = exception_text
@@ -2699,8 +2712,9 @@ def _transition_processing_erred(
         )

         for dts in ts.dependents:
-            dts.exception_blame = failing_ts
-            recommendations[dts.key] = "erred"
+            if dts.who_has:
+                dts.exception_blame = failing_ts
+                recommendations[dts.key] = "erred"

         for dts in ts.dependencies:
             if dts.waiters:
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 636efcd9e48..18fa3743a5e 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -4890,3 +4890,64 @@ async def test_resubmit_different_task_same_key_warns_only_once(

     async with Worker(s.address):
         assert await c.gather(zs) == [2, 3, 4]  # Kept old ys
+
+
+def block(x, in_event, block_event):
+    in_event.set()
+    block_event.wait()
+    return x
+
+
+@gen_cluster(
+    client=True,
+    nthreads=[("", 1, {"resources": {"a": 1}}), ("", 1, {"resources": {"b": 1}})],
+    config={"distributed.scheduler.allowed-failures": 1},
+    Worker=Nanny,
+)
+async def test_fan_out_pattern_deadlock(c, s, a, b):
+    """Regression test for https://github.com/dask/distributed/issues/8548
+
+    Conceptually, this test simulates a fan-out based workload, where the worker
+    that processed the single input task of the fan-out dies once the fan-out task is processed
+    but before the scheduler recognizes that the single input task has successfully been sent to
+    another worker which now processes a fan-out task. Then, workers continue to die during processing
+    of the input tasks.
+
+    This test heavily uses resources to force scheduling decisions.
+    """
+    in_ancestor = Event()
+    block_ancestor = Event()
+    in_on_a_descendant = Event()
+    in_on_b_descendant = Event()
+    block_descendants = Event()
+    await block_ancestor.set()
+    f = c.submit(block, 1, in_ancestor, block_ancestor, key="f", resources={"a": 1})
+    g = c.submit(inc, f, key="g", resources={"a": 1})
+    h1 = c.submit(
+        block, g, in_on_a_descendant, block_descendants, key="h1", resources={"a": 1}
+    )
+    h2 = c.submit(
+        block, g, in_on_b_descendant, block_descendants, key="h2", resources={"b": 1}
+    )
+
+    await asyncio.gather(
+        wait_for_state("g", "memory", s),
+        in_on_a_descendant.wait(),
+        in_on_b_descendant.wait(),
+    )
+    await asyncio.gather(in_ancestor.clear(), block_ancestor.clear())
+    await a.process.process.kill()
+
+    await in_ancestor.wait()
+    await in_ancestor.clear()
+    await a.process.process.kill()
+
+    await in_ancestor.wait()
+    await in_ancestor.clear()
+    await a.process.process.kill()
+
+    await block_descendants.set()
+    # await block_ancestor.set()
+    await h2
+    with pytest.raises(KilledWorker, match="Attempted to run task 'h1'"):
+        await h1

From 095ebdd17fd4798c9c1c915f5d73bbaf4329a840 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Fri, 1 Mar 2024 10:54:08 +0100
Subject: [PATCH 02/23] Minor

---
 distributed/scheduler.py | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 6794670cc9f..b330321c4d7 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -1964,7 +1964,7 @@ def _transition(
                 )

                 v = a_recs.get(key, finish)
-                # The inner rec has higher priority? Why is that?
+                # The inner rec has higher priority? Is that always desired?
                 func = self._TRANSITIONS_TABLE["released", v]
                 b_recs, b_cmsgs, b_wmsgs = func(self, key, stimulus_id)

@@ -2084,9 +2084,8 @@ def _transition_released_waiting(self, key: Key, stimulus_id: str) -> RecsMsgs:
             assert not ts.processing_on
             for dts in ts.dependencies:
                 assert dts.state not in {"forgotten", "erred"}, (
-                    key,
-                    dts.key,
-                    dts.state,
+                    ts,
+                    dts,
                     self.transition_log,
                 )

@@ -2492,7 +2491,6 @@ def _transition_memory_released(
             recommendations[key] = "waiting"

         for dts in ts.waiters or ():
-            # Why would a waiter be in processing?
if dts.state in ("no-worker", "processing"): recommendations[dts.key] = "waiting" elif dts.state == "waiting": @@ -2514,7 +2512,6 @@ def _transition_released_erred(self, key: Key, stimulus_id: str) -> RecsMsgs: assert ts.exception_blame assert not ts.who_has assert not ts.waiting_on - # assert not ts.waiters failing_ts = ts.exception_blame assert failing_ts From de4caa9942c79ed93a3b6822565dd5847458a8bf Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 1 Mar 2024 13:30:41 +0100 Subject: [PATCH 03/23] Improved test and fix another problem --- distributed/scheduler.py | 13 +++++ distributed/tests/test_scheduler.py | 78 ++++++++++++++++++----------- 2 files changed, 62 insertions(+), 29 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index b330321c4d7..da8705c7c07 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5049,6 +5049,19 @@ def stimulus_task_finished(self, key, worker, stimulus_id, run_id, **kwargs): "stimulus_id": stimulus_id, } ] + elif ts.state == "erred": + logger.debug( + "Received already erred task, worker: %s" ", key: %s", + worker, + key, + ) + worker_msgs[worker] = [ + { + "op": "free-keys", + "keys": [key], + "stimulus_id": stimulus_id, + } + ] elif ts.run_id != run_id: if not ts.processing_on or ts.processing_on.address != worker: logger.debug( diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 18fa3743a5e..8a709e03e6a 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4900,11 +4900,10 @@ def block(x, in_event, block_event): @gen_cluster( client=True, - nthreads=[("", 1, {"resources": {"a": 1}}), ("", 1, {"resources": {"b": 1}})], + nthreads=[("", 1, {"resources": {"a": 1}})], config={"distributed.scheduler.allowed-failures": 1}, - Worker=Nanny, ) -async def test_fan_out_pattern_deadlock(c, s, a, b): +async def test_fan_out_pattern_deadlock(c, s, a): """Regression test for https://github.com/dask/distributed/issues/8548 Conceptually, this test simulates a fan-out based workload, where the worker @@ -4920,34 +4919,55 @@ async def test_fan_out_pattern_deadlock(c, s, a, b): in_on_a_descendant = Event() in_on_b_descendant = Event() block_descendants = Event() - await block_ancestor.set() - f = c.submit(block, 1, in_ancestor, block_ancestor, key="f", resources={"a": 1}) - g = c.submit(inc, f, key="g", resources={"a": 1}) + f = c.submit(block, 1, in_ancestor, block_ancestor, key="f", resources={"b": 1}) + g = c.submit(inc, f, key="g", resources={"b": 1}) h1 = c.submit( - block, g, in_on_a_descendant, block_descendants, key="h1", resources={"a": 1} + block, g, in_on_b_descendant, block_descendants, key="h1", resources={"b": 1} ) h2 = c.submit( - block, g, in_on_b_descendant, block_descendants, key="h2", resources={"b": 1} + block, g, in_on_a_descendant, block_descendants, key="h2", resources={"a": 1} ) - - await asyncio.gather( - wait_for_state("g", "memory", s), - in_on_a_descendant.wait(), - in_on_b_descendant.wait(), + with captured_logger("distributed.scheduler", level=logging.ERROR) as logger: + async with Worker(s.address, nthreads=1, resources={"b": 1}) as b: + await block_ancestor.set() + await asyncio.gather( + in_on_a_descendant.wait(), + in_on_b_descendant.wait(), + ) + await in_ancestor.clear() + while len(s.tasks["g"].who_has) < 2: + await asyncio.sleep(0.1) + await s.remove_worker(b.address, stimulus_id="remove_b") + await b.close(timeout=0) + await block_ancestor.clear() + + async with Worker(s.address, nthreads=1, 
resources={"b": 1}) as b: + await in_ancestor.wait() + await in_ancestor.clear() + await s.remove_worker(b.address, stimulus_id="remove_b") + await block_ancestor.set() + await b.close(timeout=0) + await block_ancestor.clear() + + async with Worker(s.address, nthreads=1, resources={"b": 1}) as b: + await in_ancestor.wait() + await in_ancestor.clear() + await s.remove_worker(b.address, stimulus_id="remove_b") + await block_ancestor.set() + await b.close(timeout=0) + await block_ancestor.clear() + + await block_descendants.set() + await block_ancestor.set() + with pytest.raises(KilledWorker, match="Attempted to run task 'f'"): + await h2 + + with pytest.raises(KilledWorker, match="Attempted to run task 'h1'"): + await h1 + + # Make sure h2 gets forgotten on a + await async_poll_for(lambda: not a.state.tasks, timeout=5) + assert ( + logger.getvalue() + == "Task h1 marked as failed because 2 workers died while trying to run it\nTask f marked as failed because 2 workers died while trying to run it\n" ) - await asyncio.gather(in_ancestor.clear(), block_ancestor.clear()) - await a.process.process.kill() - - await in_ancestor.wait() - await in_ancestor.clear() - await a.process.process.kill() - - await in_ancestor.wait() - await in_ancestor.clear() - await a.process.process.kill() - - await block_descendants.set() - # await block_ancestor.set() - await h2 - with pytest.raises(KilledWorker, match="Attempted to run task 'h1'"): - await h1 From d6678b6089373ddc839e2cb44ca948e09930b641 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 1 Mar 2024 13:38:34 +0100 Subject: [PATCH 04/23] Better test --- distributed/tests/test_scheduler.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 8a709e03e6a..ae8043eefb1 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4906,12 +4906,6 @@ def block(x, in_event, block_event): async def test_fan_out_pattern_deadlock(c, s, a): """Regression test for https://github.com/dask/distributed/issues/8548 - Conceptually, this test simulates a fan-out based workload, where the worker - that processed the single input task of the fan-out dies once the fan-out task is processed - but before the scheduler recognizes that the single input task has successfully been sent to - another worker which now processes a fan-out task. Then, workers continue to die during processing - of the input tasks. - This test heavily uses resources to force scheduling decisions. 
""" in_ancestor = Event() @@ -4919,8 +4913,12 @@ async def test_fan_out_pattern_deadlock(c, s, a): in_on_a_descendant = Event() in_on_b_descendant = Event() block_descendants = Event() + + # Input task to 'g' that we can fail f = c.submit(block, 1, in_ancestor, block_ancestor, key="f", resources={"b": 1}) g = c.submit(inc, f, key="g", resources={"b": 1}) + + # Fan-out from 'g' and run h1 and h2 on different workers h1 = c.submit( block, g, in_on_b_descendant, block_descendants, key="h1", resources={"b": 1} ) @@ -4935,12 +4933,17 @@ async def test_fan_out_pattern_deadlock(c, s, a): in_on_b_descendant.wait(), ) await in_ancestor.clear() + + # Make sure that the scheduler knows that both workers hold 'g' in memory while len(s.tasks["g"].who_has) < 2: await asyncio.sleep(0.1) + # Remove worker 'b' while it's processing h1 await s.remove_worker(b.address, stimulus_id="remove_b") await b.close(timeout=0) await block_ancestor.clear() + # Repeatedly remove new instances of the 'b' worker while it processes 'f' + # to trigger an transition for 'f' to 'erred' async with Worker(s.address, nthreads=1, resources={"b": 1}) as b: await in_ancestor.wait() await in_ancestor.clear() @@ -4957,16 +4960,19 @@ async def test_fan_out_pattern_deadlock(c, s, a): await b.close(timeout=0) await block_ancestor.clear() - await block_descendants.set() - await block_ancestor.set() + # The fanned-out tasks err because their transitive dependency 'f' erred before they were computed with pytest.raises(KilledWorker, match="Attempted to run task 'f'"): await h2 + await block_descendants.set() + await block_ancestor.set() + with pytest.raises(KilledWorker, match="Attempted to run task 'h1'"): await h1 - # Make sure h2 gets forgotten on a + # Make sure that h2 gets forgotten on worker 'a' await async_poll_for(lambda: not a.state.tasks, timeout=5) + # Ensure that no other errors including transition failures were logged assert ( logger.getvalue() == "Task h1 marked as failed because 2 workers died while trying to run it\nTask f marked as failed because 2 workers died while trying to run it\n" From 7c130251e44fadd2d86bb5c1c8c4cc56a7f77d2a Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 1 Mar 2024 13:40:00 +0100 Subject: [PATCH 05/23] Simplify --- distributed/tests/test_scheduler.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index ae8043eefb1..3cecf6ca55a 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4948,17 +4948,13 @@ async def test_fan_out_pattern_deadlock(c, s, a): await in_ancestor.wait() await in_ancestor.clear() await s.remove_worker(b.address, stimulus_id="remove_b") - await block_ancestor.set() await b.close(timeout=0) - await block_ancestor.clear() async with Worker(s.address, nthreads=1, resources={"b": 1}) as b: await in_ancestor.wait() await in_ancestor.clear() await s.remove_worker(b.address, stimulus_id="remove_b") - await block_ancestor.set() await b.close(timeout=0) - await block_ancestor.clear() # The fanned-out tasks err because their transitive dependency 'f' erred before they were computed with pytest.raises(KilledWorker, match="Attempted to run task 'f'"): From b8ccfa43e97b30bb5fd89a632042aeda47c9587d Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 1 Mar 2024 17:45:54 +0100 Subject: [PATCH 06/23] Fix --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 
da8705c7c07..1d54064be7b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2709,7 +2709,7 @@ def _transition_processing_erred( ) for dts in ts.dependents: - if dts.who_has: + if not dts.who_has: dts.exception_blame = failing_ts recommendations[dts.key] = "erred" From 723bcb5b2ad52723719f20f3169cd4333796927a Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 1 Mar 2024 19:03:10 +0100 Subject: [PATCH 07/23] Improve test --- distributed/tests/test_scheduler.py | 31 ++++++++++++++++------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 3cecf6ca55a..ce398f94f1c 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4915,16 +4915,21 @@ async def test_fan_out_pattern_deadlock(c, s, a): block_descendants = Event() # Input task to 'g' that we can fail - f = c.submit(block, 1, in_ancestor, block_ancestor, key="f", resources={"b": 1}) - g = c.submit(inc, f, key="g", resources={"b": 1}) + with dask.annotate(resources={"b": 1}): + f = delayed(block)(1, in_ancestor, block_ancestor, dask_key_name="f") + g = delayed(inc)(f, dask_key_name="g") - # Fan-out from 'g' and run h1 and h2 on different workers - h1 = c.submit( - block, g, in_on_b_descendant, block_descendants, key="h1", resources={"b": 1} - ) - h2 = c.submit( - block, g, in_on_a_descendant, block_descendants, key="h2", resources={"a": 1} - ) + # Fan-out from 'g' and run h1 and h2 on different workers + h1 = delayed(block)( + g, in_on_b_descendant, block_descendants, dask_key_name="h1" + ) + with dask.annotate(resources={"a": 1}): + h2 = delayed(block)( + g, in_on_a_descendant, block_descendants, dask_key_name="h2" + ) + del g + + f, h1, h2 = c.compute([f, h1, h2]) with captured_logger("distributed.scheduler", level=logging.ERROR) as logger: async with Worker(s.address, nthreads=1, resources={"b": 1}) as b: await block_ancestor.set() @@ -4956,16 +4961,14 @@ async def test_fan_out_pattern_deadlock(c, s, a): await s.remove_worker(b.address, stimulus_id="remove_b") await b.close(timeout=0) - # The fanned-out tasks err because their transitive dependency 'f' erred before they were computed - with pytest.raises(KilledWorker, match="Attempted to run task 'f'"): - await h2 - await block_descendants.set() - await block_ancestor.set() + await h2 with pytest.raises(KilledWorker, match="Attempted to run task 'h1'"): await h1 + del h1, h2 + await block_ancestor.set() # Make sure that h2 gets forgotten on worker 'a' await async_poll_for(lambda: not a.state.tasks, timeout=5) # Ensure that no other errors including transition failures were logged From 1254620c2dcc786e98525148d94e21c2c666c236 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 4 Mar 2024 16:02:27 +0100 Subject: [PATCH 08/23] Fix test on mindep --- distributed/tests/test_scheduler.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index ce398f94f1c..7f83a1fbd44 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4912,7 +4912,8 @@ async def test_fan_out_pattern_deadlock(c, s, a): block_ancestor = Event() in_on_a_descendant = Event() in_on_b_descendant = Event() - block_descendants = Event() + block_on_a_descendant = Event() + block_on_b_descendant = Event() # Input task to 'g' that we can fail with dask.annotate(resources={"b": 1}): @@ -4921,11 +4922,11 @@ async def 
test_fan_out_pattern_deadlock(c, s, a): # Fan-out from 'g' and run h1 and h2 on different workers h1 = delayed(block)( - g, in_on_b_descendant, block_descendants, dask_key_name="h1" + g, in_on_b_descendant, block_on_b_descendant, dask_key_name="h1" ) with dask.annotate(resources={"a": 1}): h2 = delayed(block)( - g, in_on_a_descendant, block_descendants, dask_key_name="h2" + g, in_on_a_descendant, block_on_a_descendant, dask_key_name="h2" ) del g @@ -4944,7 +4945,8 @@ async def test_fan_out_pattern_deadlock(c, s, a): await asyncio.sleep(0.1) # Remove worker 'b' while it's processing h1 await s.remove_worker(b.address, stimulus_id="remove_b") - await b.close(timeout=0) + await block_on_b_descendant.set() + await b.close() await block_ancestor.clear() # Repeatedly remove new instances of the 'b' worker while it processes 'f' @@ -4953,22 +4955,24 @@ async def test_fan_out_pattern_deadlock(c, s, a): await in_ancestor.wait() await in_ancestor.clear() await s.remove_worker(b.address, stimulus_id="remove_b") - await b.close(timeout=0) + await block_ancestor.set() + await b.close() + await block_ancestor.clear() async with Worker(s.address, nthreads=1, resources={"b": 1}) as b: await in_ancestor.wait() await in_ancestor.clear() await s.remove_worker(b.address, stimulus_id="remove_b") - await b.close(timeout=0) + await block_ancestor.set() + await b.close() - await block_descendants.set() + await block_on_a_descendant.set() await h2 with pytest.raises(KilledWorker, match="Attempted to run task 'h1'"): await h1 del h1, h2 - await block_ancestor.set() # Make sure that h2 gets forgotten on worker 'a' await async_poll_for(lambda: not a.state.tasks, timeout=5) # Ensure that no other errors including transition failures were logged From d6abfd1474f90312c5e681ab401e7a303ab67429 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 4 Mar 2024 17:17:52 +0100 Subject: [PATCH 09/23] [skip-caching] From fcd7f0d4363b45c693e00aa6fd5ba131615a5708 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 4 Mar 2024 18:29:24 +0100 Subject: [PATCH 10/23] [skip-caching] From 47e6ea05d0c55b818ea9785c5b343c7a8d934178 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 5 Mar 2024 13:54:42 +0100 Subject: [PATCH 11/23] Update distributed/scheduler.py Co-authored-by: crusaderky --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 1d54064be7b..abc24f082ab 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2486,7 +2486,7 @@ def _transition_memory_released( elif ts.has_lost_dependencies: recommendations[key] = "forgotten" elif (ts.who_wants or ts.waiters) and not any( - dts.state in ("erred",) for dts in ts.dependencies + dts.state == "erred" for dts in ts.dependencies ): recommendations[key] = "waiting" From 1efefa4620cb3a0e661265fb6351be63c5a6121d Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 5 Mar 2024 14:01:54 +0100 Subject: [PATCH 12/23] Update distributed/scheduler.py Co-authored-by: crusaderky --- distributed/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index abc24f082ab..eed02e5c461 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2084,8 +2084,8 @@ def _transition_released_waiting(self, key: Key, stimulus_id: str) -> RecsMsgs: assert not ts.processing_on for dts in ts.dependencies: assert dts.state not in {"forgotten", "erred"}, ( - ts, - dts, + str(ts), + str(dts), 
self.transition_log, ) From 071ac2827ff47404436fc7b7d35f9f3ebab6b405 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 5 Mar 2024 14:08:27 +0100 Subject: [PATCH 13/23] Apply suggestions from code review Co-authored-by: crusaderky --- distributed/tests/test_scheduler.py | 32 ++++++++++------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 7f83a1fbd44..0c57c4618d9 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4908,43 +4908,33 @@ async def test_fan_out_pattern_deadlock(c, s, a): This test heavily uses resources to force scheduling decisions. """ - in_ancestor = Event() - block_ancestor = Event() - in_on_a_descendant = Event() - in_on_b_descendant = Event() - block_on_a_descendant = Event() - block_on_b_descendant = Event() + in_f, block_f = Event(), Event() + in_ha, block_ha = Event(), Event() + in_hb, block_hb = Event(), Event() # Input task to 'g' that we can fail with dask.annotate(resources={"b": 1}): - f = delayed(block)(1, in_ancestor, block_ancestor, dask_key_name="f") + f = delayed(block)(1, in_f, block_f, dask_key_name="f") g = delayed(inc)(f, dask_key_name="g") # Fan-out from 'g' and run h1 and h2 on different workers - h1 = delayed(block)( - g, in_on_b_descendant, block_on_b_descendant, dask_key_name="h1" - ) + hb = delayed(block)(g, in_hb, block_hb, dask_key_name="hb") with dask.annotate(resources={"a": 1}): - h2 = delayed(block)( - g, in_on_a_descendant, block_on_a_descendant, dask_key_name="h2" - ) - del g + ha = delayed(block)(g, in_ha, block_ha, dask_key_name="ha") f, h1, h2 = c.compute([f, h1, h2]) with captured_logger("distributed.scheduler", level=logging.ERROR) as logger: async with Worker(s.address, nthreads=1, resources={"b": 1}) as b: await block_ancestor.set() - await asyncio.gather( - in_on_a_descendant.wait(), - in_on_b_descendant.wait(), - ) + await in_on_a_descendant.wait() + await in_on_b_descendant.wait() await in_ancestor.clear() # Make sure that the scheduler knows that both workers hold 'g' in memory while len(s.tasks["g"].who_has) < 2: await asyncio.sleep(0.1) # Remove worker 'b' while it's processing h1 - await s.remove_worker(b.address, stimulus_id="remove_b") + await s.remove_worker(b.address, stimulus_id="remove_b1") await block_on_b_descendant.set() await b.close() await block_ancestor.clear() @@ -4954,7 +4944,7 @@ async def test_fan_out_pattern_deadlock(c, s, a): async with Worker(s.address, nthreads=1, resources={"b": 1}) as b: await in_ancestor.wait() await in_ancestor.clear() - await s.remove_worker(b.address, stimulus_id="remove_b") + await s.remove_worker(b.address, stimulus_id="remove_b2") await block_ancestor.set() await b.close() await block_ancestor.clear() @@ -4962,7 +4952,7 @@ async def test_fan_out_pattern_deadlock(c, s, a): async with Worker(s.address, nthreads=1, resources={"b": 1}) as b: await in_ancestor.wait() await in_ancestor.clear() - await s.remove_worker(b.address, stimulus_id="remove_b") + await s.remove_worker(b.address, stimulus_id="remove_b3") await block_ancestor.set() await b.close() From 68ea88e32ce7b5078566ef80e94ba658b3f78870 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 5 Mar 2024 14:12:43 +0100 Subject: [PATCH 14/23] Fix test --- distributed/tests/test_scheduler.py | 40 ++++++++++++++--------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 
0c57c4618d9..8a4e78b0d50 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4922,51 +4922,51 @@ async def test_fan_out_pattern_deadlock(c, s, a): with dask.annotate(resources={"a": 1}): ha = delayed(block)(g, in_ha, block_ha, dask_key_name="ha") - f, h1, h2 = c.compute([f, h1, h2]) + f, ha, hb = c.compute([f, ha, hb]) with captured_logger("distributed.scheduler", level=logging.ERROR) as logger: async with Worker(s.address, nthreads=1, resources={"b": 1}) as b: - await block_ancestor.set() - await in_on_a_descendant.wait() - await in_on_b_descendant.wait() - await in_ancestor.clear() + await block_f.set() + await in_ha.wait() + await in_hb.wait() + await in_f.clear() # Make sure that the scheduler knows that both workers hold 'g' in memory while len(s.tasks["g"].who_has) < 2: await asyncio.sleep(0.1) # Remove worker 'b' while it's processing h1 await s.remove_worker(b.address, stimulus_id="remove_b1") - await block_on_b_descendant.set() + await block_hb.set() await b.close() - await block_ancestor.clear() + await block_f.clear() # Repeatedly remove new instances of the 'b' worker while it processes 'f' # to trigger an transition for 'f' to 'erred' async with Worker(s.address, nthreads=1, resources={"b": 1}) as b: - await in_ancestor.wait() - await in_ancestor.clear() + await in_f.wait() + await in_f.clear() await s.remove_worker(b.address, stimulus_id="remove_b2") - await block_ancestor.set() + await block_f.set() await b.close() - await block_ancestor.clear() + await block_f.clear() async with Worker(s.address, nthreads=1, resources={"b": 1}) as b: - await in_ancestor.wait() - await in_ancestor.clear() + await in_f.wait() + await in_f.clear() await s.remove_worker(b.address, stimulus_id="remove_b3") - await block_ancestor.set() + await block_f.set() await b.close() - await block_on_a_descendant.set() - await h2 + await block_ha.set() + await ha - with pytest.raises(KilledWorker, match="Attempted to run task 'h1'"): - await h1 + with pytest.raises(KilledWorker, match="Attempted to run task 'hb'"): + await hb - del h1, h2 + del ha, hb # Make sure that h2 gets forgotten on worker 'a' await async_poll_for(lambda: not a.state.tasks, timeout=5) # Ensure that no other errors including transition failures were logged assert ( logger.getvalue() - == "Task h1 marked as failed because 2 workers died while trying to run it\nTask f marked as failed because 2 workers died while trying to run it\n" + == "Task hb marked as failed because 2 workers died while trying to run it\nTask f marked as failed because 2 workers died while trying to run it\n" ) From 5884bf5ac14ed99ab1bc4fe1af3b919d2f780d78 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 5 Mar 2024 14:24:32 +0100 Subject: [PATCH 15/23] Remove b.close --- distributed/tests/test_scheduler.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 8a4e78b0d50..30d0ccb4a72 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4936,7 +4936,6 @@ async def test_fan_out_pattern_deadlock(c, s, a): # Remove worker 'b' while it's processing h1 await s.remove_worker(b.address, stimulus_id="remove_b1") await block_hb.set() - await b.close() await block_f.clear() # Repeatedly remove new instances of the 'b' worker while it processes 'f' @@ -4946,7 +4945,6 @@ async def test_fan_out_pattern_deadlock(c, s, a): await in_f.clear() await s.remove_worker(b.address, stimulus_id="remove_b2") await 
block_f.set() - await b.close() await block_f.clear() async with Worker(s.address, nthreads=1, resources={"b": 1}) as b: @@ -4954,7 +4952,6 @@ async def test_fan_out_pattern_deadlock(c, s, a): await in_f.clear() await s.remove_worker(b.address, stimulus_id="remove_b3") await block_f.set() - await b.close() await block_ha.set() await ha From 9ee578ac78a84fe9d28d86e5ffc04567ae8143ad Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 5 Mar 2024 14:51:48 +0100 Subject: [PATCH 16/23] waiters --- distributed/scheduler.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index eed02e5c461..38ec2a4f78f 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2708,10 +2708,9 @@ def _transition_processing_erred( ) ) - for dts in ts.dependents: - if not dts.who_has: - dts.exception_blame = failing_ts - recommendations[dts.key] = "erred" + for dts in ts.waiters or set(): + dts.exception_blame = failing_ts + recommendations[dts.key] = "erred" for dts in ts.dependencies: if dts.waiters: From 5ee1b6a3fe7be69c86624de8bda9f1ba996e7284 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 5 Mar 2024 14:58:04 +0100 Subject: [PATCH 17/23] Simpler test --- distributed/tests/test_scheduler.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 30d0ccb4a72..10f1e8499e3 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4901,7 +4901,7 @@ def block(x, in_event, block_event): @gen_cluster( client=True, nthreads=[("", 1, {"resources": {"a": 1}})], - config={"distributed.scheduler.allowed-failures": 1}, + config={"distributed.scheduler.allowed-failures": 0}, ) async def test_fan_out_pattern_deadlock(c, s, a): """Regression test for https://github.com/dask/distributed/issues/8548 @@ -4938,7 +4938,7 @@ async def test_fan_out_pattern_deadlock(c, s, a): await block_hb.set() await block_f.clear() - # Repeatedly remove new instances of the 'b' worker while it processes 'f' + # Remove the new instance of the 'b' worker while it processes 'f' # to trigger an transition for 'f' to 'erred' async with Worker(s.address, nthreads=1, resources={"b": 1}) as b: await in_f.wait() @@ -4947,12 +4947,6 @@ async def test_fan_out_pattern_deadlock(c, s, a): await block_f.set() await block_f.clear() - async with Worker(s.address, nthreads=1, resources={"b": 1}) as b: - await in_f.wait() - await in_f.clear() - await s.remove_worker(b.address, stimulus_id="remove_b3") - await block_f.set() - await block_ha.set() await ha @@ -4965,5 +4959,5 @@ async def test_fan_out_pattern_deadlock(c, s, a): # Ensure that no other errors including transition failures were logged assert ( logger.getvalue() - == "Task hb marked as failed because 2 workers died while trying to run it\nTask f marked as failed because 2 workers died while trying to run it\n" + == "Task hb marked as failed because 1 workers died while trying to run it\nTask f marked as failed because 1 workers died while trying to run it\n" ) From d8939ad4bc37086da52b1c343d7ea264264fe3fc Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 5 Mar 2024 15:40:30 +0100 Subject: [PATCH 18/23] Always require worker --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 38ec2a4f78f..7ceb251dde9 100644 --- a/distributed/scheduler.py +++ 
b/distributed/scheduler.py @@ -2630,7 +2630,7 @@ def _transition_processing_erred( self, key: Key, stimulus_id: str, - worker: str | None = None, + worker: str, *, cause: Key | None = None, exception: Serialized | None = None, From 6da78245afb2602bc2bea88f14be528f4e8136c5 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 5 Mar 2024 17:06:01 +0100 Subject: [PATCH 19/23] Add test --- distributed/tests/test_scheduler.py | 69 +++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 10f1e8499e3..b81dfafc8bf 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4961,3 +4961,72 @@ async def test_fan_out_pattern_deadlock(c, s, a): logger.getvalue() == "Task hb marked as failed because 1 workers died while trying to run it\nTask f marked as failed because 1 workers died while trying to run it\n" ) + + +@gen_cluster( + client=True, + nthreads=[("", 1, {"resources": {"a": 1}})], + config={"distributed.scheduler.allowed-failures": 0}, +) +async def test_stimulus_from_erred_task(c, s, a): + """This test heavily uses resources to force scheduling decisions.""" + in_f, block_f = Event(), Event() + in_g, block_g = Event(), Event() + + with dask.annotate(resources={"b": 1}): + f = delayed(block)(1, in_f, block_f, dask_key_name="f") + + with dask.annotate(resources={"a": 1}): + g = delayed(block)(f, in_g, block_g, dask_key_name="g") + + f, g = c.compute([f, g]) + with captured_logger("distributed.scheduler", level=logging.ERROR) as logger: + frozen_stream_from_a_ctx = freeze_batched_send(a.batched_stream) + frozen_stream_from_a_ctx.__enter__() + + async with Worker(s.address, nthreads=1, resources={"b": 1}) as b1: + await block_f.set() + await in_g.wait() + await in_f.clear() + frozen_stream_to_a_ctx = freeze_batched_send(s.stream_comms[a.address]) + frozen_stream_to_a_ctx.__enter__() + await s.remove_worker(b1.address, stimulus_id="remove_b1") + await block_f.clear() + + # Remove the new instance of the 'b' worker while it processes 'f' + # to trigger an transition for 'f' to 'erred' + async with Worker(s.address, nthreads=1, resources={"b": 1}) as b2: + await in_f.wait() + await in_f.clear() + await s.remove_worker(b2.address, stimulus_id="remove_b2") + await block_f.set() + + with pytest.raises(KilledWorker, match="Attempted to run task 'f'"): + await f + + # g has already been transitioned to 'erred' because 'f' failed + with pytest.raises(KilledWorker, match="Attempted to run task 'f'"): + await g + + # Finish 'g' and let the scheduler know so it can trigger cleanup + await block_g.set() + with mock.patch.object( + s, "stimulus_task_finished", wraps=s.stimulus_task_finished + ) as wrapped_stimulus: + frozen_stream_from_a_ctx.__exit__(None, None, None) + # Make sure the `stimulus_task_finished` gets processed + await async_poll_for(lambda: wrapped_stimulus.call_count == 1, timeout=5) + + # Allow the scheduler to talk to the worker again + frozen_stream_to_a_ctx.__exit__(None, None, None) + # Make sure all data gets forgotten on worker 'a' + await async_poll_for(lambda: not a.state.tasks, timeout=5) + + # del f, g + # await async_poll_for(lambda: not a.state.tasks, timeout=5) + + # Ensure that no other errors including transition failures were logged + assert ( + logger.getvalue() + == "Task f marked as failed because 1 workers died while trying to run it\n" + ) From 2375647eb84b5aa20d1ab9e0d9d08f91ca112b11 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: 
Tue, 5 Mar 2024 17:10:17 +0100 Subject: [PATCH 20/23] cleanup --- distributed/tests/test_scheduler.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index b81dfafc8bf..dd72d1d39de 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -5022,9 +5022,6 @@ async def test_stimulus_from_erred_task(c, s, a): # Make sure all data gets forgotten on worker 'a' await async_poll_for(lambda: not a.state.tasks, timeout=5) - # del f, g - # await async_poll_for(lambda: not a.state.tasks, timeout=5) - # Ensure that no other errors including transition failures were logged assert ( logger.getvalue() From f6dfe39abed6c3e6d00b838db23905fdff3db591 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 7 Mar 2024 12:05:43 +0100 Subject: [PATCH 21/23] Update distributed/tests/test_scheduler.py Co-authored-by: crusaderky --- distributed/tests/test_scheduler.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index dd72d1d39de..eae8fcc9910 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4931,8 +4931,7 @@ async def test_fan_out_pattern_deadlock(c, s, a): await in_f.clear() # Make sure that the scheduler knows that both workers hold 'g' in memory - while len(s.tasks["g"].who_has) < 2: - await asyncio.sleep(0.1) + await async_poll_for(lambda: len(s.tasks["g"].who_has) == 2, timeout=5) # Remove worker 'b' while it's processing h1 await s.remove_worker(b.address, stimulus_id="remove_b1") await block_hb.set() From e054a3885de936b478e627f0d85ce3158d242c45 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 7 Mar 2024 12:05:49 +0100 Subject: [PATCH 22/23] Update distributed/tests/test_scheduler.py Co-authored-by: crusaderky --- distributed/tests/test_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index eae8fcc9910..ed95d4968fe 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4993,7 +4993,7 @@ async def test_stimulus_from_erred_task(c, s, a): await block_f.clear() # Remove the new instance of the 'b' worker while it processes 'f' - # to trigger an transition for 'f' to 'erred' + # to trigger a transition for 'f' to 'erred' async with Worker(s.address, nthreads=1, resources={"b": 1}) as b2: await in_f.wait() await in_f.clear() From 87c2b38dd919fda2388105bcea3dd6a8e1501a72 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 7 Mar 2024 12:07:11 +0100 Subject: [PATCH 23/23] Minor --- distributed/scheduler.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 7ceb251dde9..a7217f53032 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2684,8 +2684,7 @@ def _transition_processing_erred( if not ts.erred_on: ts.erred_on = set() - if worker: - ts.erred_on.add(worker) + ts.erred_on.add(worker) if exception is not None: ts.exception = exception ts.exception_text = exception_text