Remove wrong assert in handle compute #6370

Merged
merged 3 commits into from
May 20, 2022

Conversation

fjetter
Member

@fjetter fjetter commented May 18, 2022

This removes an erroneous assert statement introduced in #6327

See #6327 (comment) for details

The added test condition triggers this exact assert statement. However, the test passes properly if the assert is removed. All transitions happen as expected.

While debugging this, I noticed that the find-missing PeriodicCallback (PC) is actually running concurrently. I was surprised by this because the docs of PeriodicCallback specifically mention that an iteration is skipped if the previous one takes too long (see https://github.com/tornadoweb/tornado/blob/43ae5839a56e445dd2d10539718f1e0c8053d995/tornado/ioloop.py#L863-L864). I'll break this out into a dedicated PR.
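To make the expected "skip an iteration if the previous one is still running" behaviour concrete, here is a minimal asyncio-only sketch. It is not tornado's implementation; the class name `SkippingPeriodicCallback` and the `demo` helper are hypothetical and exist only for illustration.

```python
import asyncio


class SkippingPeriodicCallback:
    """Toy periodic callback: a tick is skipped while the previous run is active."""

    def __init__(self, callback, interval: float) -> None:
        self.callback = callback
        self.interval = interval
        self.skipped = 0
        self._running = False

    async def _run_once(self) -> None:
        try:
            await self.callback()
        finally:
            self._running = False

    async def run(self, ticks: int) -> None:
        tasks = []
        for _ in range(ticks):
            if self._running:
                # Previous iteration has not finished yet: skip this tick.
                self.skipped += 1
            else:
                # Mark as running before scheduling, so the next tick sees it.
                self._running = True
                tasks.append(asyncio.create_task(self._run_once()))
            await asyncio.sleep(self.interval)
        await asyncio.gather(*tasks)


async def demo():
    active = 0
    max_active = 0

    async def slow_callback() -> None:
        # Takes several intervals to finish and records the peak concurrency.
        nonlocal active, max_active
        active += 1
        max_active = max(max_active, active)
        await asyncio.sleep(0.05)
        active -= 1

    pc = SkippingPeriodicCallback(slow_callback, interval=0.01)
    await pc.run(ticks=10)
    return max_active, pc.skipped
```

With this guard the callback never overlaps with itself. Concurrent runs, as observed for the find-missing callback, are what you would get if the guard were missing.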

@fjetter fjetter requested a review from crusaderky May 18, 2022 16:28
@crusaderky
Collaborator

While debugging this, I noticed that the find-missing PC is actually running concurrently. I was surprised about this because the docs of PeriodicCallback specifically mention that an iteration is skipped if it takes too long, see https://github.com/tornadoweb/tornado/blob/43ae5839a56e445dd2d10539718f1e0c8053d995/tornado/ioloop.py#L863-L864

This was just changed on tornado's git tip: tornadoweb/tornado#3117

I'll break this out into a dedicated PR

Unnecessary after #6348

@github-actions
Contributor

github-actions bot commented May 18, 2022

Unit Test Results

       15 files  ±0         15 suites  ±0   7h 17m 44s ⏱️ + 1m 20s
  2 805 tests +1    2 727 ✔️ +  2    78 💤  - 1  0 ±0 
20 801 runs  +7  19 883 ✔️ +11  918 💤  - 4  0 ±0 

Results for commit 3e9134b. ± Comparison against base commit 4d29246.

♻️ This comment has been updated with latest results.

await wait(f3)
f4 = c.submit(inc, f3, key="f4", workers=[w2.address])

await enter_get_data_1.wait()
Collaborator

Post #6371, you can:

  1. apply this suggested change:
       - await enter_get_data_1.wait()
       + await wait_for_state(f1.key, "flight", w2)
       + await wait_for_state(f2.key, "flight", w2)
  2. get rid of the BlockGetDataWorker subclass
  3. initialise the worker with gen_cluster
  4. use
       event = asyncio.Event()
       w1.rpc = _LockedCommPool(w1.rpc, write_event=event)
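The last step relies on an event-gated write. The sketch below shows that pattern with the standard library only, assuming `write_event` holds outgoing writes until the event is set. `GatedWriter` and `demo` are hypothetical names, not part of distributed.

```python
import asyncio


class GatedWriter:
    """Hypothetical stand-in for a comm whose writes wait on an event."""

    def __init__(self, write_event: asyncio.Event) -> None:
        self.write_event = write_event
        self.sent = []

    async def write(self, msg: str) -> None:
        # Block until the test decides the message may go out.
        await self.write_event.wait()
        self.sent.append(msg)


async def demo():
    event = asyncio.Event()
    comm = GatedWriter(event)
    task = asyncio.create_task(comm.write("get_data response"))
    await asyncio.sleep(0)  # let write() start and block on the event
    before = list(comm.sent)  # nothing has been sent yet
    event.set()  # release the blocked write
    await task
    return before, comm.sent
```

The test controls exactly when the response leaves the sender, which is the same lever the worker subclass provides by blocking inside get_data.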

Member Author

I actually like my version with the worker subclass better. The _LockedCommPool requires much more low level knowledge and is more brittle in my opinion. I think it should only be used if nothing else is possible.

@crusaderky
Collaborator

I understand the use case now. I really don't think it's healthy, but I understand it.
The following is a design discussion that is off-topic to the PR:

Below is the full story (I renamed the stimulus IDs for the sake of readability; the log is broken down into one paragraph per stimulus ID):

- ('f4', 'compute-task', 'released', 'compute-task-A', 1652904634.4127107)
- ('f3', 'ensure-task-exists', 'released', 'compute-task-A', 1652904634.4127202)
- ('f4', 'released', 'waiting', 'waiting', {'f3': 'fetch'}, 'compute-task-A', 1652904634.4127345)
- ('f3', 'released', 'fetch', 'fetch', {}, 'compute-task-A', 1652904634.4127429)
- ('gather-dependencies', 'tcp://127.0.0.1:36963', {'f3'}, 'compute-task-A', 1652904634.412779)
- ('f3', 'fetch', 'flight', 'flight', {}, 'compute-task-A', 1652904634.4127882)
- ('request-dep', 'tcp://127.0.0.1:36963', {'f3'}, 'compute-task-A', 1652904634.412878)

- ('free-keys', ('f4',), 'stim-id', 1652904634.417687)
- ('f4', 'release-key', 'stim-id', 1652904634.417695)
- ('f4', 'waiting', 'released', 'released', {'f3': 'released', 'f4': 'forgotten'}, 'stim-id', 1652904634.4177065)
- ('f4', 'released', 'forgotten', 'forgotten', {}, 'stim-id', 1652904634.4177113)
- ('f3', 'flight', 'released', 'cancelled', {}, 'stim-id', 1652904634.4177146)

- ('f3', 'compute-task', 'cancelled', 'compute-task-B', 1652904634.4229603)
- ('f2', 'ensure-task-exists', 'released', 'compute-task-B', 1652904634.4229703)
- ('f1', 'ensure-task-exists', 'released', 'compute-task-B', 1652904634.4229808)
- ('f3', 'cancelled', 'waiting', 'resumed', {}, 'compute-task-B', 1652904634.4229932)

- ('receive-dep-failed', 'tcp://127.0.0.1:36963', {'f3'}, 'compute-task-A', 1652904635.433945)
- ('missing-who-has', 'tcp://127.0.0.1:36963', 'f3', 'compute-task-A', 1652904635.433949)
- ('f3', 'release-key', 'compute-task-A', 1652904635.4339857)
- ('f3', 'resumed', 'missing', 'released', {'f2': 'released', 'f1': 'released', 'f3': 'waiting'}, 'compute-task-A', 1652904635.434019)
- ('f3', 'released', 'waiting', 'waiting', {'f2': 'fetch', 'f1': 'fetch'}, 'compute-task-A', 1652904635.4340317)
- ('f1', 'released', 'fetch', 'fetch', {}, 'compute-task-A', 1652904635.4340475)
- ('f2', 'released', 'fetch', 'fetch', {}, 'compute-task-A', 1652904635.4340563)
- ('gather-dependencies', 'tcp://127.0.0.1:40709', {'f2', 'f1'}, 'compute-task-A', 1652904635.434118)
- ('f1', 'fetch', 'flight', 'flight', {}, 'compute-task-A', 1652904635.4341378)
- ('f2', 'fetch', 'flight', 'flight', {}, 'compute-task-A', 1652904635.4341433)
- ('request-dep', 'tcp://127.0.0.1:40709', {'f2', 'f1'}, 'compute-task-A', 1652904635.4342315)
- ('receive-dep', 'tcp://127.0.0.1:40709', {'f2', 'f1'}, 'compute-task-A', 1652904635.4368486)
- ('f1', 'put-in-memory', 'compute-task-A', 1652904635.4369001)
- ('f1', 'flight', 'memory', 'memory', {}, 'compute-task-A', 1652904635.43691)
- ('f2', 'put-in-memory', 'compute-task-A', 1652904635.4369361)
- ('f2', 'flight', 'memory', 'memory', {'f3': 'ready'}, 'compute-task-A', 1652904635.4369402)
- ('f3', 'waiting', 'ready', 'ready', {'f3': 'executing'}, 'compute-task-A', 1652904635.4369576)
- ('f3', 'ready', 'executing', 'executing', {}, 'compute-task-A', 1652904635.436971)

- ('f3', 'put-in-memory', 'task-finished-1652904635.4378343', 1652904635.4379992)
- ('f3', 'executing', 'memory', 'memory', {}, 'task-finished-1652904635.4378343', 1652904635.4380224)

- ('f4', 'compute-task', 'released', 'compute-task-C', 1652904635.440761)
- ('f3', 'ensure-task-exists', 'memory', 'compute-task-C', 1652904635.4407744)
- ('f4', 'released', 'waiting', 'waiting', {'f4': 'ready'}, 'compute-task-C', 1652904635.4407973)
- ('f4', 'waiting', 'ready', 'ready', {'f4': 'executing'}, 'compute-task-C', 1652904635.4408145)
- ('f4', 'ready', 'executing', 'executing', {}, 'compute-task-C', 1652904635.4408255)

- ('f4', 'put-in-memory', 'task-finished-1652904635.4411478', 1652904635.4412405)
- ('f4', 'executing', 'memory', 'memory', {}, 'task-finished-1652904635.4411478', 1652904635.4412603)
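A grouping like the one above can be reproduced mechanically. The sketch below assumes, as in the entries shown, that the last element of each tuple is the timestamp and the one before it is the stimulus ID. `group_by_stimulus` is a hypothetical helper, and the sample entries are copied from the story.

```python
from collections import defaultdict


def group_by_stimulus(story):
    """Group story entries by stimulus ID, keeping first-seen order."""
    groups = defaultdict(list)
    for entry in story:
        # Assumed layout: (..., stimulus_id, timestamp)
        *payload, stimulus_id, _timestamp = entry
        groups[stimulus_id].append(tuple(payload))
    return dict(groups)


story = [
    ("f4", "compute-task", "released", "compute-task-A", 1652904634.4127107),
    ("f3", "flight", "released", "cancelled", {}, "stim-id", 1652904634.4177146),
    ("f3", "cancelled", "waiting", "resumed", {}, "compute-task-B", 1652904634.4229932),
    ("receive-dep-failed", "tcp://127.0.0.1:36963", {"f3"}, "compute-task-A", 1652904635.433945),
]

groups = group_by_stimulus(story)
```

In this sample, `compute-task-A` collects both its initial entry and the `receive-dep-failed` entry recorded about a second later, which is the stimulus ID reuse discussed below.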

What I understand is happening in a real life scenario:

  1. f3 goes in flight w1->w2
  2. w1 is slow to send back any data. So slow, in fact, that after a while the scheduler decides it's dead. This happens for reasons unrelated to the send itself, but if e.g. the event loop of w1 is K.O., both the transfer to w2 and the heartbeat to the scheduler go silent at the same time.
  3. The scheduler tells w2 to stop waiting for f3 from w1 and instead compute it itself.
  4. This is where things go wrong. Instead of immediately fetching f1 and f2 from w3, w2 sits idly in resumed status. This lasts until the TCP channel to w1 times out (in the test, there is a clean disconnect initiated by w1.close()).
  5. After the timeout, f1 and f2 are finally fetched from w3. This whole new transition loop reuses the stimulus_id of the initial compute, which made sense for the initial gather_dep but much less so here.

What should happen, in my opinion:

  1. (idem)
  2. (idem)
  3. (idem)
  4. w2 immediately fetches f1 and f2 from w3. When that is done, it computes f3. Ideally, it should cancel the gather_dep(w1, [f3]) asyncio task. Alternatively, it should deal with a race condition where f3 may arrive from w1 in the meantime.
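For reference, the path f3 currently takes on w2 can be replayed with a toy transition table read off the story above. This is a sketch of the observed sequence only, not the worker state machine. The event name `deps-in-memory` is hypothetical, and intermediate states (`released`, `ready`) are collapsed.

```python
# (state, event) -> next state, for key f3 only, as seen in the story.
F3_TRANSITIONS = {
    ("flight", "free-keys"): "cancelled",
    ("cancelled", "compute-task"): "resumed",
    # f3 stays in "resumed" until the in-flight fetch from w1 ends.
    ("resumed", "receive-dep-failed"): "waiting",  # via "released"
    ("waiting", "deps-in-memory"): "executing",  # via "ready"; hypothetical event name
    ("executing", "task-finished"): "memory",
}


def replay(state, events):
    """Return the list of states visited while applying events in order."""
    history = [state]
    for event in events:
        state = F3_TRANSITIONS[(state, event)]
        history.append(state)
    return history


history = replay(
    "flight",
    ["free-keys", "compute-task", "receive-dep-failed", "deps-in-memory", "task-finished"],
)
```

The table has no way out of `resumed` other than the fetch ending, which is the idle period described in step 4 of the first list.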

@fjetter fjetter marked this pull request as ready for review May 19, 2022 07:37
@fjetter
Member Author

fjetter commented May 19, 2022

w2 immediately fetches f1 and f2 from w3.

I do not think it should fetch the tasks right away and I honestly don't think it would reduce complexity.

When that is done, it computes f3. Ideally, it should cancel the gather_dep(w1, [f3]) asyncio task.

This has been brought up very frequently. As it stands right now, we cannot cancel a gather_dep request for a single key. As long as this is not possible, we cannot get rid of the cancelled/resumed mechanism.
Similarly, we cannot schedule an execute while a fetch is running. This has been the primary source of deadlocks and inconsistencies last year before we started formalizing the state machine and I'm not willing to go back to this state.
If you want to discuss this further, I encourage opening another ticket.
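The single-key limitation can be illustrated with plain asyncio. The sketch assumes one task fetches a whole batch of keys from a peer, as the `gather-dependencies` entries with `{'f2', 'f1'}` suggest. `gather_batch` and `demo` are hypothetical stand-ins, not distributed APIs.

```python
import asyncio


async def gather_batch(keys, received):
    """Hypothetical stand-in for one request that fetches several keys."""
    await asyncio.sleep(10)  # simulates a slow peer
    received.update(keys)


async def demo():
    received = set()
    task = asyncio.create_task(gather_batch({"f1", "f2"}, received))
    await asyncio.sleep(0)  # let the request start
    # Suppose only "f1" is no longer wanted. The only handle available is
    # the task for the whole batch, so cancelling it also drops "f2".
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled(), received
```

Cancelling at task granularity is all-or-nothing for the batch, so a per-key cancel would need extra bookkeeping beyond `task.cancel()`.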

@fjetter fjetter force-pushed the remove_wrong_assert_in_handle_compute branch from 2a24f3e to 3e9134b Compare May 19, 2022 09:48
@fjetter fjetter self-assigned this May 19, 2022
@fjetter fjetter merged commit 6b91ec6 into dask:main May 20, 2022
@fjetter fjetter deleted the remove_wrong_assert_in_handle_compute branch May 20, 2022 08:04
2 participants