
test_dask_collections.py::test_dataframe_set_index_sync passes locally, fails on CI #8561

Closed
milesgranger opened this issue Mar 8, 2024 · 11 comments · Fixed by #8569

@milesgranger
Contributor

The test passes locally but consistently fails on CI with a CancelledError on the lambda parameter.

https://github.com/milesgranger/distributed/actions/runs/8201545593/job/22430476616#step:19:3885

FAILED distributed/tests/test_dask_collections.py::test_dataframe_set_index_sync[] - concurrent.futures._base.CancelledError: ('len-tree-b08e45d3088106d64c77136c41d6bd5b', 0)

xref: #8560

@crusaderky
Collaborator

crusaderky commented Mar 8, 2024

This has started failing since the switch to dask-expr.

@fjetter @phofl this makes me very nervous. We may want to treat it as a blocker - at least until we understand what's actually going on.

This was referenced Mar 8, 2024
@fjetter fjetter self-assigned this Mar 11, 2024
@fjetter
Member

fjetter commented Mar 11, 2024

Just getting started with debugging. I haven't had any luck reproducing this locally so far. It could be either a timing issue or tests interfering with each other.

The logs already indicate something interesting:

2024-03-08 10:07:19,968 - distributed.scheduler - INFO - User asked for computation on lost data, ('operation-8ebe45051147aef38027a1960f230fee', 0)
2024-03-08 10:07:19,969 - distributed.scheduler - INFO - User asked for computation on lost data, ('operation-8ebe45051147aef38027a1960f230fee', 1)
2024-03-08 10:07:19,969 - distributed.scheduler - INFO - User asked for computation on lost data, ('operation-8ebe45051147aef38027a1960f230fee', 2)
2024-03-08 10:07:19,969 - distributed.scheduler - INFO - User asked for computation on lost data, ('operation-8ebe45051147aef38027a1960f230fee', 3)
2024-03-08 10:07:19,969 - distributed.scheduler - INFO - User asked for computation on lost data, ('operation-8ebe45051147aef38027a1960f230fee', 4)
2024-03-08 10:07:19,969 - distributed.scheduler - INFO - User asked for computation on lost data, ('operation-8ebe45051147aef38027a1960f230fee', 5)
2024-03-08 10:07:19,969 - distributed.scheduler - INFO - User asked for computation on lost data, ('operation-8ebe45051147aef38027a1960f230fee', 6)
2024-03-08 10:07:19,969 - distributed.scheduler - INFO - User asked for computation on lost data, ('operation-8ebe45051147aef38027a1960f230fee', 7)
2024-03-08 10:07:19,969 - distributed.scheduler - INFO - User asked for computation on lost data, ('operation-8ebe45051147aef38027a1960f230fee', 8)
2024-03-08 10:07:19,969 - distributed.scheduler - INFO - User asked for computation on lost data, ('operation-8ebe45051147aef38027a1960f230fee', 9)
2024-03-08 10:07:19,969 - distributed.scheduler - INFO - User asked for computation on lost data, ('operation-8ebe45051147aef38027a1960f230fee', 10)
2024-03-08 10:07:19,969 - distributed.scheduler - INFO - User asked for computation on lost data, ('chunk-28ced5066580337a26eef5edca3c5f01', 0)
2024-03-08 10:07:19,969 - distributed.scheduler - INFO - User asked for computation on lost data, ('chunk-28ced5066580337a26eef5edca3c5f01', 1)
2024-03-08 10:07:19,969 - distributed.scheduler - INFO - User asked for computation on lost data, ('chunk-28ced5066580337a26eef5edca3c5f01', 2)
2024-03-08 10:07:19,970 - distributed.scheduler - INFO - User asked for computation on lost data, ('chunk-28ced5066580337a26eef5edca3c5f01', 3)
2024-03-08 10:07:19,970 - distributed.scheduler - INFO - User asked for computation on lost data, ('chunk-28ced5066580337a26eef5edca3c5f01', 4)
2024-03-08 10:07:19,970 - distributed.scheduler - INFO - User asked for computation on lost data, ('chunk-28ced5066580337a26eef5edca3c5f01', 5)
2024-03-08 10:07:19,970 - distributed.scheduler - INFO - User asked for computation on lost data, ('chunk-28ced5066580337a26eef5edca3c5f01', 6)
2024-03-08 10:07:19,970 - distributed.scheduler - INFO - User asked for computation on lost data, ('chunk-28ced5066580337a26eef5edca3c5f01', 7)
2024-03-08 10:07:19,970 - distributed.scheduler - INFO - User asked for computation on lost data, ('chunk-28ced5066580337a26eef5edca3c5f01', 8)
2024-03-08 10:07:19,970 - distributed.scheduler - INFO - User asked for computation on lost data, ('chunk-28ced5066580337a26eef5edca3c5f01', 9)
2024-03-08 10:07:19,970 - distributed.scheduler - INFO - User asked for computation on lost data, ('chunk-28ced5066580337a26eef5edca3c5f01', 10)
2024-03-08 10:07:19,970 - distributed.scheduler - INFO - User asked for computation on lost data, ('len-tree-b08e45d3088106d64c77136c41d6bd5b', 1, 0)
2024-03-08 10:07:19,970 - distributed.scheduler - INFO - User asked for computation on lost data, ('len-tree-b08e45d3088106d64c77136c41d6bd5b', 1, 1)
2024-03-08 10:07:19,970 - distributed.scheduler - INFO - User asked for computation on lost data, ('len-tree-b08e45d3088106d64c77136c41d6bd5b', 2, 0)
2024-03-08 10:07:19,970 - distributed.scheduler - INFO - User asked for computation on lost data, ('len-tree-b08e45d3088106d64c77136c41d6bd5b', 0)

This message comes from a check implemented in update_graph:

while len(dsk) != n:  # walk through new tasks, cancel any bad deps
    n = len(dsk)
    for k, deps in list(dependencies.items()):
        if any(
            dep not in self.tasks and dep not in dsk for dep in deps
        ):  # bad key
            lost_keys.add(k)
            logger.info("User asked for computation on lost data, %s", k)
            del dsk[k]
            del dependencies[k]
            if k in keys:
                keys.remove(k)
    del deps

This condition is typically pretty difficult to trigger, if I'm not mistaken. It explains why the client gets a CancelledError, but it is not yet clear how we end up in this situation.
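
To illustrate how a single unresolvable dependency cascades until the whole chain is dropped (matching the log lines above, where the operation, chunk and len-tree keys are all reported as lost), here is a minimal standalone sketch of the same loop. The function name prune_lost_keys, its arguments and the sample keys are hypothetical and only for illustration; this is not the scheduler's actual code path.

```python
def prune_lost_keys(dsk, dependencies, known_tasks, keys):
    """Remove every task that depends, directly or transitively, on a key
    that is neither in the submitted graph nor already known.

    Hypothetical helper: `known_tasks` stands in for the scheduler's task
    table, `dsk` for the newly submitted graph.
    """
    lost_keys = set()
    n = 0
    # Repeat until a full pass removes nothing (the graph size is stable).
    while len(dsk) != n:
        n = len(dsk)
        for k, deps in list(dependencies.items()):
            if any(dep not in known_tasks and dep not in dsk for dep in deps):
                lost_keys.add(k)
                del dsk[k]
                del dependencies[k]
                keys.discard(k)
    return lost_keys


# A three-step chain whose first task points at a key nobody knows about.
dsk = {"chunk-0": "task", "tree-0": "task", "len": "task"}
dependencies = {
    "chunk-0": {"stale-future"},
    "tree-0": {"chunk-0"},
    "len": {"tree-0"},
}
known_tasks = set()  # assumption: "stale-future" was already forgotten
keys = {"len"}

lost = prune_lost_keys(dsk, dependencies, known_tasks, keys)
print(sorted(lost))
```

Once the requested key itself ends up in the lost set, the client has nothing left to wait for, which is consistent with the CancelledError seen in the test.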

@fjetter
Member

fjetter commented Mar 11, 2024

This condition can be triggered if the computed dependencies and the actual graph drift apart. The "real" condition this should alert to is a previously scattered result having been lost. In this particular test there is no scattered data, so this could be interpreted as the cluster dropping a persisted result.

An alternative theory is that the graph materialization, particularly around unpacking remote tasks, is somehow broken.

@fjetter
Member

fjetter commented Mar 11, 2024

Looks like I can reproduce locally when running the entire test suite. That likely indicates some dirty state. Will bisect the test suite to narrow it down.

@fjetter
Member

fjetter commented Mar 11, 2024

This seems to reproduce it locally for me. If I change anything in this invocation, the test passes:

pytest --runslow \
    distributed/cli/tests/test_dask_scheduler.py::test_idle_timeout \
    distributed/cli/tests/test_dask_scheduler.py::test_restores_signal_handler \
    distributed/tests/test_dask_collections.py::test_dataframe_set_index_sync 

I have no idea what the connection between the CLI tests and set_index is, or why I have to run both CLI tests to trigger this. Unfortunately, this reproducer still takes many seconds to run, so it's difficult to iterate quickly (the CLI tests are excruciatingly slow nowadays).

@fjetter
Member

fjetter commented Mar 11, 2024

I still don't understand what is happening, but the two CLI tests are the only (nontrivial) tests in test_dask_scheduler.py that use a ClickRunner. (There is also test_version_option, which is simply a click utility to print out versions. This one isn't doing anything.)

If I replace ClickRunner with a popen in test_idle_timeout, everything works as expected again

@fjetter
Member

fjetter commented Mar 11, 2024

Also, test_dataframe_set_index_sync only fails for me if it is executed twice. Due to its parametrization it is executed twice, but it is irrelevant what the parameter actually is. It can be the same parameter twice or different ones in any order.

This all points towards a caching issue. Maybe something in the vicinity of tokenization is messing with the graphs, but I'm just guessing at this point.

@crusaderky
Collaborator

pytest --runslow \
    distributed/cli/tests/test_dask_scheduler.py::test_idle_timeout \
    distributed/cli/tests/test_dask_scheduler.py::test_restores_signal_handler \
    distributed/tests/test_dask_collections.py::test_dataframe_set_index_sync 

Reproduced. And if I add

dataframe:
  query-planning: false

to my ~/.config/dask/dask.yaml, the issue instantly disappears.

@fjetter
Member

fjetter commented Mar 11, 2024

Yes, I could already trace this back to the singleton implementation. If I remove object deduplication, there is no longer an issue. This means that something is reusing names even though it mustn't. I have no idea how this connects to the ClickRunner, though.

@fjetter
Member

fjetter commented Mar 11, 2024

OK, so the problem we're facing here is that Future instances are tokenized as [f.key, type(f)], which is not necessarily unique. Subsequent runs of the same test generate new Future objects that nevertheless hash/tokenize to the same value as the old ones. As a result, we reuse the old, stale Expression, which references the old, stale Future objects. The new Futures are thrown away, which causes the scheduler to forget the results immediately. That's nasty, and I'm glad we found this before the release.
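
A toy model of the collision, assuming a name-based singleton cache, may make this easier to follow. FakeFuture, tokenize_future and Expr are hypothetical stand-ins written for this sketch; they are not the distributed or dask-expr classes, and the plain dict cache is a simplification of whatever deduplication the real code uses.

```python
class FakeFuture:
    """Hypothetical stand-in for a client Future (not distributed.Future)."""

    def __init__(self, key):
        self.key = key
        self.released = False

    def release(self):
        # Models the client dropping its reference to the remote result.
        self.released = True


def tokenize_future(future):
    # Same shape as the scheme described above: key and type only,
    # nothing that identifies the particular instance.
    return (future.key, type(future).__name__)


class Expr:
    """Toy expression that is deduplicated by its token-derived name."""

    _instances = {}  # assumption: a simple name -> instance cache

    def __new__(cls, future):
        name = f"expr-{tokenize_future(future)}"
        cached = cls._instances.get(name)
        if cached is not None:
            # Cache hit: the caller's new future is silently ignored.
            return cached
        inst = super().__new__(cls)
        inst.name = name
        inst.future = future
        cls._instances[name] = inst
        return inst


# First test run: build an expression on top of a future, then clean up.
old = FakeFuture("operation-abc")
first = Expr(old)
old.release()

# Second test run: deterministic key, but a brand-new Future object.
new = FakeFuture("operation-abc")
second = Expr(new)

print(second is first)        # the stale expression is reused
print(second.future is old)   # and it still points at the stale future
```

In this model the second run never holds on to the new future, which mirrors the described behaviour of the scheduler forgetting the results right away.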

@crusaderky
Collaborator

If I remove object deduplication

Could you link the relevant code?
