distributed/shuffle/tests/test_shuffle.py::test_clean_after_close #6405

Open
fjetter opened this issue May 20, 2022 · 1 comment
Labels
flaky test Intermittent failures on CI.

Comments

@fjetter
Member

fjetter commented May 20, 2022

Occurred on #6400, but I've also seen it in other PRs:
https://github.com/dask/distributed/runs/6524459060?check_suite_focus=true

____________________________ test_clean_after_close ____________________________

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:44193', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:41815', name: 0, status: closed, stored: 0, running: 1/1, ready: 2, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:38417', name: 1, status: closed, stored: 0, running: 1/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_clean_after_close(c, s, a, b):
        df = dask.datasets.timeseries(
            start="2000-01-01",
            end="2000-01-10",
            dtypes={"x": float, "y": float},
            freq="10 s",
        )
        x = dd.shuffle.shuffle(df, "x", shuffle="p2p").persist()
    
        while not s.tasks or not any(ts.state == "memory" for ts in s.tasks.values()):
            await asyncio.sleep(0.01)
    
        await a.close()
>       clean_worker(a)

distributed/shuffle/tests/test_shuffle.py:460: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

worker = <Worker 'tcp://127.0.0.1:41815', name: 0, status: closed, stored: 0, running: 1/1, ready: 2, comm: 0, waiting: 0>

    def clean_worker(worker):
        """Assert that the worker has no shuffle state"""
>       assert not worker.extensions["shuffle"].shuffles
E       AssertionError: assert not {'6d5abfdcdfeb8f0fe8f7ad64604df132': <distributed.shuffle.shuffle_extension.Shuffle object at 0x7fd43ee86790>}
E        +  where {'6d5abfdcdfeb8f0fe8f7ad64604df132': <distributed.shuffle.shuffle_extension.Shuffle object at 0x7fd43ee86790>} = <distributed.shuffle.shuffle_extension.ShuffleWorkerExtension object at 0x7fd45ecc4b50>.shuffles

distributed/shuffle/tests/test_shuffle.py:30: AssertionError
----------------------------- Captured stdout call -----------------------------
Dumped cluster state to test_cluster_dump/test_clean_after_close.yaml
----------------------------- Captured stderr call -----------------------------
2022-05-20 14:23:59,127 - distributed.core - ERROR - Exception while handling op shuffle_receive
Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/shuffle/shuffle_extension.py", line 347, in _get_shuffle
    return self.shuffles[shuffle_id]
KeyError: '6d5abfdcdfeb8f0fe8f7ad64604df132'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 585, in handle_comm
    result = await result
  File "/home/runner/work/distributed/distributed/distributed/shuffle/shuffle_extension.py", line 252, in shuffle_receive
    shuffle = await self._get_shuffle(shuffle_id)
  File "/home/runner/work/distributed/distributed/distributed/shuffle/shuffle_extension.py", line 370, in _get_shuffle
    shuffle = Shuffle(
  File "/home/runner/work/distributed/distributed/distributed/shuffle/shuffle_extension.py", line 67, in __init__
    self.multi_file = MultiFile(
  File "/home/runner/work/distributed/distributed/distributed/shuffle/multi_file.py", line 68, in __init__
    os.mkdir(self.directory)
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmp5zq6vygj/dask-worker-space/worker-_36t5f_y/shuffle-6d5abfdcdfeb8f0fe8f7ad64604df132'
2022-05-20 14:23:59,130 - distributed.core - ERROR - Exception while handling op shuffle_receive
Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/shuffle/shuffle_extension.py", line 347, in _get_shuffle
    return self.shuffles[shuffle_id]
KeyError: '6d5abfdcdfeb8f0fe8f7ad64604df132'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 585, in handle_comm
    result = await result
  File "/home/runner/work/distributed/distributed/distributed/shuffle/shuffle_extension.py", line 252, in shuffle_receive
    shuffle = await self._get_shuffle(shuffle_id)
  File "/home/runner/work/distributed/distributed/distributed/shuffle/shuffle_extension.py", line 370, in _get_shuffle
    shuffle = Shuffle(
  File "/home/runner/work/distributed/distributed/distributed/shuffle/shuffle_extension.py", line 67, in __init__
    self.multi_file = MultiFile(
  File "/home/runner/work/distributed/distributed/distributed/shuffle/multi_file.py", line 68, in __init__
    os.mkdir(self.directory)
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmp5zq6vygj/dask-worker-space/worker-_36t5f_y/shuffle-6d5abfdcdfeb8f0fe8f7ad64604df132'
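The stderr above shows `shuffle_receive` calling `_get_shuffle`, which falls back to constructing a new `Shuffle` when the ID is missing from `self.shuffles` (the `KeyError` followed by `Shuffle(...)`). One plausible shape of a race like this is a lookup-or-create path that suspends on an `await` while the worker closes, then recreates state that `close()` already cleared. The sketch below is a hypothetical, heavily simplified model of that pattern under that assumption, not distributed's code and not a confirmed root cause. `ShuffleRegistry`, `receive`, `close_during_receive`, and the `guard` flag are invented for illustration; the guard re-checks a `closed` flag after the `await` and refuses to create new state.

```python
import asyncio


class ShuffleRegistry:
    """Hypothetical, simplified model of per-worker shuffle state.

    Mirrors only the lookup-or-create shape seen in the traceback; it is
    not distributed's ShuffleWorkerExtension.
    """

    def __init__(self, guard):
        self.shuffles = {}
        self.closed = False
        self.guard = guard  # hypothetical: re-check `closed` after awaiting

    async def get_shuffle(self, shuffle_id):
        try:
            return self.shuffles[shuffle_id]
        except KeyError:
            pass
        # Stand-in for any await (e.g. a scheduler round-trip) before the
        # new state is built; close() can run while we are suspended here.
        await asyncio.sleep(0)
        if self.guard and self.closed:
            raise RuntimeError(f"closed; not creating shuffle {shuffle_id!r}")
        state = object()
        self.shuffles[shuffle_id] = state
        return state

    async def receive(self, shuffle_id):
        await self.get_shuffle(shuffle_id)

    async def close(self):
        self.closed = True
        self.shuffles.clear()


async def close_during_receive(guard):
    reg = ShuffleRegistry(guard=guard)
    task = asyncio.create_task(reg.receive("6d5a"))
    await asyncio.sleep(0)  # let receive() reach its await point
    await reg.close()       # close while receive() is suspended
    try:
        await task
    except RuntimeError:
        pass                # guarded path refuses to recreate state
    return reg.shuffles


print(asyncio.run(close_during_receive(guard=False)))  # one leftover entry
print(asyncio.run(close_during_receive(guard=True)))   # {}
```

Without the guard, the late `receive()` repopulates `shuffles` after `close()`, which would trip an assertion like `clean_worker`; with it, the registry stays empty. Re-checking after the `await`, rather than only before it, is what closes the window in this model.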

There are also a few rare failures on main:


https://github.com/dask/distributed/actions/runs/2356436925
https://github.com/dask/distributed/actions/runs/2292297969

cc @mrocklin

fjetter added the flaky test label May 20, 2022
@mrocklin
Member

mrocklin commented May 20, 2022 via email
