Task state validation failure for fetch with who_has #6147

Closed
mrocklin opened this issue Apr 16, 2022 · 13 comments
Labels
deadlock: The cluster appears to not make any progress

Comments

@mrocklin (Member)

Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/worker.py", line 3975, in validate_task
    self.validate_task_fetch(ts)
  File "/home/runner/work/distributed/distributed/distributed/worker.py", line 3917, in validate_task_fetch
    assert ts.who_has
AssertionError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.10/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.10/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/home/runner/work/distributed/distributed/distributed/worker.py", line 1191, in handle_scheduler
    await self.handle_stream(comm, every_cycle=[self.ensure_communicating])
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 625, in handle_stream
    handler(**merge(extra, msg))
  File "/home/runner/work/distributed/distributed/distributed/worker.py", line 1916, in handle_compute_task
    self.transitions(recommendations, stimulus_id=stimulus_id)
  File "/home/runner/work/distributed/distributed/distributed/worker.py", line 2621, in transitions
    self.validate_task(ts)
  File "/home/runner/work/distributed/distributed/distributed/worker.py", line 3985, in validate_task
    raise AssertionError(
AssertionError: Invalid TaskState encountered for <TaskState "('rechunk-merge-77b7d884d5a5f48f375d62bb7d136665', 0, 333)" fetch>.
Story:
[("('rechunk-merge-77b7d884d5a5f48f375d62bb7d136665', 0, 333)", 'compute-task', 'compute-task-1650128700.1081934', 1650128700.1481702), ("('rechunk-merge-77b7d884d5a5f48f375d62bb7d136665', 0, 333)", 'released', 'waiting', 'waiting', {"('rechunk-split-77b7d884d5a5f48f375d62bb7d136665', 1333)": 'fetch'}, 'compute-task-1650128700.1081934', 1650128700.1482384), ("('rechunk-merge-77b7d884d5a5f48f375d62bb7d136665', 0, 333)", 'waiting', 'ready', 'ready', {}, 'ensure-communicating-1650128700.1599193', 1650128700.2175784), ("('rechunk-merge-77b7d884d5a5f48f375d62bb7d136665', 0, 333)", 'ready', 'executing', 'executing', {}, 'compute-task-1650128696.340246', 1650128700.218642), ("('rechunk-merge-77b7d884d5a5f48f375d62bb7d136665', 0, 333)", 'put-in-memory', 'compute-task-1650128696.340246', 1650128700.268273), ("('rechunk-merge-77b7d884d5a5f48f375d62bb7d136665', 0, 333)", 'executing', 'memory', 'memory', {"('rechunk-merge-77b7d884d5a5f48f375d62bb7d136665', 0, 251)": 'executing'}, 'compute-task-1650128696.340246', 1650128700.268321), ('free-keys', ("('rechunk-merge-77b7d884d5a5f48f375d62bb7d136665', 0, 333)",), 'processing-released-1650128700.5942154', 1650128701.1206636), ("('rechunk-merge-77b7d884d5a5f48f375d62bb7d136665', 0, 333)", 'release-key', 'processing-released-1650128700.5942154', 1650128701.1[2067](https://github.com/dask/distributed/runs/6048720092?check_suite_focus=true#step:11:2067)46), ("('rechunk-merge-77b7d884d5a5f48f375d62bb7d136665', 0, 333)", 'memory', 'released', 'released', {"('rechunk-merge-77b7d884d5a5f48f375d62bb7d136665', 0, 333)": 'forgotten'}, 'processing-released-1650128700.5942154', 1650128701.1[2073](https://github.com/dask/distributed/runs/6048720092?check_suite_focus=true#step:11:2073)92), ("('rechunk-merge-77b7d884d5a5f48f375d62bb7d136665', 0, 333)", 'released', 'forgotten', 'forgotten', {}, 'processing-released-1650128700.5942154', 1650128701.1[2075](https://github.com/dask/distributed/runs/6048720092?check_suite_focus=true#step:11:2075)35), ("('rechunk-merge-77b7d884d5a5f48f375d62bb7d136665', 0, 333)", 'ensure-task-exists', 'released', 'compute-task-1650128701.1707418', 1650128701.1864583), ("('rechunk-merge-77b7d884d5a5f48f375d62bb7d136665', 0, 333)", 'released', 'fetch', 'fetch', {}, 'compute-task-1650128701.1707418', 1650128701.1865091)]

https://github.com/dask/distributed/runs/6048720092?check_suite_focus=true

This was found by the new test_chaos_rechunk test.

cc @fjetter @gjoseph92

@crusaderky (Collaborator)

@mrocklin (Member Author)

This failed again yesterday in CI. Copying the story here for easier reading:

[("('rechunk-merge-06fee79f9945080fdc867fb46044dc51', 0, 166)",
  'compute-task',
  'compute-task-1650782205.654249',
  1650782205.699733),
 ("('rechunk-merge-06fee79f9945080fdc867fb46044dc51', 0, 166)",
  'released',
  'waiting',
  'waiting',
  {"('rechunk-split-06fee79f9945080fdc867fb46044dc51', 664)": 'fetch'},
  'compute-task-1650782205.654249',
  1650782205.6998458),
 ("('rechunk-merge-06fee79f9945080fdc867fb46044dc51', 0, 166)",
  'waiting',
  'ready',
  'ready',
  {},
  'ensure-communicating-1650782205.700746',
  1650782205.7857978),
 ("('rechunk-merge-06fee79f9945080fdc867fb46044dc51', 0, 166)",
  'ready',
  'executing',
  'executing',
  {},
  'compute-task-1650782202.6983402',
  1650782205.806335),
 ("('rechunk-merge-06fee79f9945080fdc867fb46044dc51', 0, 166)",
  'put-in-memory',
  'compute-task-1650782202.6983402',
  1650782205.836979),
 ("('rechunk-merge-06fee79f9945080fdc867fb46044dc51', 0, 166)",
  'executing',
  'memory',
  'memory',
  {"('rechunk-split-06fee79f9945080fdc867fb46044dc51', 819)": 'executing'},
  'compute-task-1650782202.6983402',
  1650782205.8370361),
 ('free-keys',
  ("('rechunk-merge-06fee79f9945080fdc867fb46044dc51', 0, 166)",),
  'processing-released-1650782206.635477',
  1650782206.9505332),
 ("('rechunk-merge-06fee79f9945080fdc867fb46044dc51', 0, 166)",
  'release-key',
  'processing-released-1650782206.635477',
  1650782206.950552),
 ("('rechunk-merge-06fee79f9945080fdc867fb46044dc51', 0, 166)",
  'memory',
  'released',
  'released',
  {"('rechunk-merge-06fee79f9945080fdc867fb46044dc51', 0, 166)": 'forgotten'},
  'processing-released-1650782206.635477',
  1650782206.950907),
 ("('rechunk-merge-06fee79f9945080fdc867fb46044dc51', 0, 166)",
  'released',
  'forgotten',
  'forgotten',
  {},
  'processing-released-1650782206.635477',
  1650782206.9509299),
 ("('rechunk-merge-06fee79f9945080fdc867fb46044dc51', 0, 166)",
  'ensure-task-exists',
  'released',
  'compute-task-1650782206.894887',
  1650782206.959998),
 ("('rechunk-merge-06fee79f9945080fdc867fb46044dc51', 0, 166)",
  'released',
  'fetch',
  'fetch',
  {},
  'compute-task-1650782206.894887',
  1650782206.9600902)]

@mrocklin (Member Author)

This also occurred in the same test run:

2022-04-24 06:36:48,925 - distributed.worker - INFO - Starting Worker plugin kill
2022-04-24 06:36:48,926 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:62207
2022-04-24 06:36:48,927 - distributed.worker - INFO - -------------------------------------------------
2022-04-24 06:36:48,931 - distributed.core - INFO - Starting established connection
2022-04-24 06:36:48,936 - distributed.nanny - WARNING - Restarting worker
2022-04-24 06:36:49,081 - distributed.stealing - ERROR - 'tcp://127.0.0.1:62227'
Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/stealing.py", line 242, in move_task_request
    self.scheduler.stream_comms[victim.address].send(
KeyError: 'tcp://127.0.0.1:62227'
2022-04-24 06:36:49,084 - distributed.utils - ERROR - 'tcp://127.0.0.1:62227'
Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/utils.py", line 693, in log_errors
    yield
  File "/Users/runner/work/distributed/distributed/distributed/stealing.py", line 456, in balance
    maybe_move_task(
  File "/Users/runner/work/distributed/distributed/distributed/stealing.py", line 362, in maybe_move_task
    self.move_task_request(ts, sat, idl)
  File "/Users/runner/work/distributed/distributed/distributed/stealing.py", line 242, in move_task_request
    self.scheduler.stream_comms[victim.address].send(
KeyError: 'tcp://127.0.0.1:62227'
2022-04-24 06:36:49,085 - tornado.application - ERROR - Exception in callback <bound method WorkStealing.balance of <distributed.stealing.WorkStealing object at 0x13dd740d0>>
Traceback (most recent call last):
  File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.10/site-packages/tornado/ioloop.py", line 905, in _run
    return self.callback()
  File "/Users/runner/work/distributed/distributed/distributed/stealing.py", line 456, in balance
    maybe_move_task(
  File "/Users/runner/work/distributed/distributed/distributed/stealing.py", line 362, in maybe_move_task
    self.move_task_request(ts, sat, idl)
  File "/Users/runner/work/distributed/distributed/distributed/stealing.py", line 242, in move_task_request
    self.scheduler.stream_comms[victim.address].send(
KeyError: 'tcp://127.0.0.1:62227'

@mrocklin (Member Author)

ensure_task_exists is new to me. @crusaderky, it looks like you might also have some familiarity here. The thing that confuses me is this shift:

 ("('rechunk-merge-06fee79f9945080fdc867fb46044dc51', 0, 166)",
  'released',
  'forgotten',
  'forgotten',
  {},
  'processing-released-1650782206.635477',
  1650782206.9509299),
 ("('rechunk-merge-06fee79f9945080fdc867fb46044dc51', 0, 166)",
  'ensure-task-exists',
  'released',
  'compute-task-1650782206.894887',
  1650782206.959998),

mrocklin added the deadlock label on Apr 24, 2022
@fjetter (Member) commented Apr 25, 2022

Ensure_task_exists

This is basically a debug log that prints the state of a task whenever it is encountered as a dependency. The log you posted tells us that the task was properly, cleanly forgotten, i.e. all state should have been purged during the forgotten transition.
It was then encountered again as a new dependency (or task to compute), and since it wasn't known, it starts in released.

I suspect update_who_has. I'll try to reproduce and have a look.
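
For reference, a minimal, hypothetical sketch of that idea (the function and data structures below are illustrative, not the actual Worker.ensure_task_exists code): an unknown key is created in released, and an ensure-task-exists entry is appended to the story either way.

    from time import time

    def ensure_task_exists_sketch(tasks, log, key, stimulus_id):
        # Hypothetical sketch: find or create the task record and log the state
        # it was found (or created) in.
        ts = tasks.get(key)
        if ts is None:
            # Unknown keys start out in "released", as in the story above
            ts = tasks[key] = {"key": key, "state": "released", "who_has": set()}
        log.append((key, "ensure-task-exists", ts["state"], stimulus_id, time()))
        return ts

    # The key from the story was already forgotten (i.e. unknown here), so it is
    # recreated in "released" before later being transitioned to "fetch":
    tasks, log = {}, []
    ensure_task_exists_sketch(tasks, log, "('rechunk-merge-...', 0, 166)", "compute-task-...")
    print(log[-1][1:3])  # ('ensure-task-exists', 'released')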

@fjetter (Member) commented Apr 25, 2022

I can easily reproduce errors when running test_chaos_rechunk, but it appears not to fail for me; exceptions are just logged. Is this what others observe as well? The linked CI build appears to fail because of a wrong occupancy calculation.

@crusaderky (Collaborator)

Indeed, test_chaos_rechunk can't fail. See my comment in #6123 (comment).

@mrocklin (Member Author) commented Apr 25, 2022 via email

@mrocklin (Member Author) commented Apr 25, 2022 via email

@mrocklin (Member Author)

Aside from the issue around raising errors (which I think may be fixed above), do folks have thoughts on the actual exception?

@fjetter (Member) commented Apr 25, 2022

I could trace this to a race condition where a task is repeatedly assigned and released on a worker. The task is in memory for a while but then released again.

Ultimately, the worker receives a request like

    {
        "op": "compute-task",
        "key": "foo",
        "who_has": {"bar": [self.address]},
    }

even though bar is forgotten on the worker, i.e. definitely not in memory. I came across something like this already in #4784 which led me to introduce

if self.address in workers and self.tasks[dep].state != "memory":
    logger.debug(
        "Scheduler claims worker %s holds data for task %s which is not true.",
        self.name,
        dep,
    )
    # Do not mutate the input dict. That's rude
    workers = set(workers) - {self.address}
dep_ts.who_has.update(workers)
which removes self.address from who_has, so that who_has is then empty.

I think we should simply transition the task to missing in this case, which would then cause the scheduler to properly correct the state. I'm currently trying to narrow this behavior down a bit further by reducing it to a test case.
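
A minimal sketch of that proposal, assuming a standalone helper rather than the actual Worker method (the name recommend_for_dependency and its signature are illustrative):

    def recommend_for_dependency(self_address, dep_state, who_has_workers):
        # Hypothetical helper: decide what to do with a dependency the scheduler
        # told us to fetch, given the workers it claims hold the data and our
        # local state for that key.
        workers = set(who_has_workers)
        if self_address in workers and dep_state != "memory":
            # The scheduler wrongly claims we hold the data; drop ourselves
            workers.discard(self_address)
        if not workers:
            # Nobody actually has the data: recommend "missing" so the scheduler
            # corrects its state, instead of keeping a fetch task around with an
            # empty who_has (which is what trips validate_task_fetch above).
            return "missing", workers
        return "fetch", workers

    # With the message from above, where the only listed holder is ourselves:
    print(recommend_for_dependency("tcp://w:1", "forgotten", ["tcp://w:1"]))
    # ('missing', set())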

@mrocklin (Member Author)

Punting problems like these back to the scheduler and letting it have another try sounds like a good strategy in general.

@fjetter (Member) commented Apr 26, 2022

I dug a bit deeper since something in the logs appeared to be off, particularly why the key is forgotten in the first place. Using #6161, I could track this down to a transition triggered by register-worker. Inspecting the events shows a sequence like:

  • 'add-worker',
  • 'worker-status-change',
  • 'remove-worker',
  • 'add-worker',
  • 'remove-worker',
  • 'missing-data',
  • 'missing-data',
  • 'missing-data',
  • 'missing-data'

I am a bit confused and worried about why the worker is added (and removed) twice. The chaos code should let the worker connect and, after it dies, disconnect. I don't understand why it reconnects. The reconnection seems to put us into the corrupt state. I think the missing-data events are not related.
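
For anyone reproducing this, a rough sketch of how such a per-worker event sequence can be pulled from a live scheduler. It assumes the scheduler keeps per-topic deques of (timestamp, msg) pairs in Scheduler.events with an 'action' field in each payload, which matches how events were logged around this time but should be treated as an assumption, not a stable API; the addresses are illustrative.

    from distributed import Client

    client = Client("tcp://127.0.0.1:8786")  # scheduler address is illustrative

    def worker_lifecycle(dask_scheduler, address):
        # dask_scheduler.events: topic -> deque of (timestamp, msg) pairs (assumed layout)
        return [
            msg.get("action")
            for _, msg in dask_scheduler.events.get(address, ())
            if isinstance(msg, dict)
        ]

    print(client.run_on_scheduler(worker_lifecycle, address="tcp://127.0.0.1:62227"))
    # e.g. ['add-worker', 'worker-status-change', 'remove-worker', 'add-worker', ...]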
