Non-daemonic workers #2718

Closed
calebho opened this issue May 21, 2019 · 12 comments · Fixed by #2739

Comments

@calebho
Contributor

calebho commented May 21, 2019

Related to #2142, but the solution there doesn't apply in my case. I have a use case for workers running in separate processes, but as non-daemons, because the worker processes themselves need to use multiprocessing. Here's an example:

import torch
import torch.distributed as dist
import torchvision
import os
from distributed import Client, LocalCluster


def worker_fn(rank, world_size):
    print('worker', rank)

    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '8989'
    dist.init_process_group(
        backend=dist.Backend.NCCL,
        rank=rank,
        world_size=world_size,
    )
    print('initialized distributed', rank)

    if rank == 0:
        dataset = torchvision.datasets.MNIST(
            '../data/',
            train=True,
            download=True,
        )
    dist.barrier()
    if rank != 0:
        dataset = torchvision.datasets.MNIST(
            '../data/',
            train=True,
            download=False,
        )
    # load data, uses multiprocessing
    loader = torch.utils.data.DataLoader(
        dataset,
        sampler=torch.utils.data.distributed.DistributedSampler(
            dataset,
            rank=rank,
            num_replicas=world_size,
        ),
        num_workers=2,
    )
    print('constructed data loader', rank)

    # if cuda is available, initializes it as well
    assert torch.cuda.is_available()
    # do distributed training, but in this case it suffices to iterate
    for x, y in loader:
        pass


def main():
    world_size = 2
    cluster = LocalCluster(
        n_workers=world_size,
        processes=True,
        resources={
            'GPUS': 1,  # don't allow two tasks to run on the same worker
        },
    )
    cl = Client(cluster)
    futs = []
    for rank in range(world_size):
        futs.append(
            cl.submit(
                worker_fn,
                rank,
                world_size,
                resources={'GPUS': 1},
            ))

    for f in futs:
        f.result()


if __name__ == '__main__':
    main()

If processes=True, then we get an error about daemonic processes not being allowed to have children:

worker 0
worker 1
initialized distributed 1
initialized distributed 0
constructed data loader 0
constructed data loader 1
distributed.worker - WARNING -  Compute Failed
Function:  worker_fn
args:      (0, 2)
kwargs:    {}
Exception: AssertionError('daemonic processes are not allowed to have children',)

Traceback (most recent call last):
  File "scratch.py", line 152, in <module>
    main()
  File "scratch.py", line 148, in main
    f.result()
  File "/private/home/calebh/miniconda3/envs/fairtask2/lib/python3.6/site-packages/distributed/client.py", line 227, in result
    six.reraise(*result)
  File "/private/home/calebh/miniconda3/envs/fairtask2/lib/python3.6/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "scratch.py", line 123, in worker_fn
    for x, y in loader:
  File "/private/home/calebh/miniconda3/envs/fairtask2/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 193, in __iter__
    return _DataLoaderIter(self)
  File "/private/home/calebh/miniconda3/envs/fairtask2/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 469, in __init__
    w.start()
  File "/private/home/calebh/miniconda3/envs/fairtask2/lib/python3.6/multiprocessing/process.py", line 103, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
distributed.worker - WARNING -  Compute Failed
Function:  worker_fn
args:      (1, 2)
kwargs:    {}
Exception: AssertionError('daemonic processes are not allowed to have children',)

If processes=False, we get stuck at distributed initialization.
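
The error itself comes from the standard library rather than from dask: a daemonic multiprocessing.Process is not allowed to start children. A standalone illustration, independent of the example above:

import multiprocessing as mp


def child():
    pass


def parent():
    # Raises AssertionError: daemonic processes are not allowed to have children
    mp.Process(target=child).start()


if __name__ == '__main__':
    p = mp.Process(target=parent, daemon=True)
    p.start()
    p.join()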

@mrocklin
Member

You might try setting up dask-worker processes yourself, perhaps with the --no-nanny flag. If you come up with another, easier way to handle this, it would be great if you could share it here.
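
For reference, a minimal sketch of that suggestion (the port and address here are assumptions): launch the scheduler and nanny-less workers as ordinary subprocesses. Without the Nanny, the worker process is not daemonic, so tasks running on it may spawn children.

import subprocess

scheduler = subprocess.Popen(['dask-scheduler', '--port', '8786'])
workers = [
    subprocess.Popen(['dask-worker', 'tcp://127.0.0.1:8786', '--no-nanny'])
    for _ in range(2)
]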

@calebho
Contributor Author

calebho commented May 21, 2019

The first solution that comes to mind is to not run workers as daemons, but I suspect there's a reason why they are. So why are workers run as daemons?

@calebho
Contributor Author

calebho commented May 21, 2019

In particular, is there any reason why this isn't a parameter to the WorkerProcess constructor?

@mrocklin
Member

I have no problem with making that configurable. I might suggest a config option rather than piping it through all of the constructors.

https://docs.dask.org/en/latest/configuration.html

Any interest in submitting a Pull Request with that change?
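
For illustration, a rough sketch of what reading such a config value could look like in the worker-process setup (the key name distributed.worker.daemon matches the one discussed later in this thread; the helper function and its placement are assumptions):

import multiprocessing as mp

import dask


def make_worker_process(target):
    # Hypothetical helper: whether the child process is daemonic comes from
    # dask's config rather than from a new constructor parameter.
    p = mp.Process(target=target)
    p.daemon = dask.config.get('distributed.worker.daemon', default=True)
    return p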

@calebho
Contributor Author

calebho commented May 21, 2019

Yeah, I can work on a PR.

@mrocklin
Member

Checking in here @calebho . Is there anything I can do to help?

@calebho
Contributor Author

calebho commented May 28, 2019

@mrocklin Thanks for checking in. I started working on this last week, but I had to put it on hold because other things came up. I should have something by the end of the week or so.

@mrocklin
Member

mrocklin commented May 28, 2019 via email

@zhanghang1989

I am new to dask. Is it possible to set --no-nanny when using dask-ssh?

@mrocklin
Member

@zhanghang1989 I recommend raising a new issue rather than repeating your comment on multiple issues.

@calebho
Contributor Author

calebho commented May 30, 2019

Hey @mrocklin, a question about dask.config. I'm writing a test to see whether setting the config value actually does anything:

import multiprocessing as mp
from pprint import pprint

import dask
from distributed import Nanny
from distributed.utils_test import gen_cluster


@gen_cluster(ncores=[("127.0.0.1", 1)], client=True, Worker=Nanny)
def test_worker_no_daemon(c, s, a):
    def noop():
        pass

    def multiprocessing_worker():
        p = mp.Process(target=noop)
        p.start()
        p.join()

    with dask.config.set({"distributed.worker.daemon": False}):
        print('config in test')
        pprint(dask.config.config)
        yield c.submit(multiprocessing_worker)

The config value seems to be set in the test, but not in WorkerProcess.start, where I read the same config value to set self.process.daemon. The test fails with "daemonic processes are not allowed to have children". Sure enough, when I print the config inside of start, I don't see the config value at all, so it falls back to the default, which I set to be True. Am I using dask.config.set properly? I guess a workaround could be to create a temporary file in ~/.config/dask...

@mrocklin
Member

You are calling dask.config.set properly. However, at that stage the workers/nannies have already been created, so they will be daemon processes regardless. Presumably in testing you would want to use the config= keyword to the gen_cluster decorator (assuming that you've already made changes to respect the distributed.worker.daemon config value).
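
For example, the test above could pass the value through gen_cluster so it takes effect before the Nanny forks its worker process (a sketch, assuming WorkerProcess has been changed to respect distributed.worker.daemon):

import multiprocessing as mp

from distributed import Nanny
from distributed.utils_test import gen_cluster


@gen_cluster(
    ncores=[('127.0.0.1', 1)],
    client=True,
    Worker=Nanny,
    config={'distributed.worker.daemon': False},
)
def test_worker_no_daemon(c, s, a):
    def noop():
        pass

    def multiprocessing_worker():
        # Succeeds only if the worker process is non-daemonic.
        p = mp.Process(target=noop)
        p.start()
        p.join()

    yield c.submit(multiprocessing_worker)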
