Scheduler transition error from erred to memory #6283

Closed
gjoseph92 opened this issue May 5, 2022 · 1 comment
Labels
deadlock The cluster appears to not make any progress

Comments

@gjoseph92
Collaborator

Found while trying to hack together a script for users to work around #6228. The goal was to look for workers that seemed like they might be stuck, and force them to shut down.

This is probably not very important; I'm just writing it down. The reproducer is not minimized at all, and we shouldn't look into it much until #6272 is fixed at least, since I'm not sure how related that is, and I think it changes some of the worker-reconnect logic on the scheduler that might be problematic here too.

The Error transitioning 'break_worker-d005ab4b7e4707de0ddc4d926ecb510f' from 'erred' to 'memory' comes down to the transition receiving kwargs when it expects none. My guess is that this is something odd related to marking suspicious tasks as erred after three worker failures, and then a worker somehow rejoining that actually has that task in memory?
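For illustration, here is a rough sketch of the shape of the failure. This is not the actual distributed scheduler code; the transition table and function names are made up to mirror the assertion in the traceback further down. A rejoining worker tells the scheduler it has the key in memory, so worker/nbytes/typename get forwarded as kwargs, but the task is currently 'erred', there is no direct erred->memory transition function, and the generic fallback refuses extra arguments.

# Illustrative sketch only -- simplified, not distributed.scheduler internals.
task_states = {"break_worker-d005ab4b7e4707de0ddc4d926ecb510f": "erred"}

def transition_processing_memory(key, worker=None, nbytes=None, typename=None):
    # Normal path: a worker finishing a task reports these details.
    task_states[key] = "memory"

TRANSITIONS = {("processing", "memory"): transition_processing_memory}

def _transition(key, finish, *args, **kwargs):
    start = task_states[key]
    func = TRANSITIONS.get((start, finish))
    if func is not None:
        return func(key, *args, **kwargs)
    # Fallback for (start, finish) pairs with no direct edge: it cannot
    # forward per-task details, hence the AssertionError seen below.
    assert not args and not kwargs, (args, kwargs, (start, finish))

# A rejoining worker reports the erred task as being in memory:
_transition(
    "break_worker-d005ab4b7e4707de0ddc4d926ecb510f",
    "memory",
    worker="tcp://127.0.0.1:51328",
    nbytes=16,
    typename="NoneType",
)
# raises AssertionError: ((), {'worker': ..., 'nbytes': 16, 'typename': 'NoneType'}, ('erred', 'memory'))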

When you run this script, it'll cycle through closing the workers a couple of times, then wait until you press Enter to shut down (it's not deadlocked).

import time

import distributed


def break_worker():
    # Abort the comm behind the worker's batched stream to the scheduler,
    # simulating a connection that breaks without a clean close.
    worker = distributed.get_worker()
    print(f"breaking {worker}")
    worker.batched_stream.comm.abort()


async def close_worker(dask_worker):
    # Shut the worker down cleanly; nanny=False leaves the nanny running,
    # so it will restart the worker afterwards.
    print(f"Shutting down {dask_worker}")
    try:
        await dask_worker.close(report=True, nanny=False)
    except Exception:
        import sys
        import traceback

        print("Failed to close worker cleanly, exiting")
        traceback.print_exc()
        sys.exit(0)


# Rough upper bound on how long all the sleep tasks should take; used as
# the wait timeout to decide whether tasks are stuck.
EXPECTED_TOTAL_RUNTIME = 5

if __name__ == "__main__":
    # Passing LocalCluster kwargs to Client spins up a local 2-worker cluster.
    with distributed.Client(
        n_workers=2, processes=True, threads_per_worker=2
    ) as client:
        print(client.dashboard_link)
        input("Press enter to start")

        fs = client.map(time.sleep, [2] * 10, pure=False)
        b = client.submit(break_worker)

        while True:
            try:
                distributed.wait(fs, timeout=EXPECTED_TOTAL_RUNTIME)
            except distributed.TimeoutError:
                stuck_keys = {f.key for f in fs if not f.done()}
            else:
                break

            processing = client.processing()
            stuck_workers = {
                addr
                for addr, tasks in processing.items()
                if stuck_keys.intersection(tasks)
            }
            print(f"{stuck_keys} are stuck on {stuck_workers}")
            # Ask each stuck worker to schedule its own shutdown on its event
            # loop; client.run returns before the close actually happens.
            client.run(
                lambda dask_worker: dask_worker.loop.add_callback(
                    close_worker, dask_worker
                ),
                workers=list(stuck_workers),
            )

        input("Done")
(env) » python blocker.py
http://127.0.0.1:8787/status
Press enter to start
breaking <Worker 'tcp://127.0.0.1:51295', name: 1, status: running, stored: 2, running: 2/2, ready: 2, comm: 0, waiting: 0>
{'sleep-b93ddb76-baf7-4fca-957c-ee7ffa8f4246-3', 'sleep-b93ddb76-baf7-4fca-957c-ee7ffa8f4246-1', 'sleep-b93ddb76-baf7-4fca-957c-ee7ffa8f4246-4', 'sleep-b93ddb76-baf7-4fca-957c-ee7ffa8f4246-0', 'sleep-b93ddb76-baf7-4fca-957c-ee7ffa8f4246-9', 'sleep-b93ddb76-baf7-4fca-957c-ee7ffa8f4246-2'} are stuck on {'tcp://127.0.0.1:51296', 'tcp://127.0.0.1:51295'}
Shutting down <Worker 'tcp://127.0.0.1:51295', name: 1, status: running, stored: 4, running: 2/2, ready: 0, comm: 0, waiting: 0>
Shutting down <Worker 'tcp://127.0.0.1:51296', name: 0, status: running, stored: 4, running: 2/2, ready: 4, comm: 0, waiting: 0>
2022-05-05 11:51:28,082 - distributed.nanny - ERROR - Worker process died unexpectedly
2022-05-05 11:51:28,083 - distributed.nanny - ERROR - Worker process died unexpectedly
Exception in thread Nanny stop queue watch:
Traceback (most recent call last):
  File "/Users/gabe/.pyenv/versions/3.9.1/lib/python3.9/threading.py", line 954, in _bootstrap_inner
    self.run()
  File "/Users/gabe/.pyenv/versions/3.9.1/lib/python3.9/threading.py", line 892, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/gabe/dev/distributed/distributed/nanny.py", line 860, in watch_stop_q
    child_stop_q.close()
  File "/Users/gabe/.pyenv/versions/3.9.1/lib/python3.9/multiprocessing/queues.py", line 143, in close
    self._reader.close()
  File "/Users/gabe/.pyenv/versions/3.9.1/lib/python3.9/multiprocessing/connection.py", line 182, in close
    self._close()
  File "/Users/gabe/.pyenv/versions/3.9.1/lib/python3.9/multiprocessing/connection.py", line 366, in _close
    _close(self._handle)
OSError: [Errno 9] Bad file descriptor
2022-05-05 11:51:28,212 - distributed.nanny - WARNING - Restarting worker
2022-05-05 11:51:28,213 - distributed.nanny - WARNING - Restarting worker
breaking <Worker 'tcp://127.0.0.1:51314', name: 0, status: running, stored: 0, running: 2/2, ready: 9, comm: 0, waiting: 0>
breaking <Worker 'tcp://127.0.0.1:51317', name: 1, status: running, stored: 0, running: 2/2, ready: 9, comm: 0, waiting: 0>
{'sleep-b93ddb76-baf7-4fca-957c-ee7ffa8f4246-1', 'sleep-b93ddb76-baf7-4fca-957c-ee7ffa8f4246-9', 'sleep-b93ddb76-baf7-4fca-957c-ee7ffa8f4246-8', 'sleep-b93ddb76-baf7-4fca-957c-ee7ffa8f4246-5', 'sleep-b93ddb76-baf7-4fca-957c-ee7ffa8f4246-7', 'sleep-b93ddb76-baf7-4fca-957c-ee7ffa8f4246-6'} are stuck on {'tcp://127.0.0.1:51314', 'tcp://127.0.0.1:51317'}
Shutting down <Worker 'tcp://127.0.0.1:51317', name: 1, status: running, stored: 2, running: 2/2, ready: 7, comm: 0, waiting: 0>
Shutting down <Worker 'tcp://127.0.0.1:51314', name: 0, status: running, stored: 2, running: 2/2, ready: 7, comm: 0, waiting: 0>
2022-05-05 11:51:32,939 - distributed.nanny - ERROR - Worker process died unexpectedly
2022-05-05 11:51:32,939 - distributed.nanny - ERROR - Worker process died unexpectedly
Exception in thread Nanny stop queue watch:
Traceback (most recent call last):
  File "/Users/gabe/.pyenv/versions/3.9.1/lib/python3.9/threading.py", line 954, in _bootstrap_inner
    self.run()
  File "/Users/gabe/.pyenv/versions/3.9.1/lib/python3.9/threading.py", line 892, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/gabe/dev/distributed/distributed/nanny.py", line 860, in watch_stop_q
    child_stop_q.close()
  File "/Users/gabe/.pyenv/versions/3.9.1/lib/python3.9/multiprocessing/queues.py", line 143, in close
    self._reader.close()
  File "/Users/gabe/.pyenv/versions/3.9.1/lib/python3.9/multiprocessing/connection.py", line 182, in close
    self._close()
  File "/Users/gabe/.pyenv/versions/3.9.1/lib/python3.9/multiprocessing/connection.py", line 366, in _close
    _close(self._handle)
OSError: [Errno 9] Bad file descriptor
2022-05-05 11:51:33,062 - distributed.nanny - WARNING - Restarting worker
2022-05-05 11:51:33,069 - distributed.nanny - WARNING - Restarting worker
breaking <Worker 'tcp://127.0.0.1:51328', name: 0, status: running, stored: 0, running: 2/2, ready: 4, comm: 0, waiting: 0>
Done2022-05-05 11:51:34,305 - distributed.scheduler - ERROR - Error transitioning 'break_worker-d005ab4b7e4707de0ddc4d926ecb510f' from 'erred' to 'memory'
Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/scheduler.py", line 1573, in _transition
    assert not args and not kwargs, (args, kwargs, start_finish)
AssertionError: ((), {'worker': 'tcp://127.0.0.1:51328', 'nbytes': 16, 'typename': 'NoneType'}, ('erred', 'memory'))
((), {'worker': 'tcp://127.0.0.1:51328', 'nbytes': 16, 'typename': 'NoneType'}, ('erred', 'memory'))
Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/utils.py", line 759, in wrapper
    return await func(*args, **kwargs)
  File "/Users/gabe/dev/distributed/distributed/scheduler.py", line 3762, in add_worker
    t: tuple = self._transition(
  File "/Users/gabe/dev/distributed/distributed/scheduler.py", line 1573, in _transition
    assert not args and not kwargs, (args, kwargs, start_finish)
AssertionError: ((), {'worker': 'tcp://127.0.0.1:51328', 'nbytes': 16, 'typename': 'NoneType'}, ('erred', 'memory'))
2022-05-05 11:51:34,306 - distributed.core - ERROR - Exception while handling op register-worker
Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/core.py", line 559, in handle_comm
    result = await result
  File "/Users/gabe/dev/distributed/distributed/utils.py", line 759, in wrapper
    return await func(*args, **kwargs)
  File "/Users/gabe/dev/distributed/distributed/scheduler.py", line 3762, in add_worker
    t: tuple = self._transition(
  File "/Users/gabe/dev/distributed/distributed/scheduler.py", line 1573, in _transition
    assert not args and not kwargs, (args, kwargs, start_finish)
AssertionError: ((), {'worker': 'tcp://127.0.0.1:51328', 'nbytes': 16, 'typename': 'NoneType'}, ('erred', 'memory'))
fjetter added the deadlock label on May 16, 2022
@fjetter
Member

fjetter commented Jun 3, 2022

Apart from the workers not coming up any more (#6387), this example now works as expected (I restricted the breakage to one worker and tweaked timeouts to get it to work).
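For reference, a plausible version of that tweak against the reproducer above. The exact changes aren't shown in the comment, so the worker selection and the new timeout value here are assumptions; the snippet replaces the b = client.submit(break_worker) line and the timeout constant inside the script.

# Assumed variant of the tweak: only sever one worker's connection and give
# the surviving worker more time before keys are declared stuck.
EXPECTED_TOTAL_RUNTIME = 30  # assumed value; the comment only says "tweaked timeouts"

first_worker = list(client.scheduler_info()["workers"])[0]
b = client.submit(break_worker, workers=[first_worker], pure=False)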

fjetter closed this as completed on Jun 3, 2022