
Serialize data within tasks #2110

Open
mrocklin opened this issue Jul 12, 2018 · 6 comments

@mrocklin
Member

Currently if we submit data within a task like the following:

x = np.array([1, 2, 3])
future = client.submit(func, x)

Then we construct a task like (func, x) and call pickle on it. We don't do custom serialization once we have constructed tasks, mostly because data embedded in tasks is rarely large and traversing tasks would add overhead in the common case. Typically we encourage people to use dask.delayed or something similar to mark the data:

x = np.array([1, 2, 3])
x = dask.delayed(x)
future = client.submit(func, x)

But this is error-prone and requires expertise that few users have.

We might again consider traversing arguments.
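For reference, the workaround today is to scatter the data first so that it does go through the custom serialization path rather than being pickled inside the task. A minimal sketch, assuming a running client and a trivial func:

import numpy as np
from dask.distributed import Client

client = Client()

def func(a):
    return a.sum()

x = np.array([1, 2, 3])
x_future = client.scatter(x)             # data travels via distributed's serialization
future = client.submit(func, x_future)   # the worker resolves the future to the array
print(future.result())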

@dhirschfeld
Contributor

Just a note to mention that none of the below work:

fut = client.submit(echo, obj)

obj_fut = client.scatter(obj)
fut = client.submit(echo, obj_fut)

delayed_obj = delayed(obj)
fut = client.submit(echo, delayed_obj)

All three methods of submitting a function to run on the cluster fail, going through the exact same warn_dumps ⟶ pickle.dumps code path.

Failing test cases for each are currently part of PR #2115
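For context, the snippets above assume a setup roughly like the following (hypothetical sketch; obj is a pyarrow RecordBatch, per the errors below):

import pyarrow as pa
from dask import delayed
from dask.distributed import Client

client = Client()

def echo(x):
    # return the argument unchanged so only serialization is exercised
    return x

obj = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], names=['a'])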

@dhirschfeld
Contributor

Update:
Scattering does work, but only if you ensure the custom serialization is imported on all workers.

obj_fut = client.scatter(obj)

...will fail with the below error if distributed.protocol.arrow isn't explicitly imported.

  File "C:\dev\src\distributed\distributed\protocol\serialize.py", line 157, in serialize
    raise TypeError(msg)
TypeError: Could not serialize object of type RecordBatch

If you import distributed.protocol.arrow in the client process but not in the workers it fails with the below error:

  File "c:\dev\src\distributed\distributed\core.py", line 448, in send_recv
    raise Exception(response['text'])
Exception: Serialization for type pyarrow.lib.RecordBatch not found

...so to get it to actually work I need to run:

def init_arrow():
    # importing the module registers the arrow serialization functions
    from distributed.protocol import arrow
    return None

init_arrow()             # register in the client process
client.run(init_arrow)   # register on every worker
obj_fut = client.scatter(obj)
fut = client.submit(echo, obj_fut)
result = fut.result()
assert obj.equals(result)

@dhirschfeld
Contributor

The question I have is: is this by design? I.e. is it intended that the user has to initialise the serialisation support on all of the workers?

In the case of an adaptive cluster I guess this could be supported by using the --preload option for any new workers.
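A rough sketch of how that might look (the module name arrow_preload is made up for illustration):

# arrow_preload.py -- importing distributed.protocol.arrow registers the serializers
from distributed.protocol import arrow  # noqa: F401

and then start each worker with dask-worker <scheduler-address> --preload arrow_preload so that the serializers are registered before any tasks arrive.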

@mrocklin
Member Author

Yes, something like preload is probably the right way to handle this today, assuming that it's not already in library code.

Eventually it would be nice to allow clients to register functions with the scheduler to be run at worker start time, so that they can be passed to new workers as they start up.

  1. Client registers func with the scheduler as a preload operation
  2. Scheduler holds on to func
  3. Scheduler also sends func to all workers to have them run func
  4. Workers run func
  5. New worker arrives, tells Scheduler that it exists
  6. As part of saying "Hi" the scheduler also gives the worker func and tells it to run it

If anyone is interested in implementing this let me know and I'll point to the various locations in the client, scheduler, and worker, where these changes would have to be made. It's a modest amount of work and would be a good introduction to the distributed system.
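A hypothetical sketch of how that flow might look from the client side (register_preload does not exist today; the name is made up for illustration):

def init_arrow():
    from distributed.protocol import arrow
    return None

# steps 1-2: client registers the function, scheduler holds on to it
client.register_preload(init_arrow)
# steps 3-4: scheduler sends the function to all current workers, which run it
# steps 5-6: a worker that joins later receives the function during its handshake and runs it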

@dhirschfeld
Contributor

I moved the above to a new issue as I think it's a separate concern.

@jakirkham
Member

The idea of handling external serialization more simply is brought up in issue #3831 as well.
