"can't pickle thread.lock objects" when calling array.store with distributed #780

Open
rabernat opened this issue Jan 3, 2017 · 20 comments


@rabernat

rabernat commented Jan 3, 2017

I am trying to store a dask array using distributed. When I call store, I get an error "can't pickle thread.lock objects".

I was originally trying this in a much more complex context involving netCDF, xarray, etc., but I managed to reduce it to the following minimal example.

import numpy as np
import dask.array as da
from distributed import Client

def create_and_store_dask_array():
    shape = (10000, 1000)
    chunks = (1000, 1000)
    data = da.zeros(shape, chunks=chunks)
    store = np.memmap('test.memmap', mode='w+', dtype=data.dtype, shape=data.shape)
    data.store(store)
    print("Success!")

create_and_store_dask_array()  # uses the default threaded scheduler
client = Client()
create_and_store_dask_array()  # now runs through the distributed Client

The first call works, but the second fails. The output is:

Success!
/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/distributed/protocol/pickle.pyc - INFO - Failed to serialize (<function store at 0x7f0ee802f488>, (<functools.partial object at 0x7f0ec84f1418>, (1000, 1000)), (slice(2000, 3000, None), slice(0, 1000, None)), <thread.lock object at 0x7f0f2c715af0>)
Traceback (most recent call last):
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/distributed/protocol/pickle.py", line 43, in dumps
    return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 706, in dumps
    cp.dump(obj)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 146, in dump
    return Pickler.dump(self, obj)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/pickle.py", line 568, in save_tuple
    save(element)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/distributed/protocol/core.pyc - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/distributed/protocol/core.py", line 43, in dumps
    for key, value in data.items()
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/distributed/protocol/core.py", line 44, in <dictcomp>
    if type(value) is Serialize}
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/distributed/protocol/serialize.py", line 106, in serialize
    header, frames = {}, [pickle.dumps(x)]
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/distributed/protocol/pickle.py", line 43, in dumps
    return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 706, in dumps
    cp.dump(obj)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 146, in dump
    return Pickler.dump(self, obj)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/pickle.py", line 568, in save_tuple
    save(element)
  File "/home/rpa/.conda/envs/lagrangian_vorticity/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects

Versions:

>>> import dask
>>> print dask.__version__
0.12.0
>>> import distributed
>>> print distributed.__version__
1.14.3
@rabernat
Author

rabernat commented Jan 3, 2017

Seems related to dask/dask#1683.

@mrocklin
Member

mrocklin commented Jan 3, 2017

Can you try the following?

from dask.utils import SerializableLock

lock = SerializableLock()
data.store(store, lock=lock)  # instead of data.store(store)
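
For context, SerializableLock works roughly by identifying the lock with a token rather than with the lock object itself: pickling carries only the token, and unpickling creates (or reuses) a process-local lock for that token. A minimal sketch of the idea (not dask's actual implementation; the names here are illustrative):

import threading
import uuid

class TokenLock(object):
    # Per-process registry: token -> threading.Lock
    _locks = {}

    def __init__(self, token=None):
        self.token = token or str(uuid.uuid4())
        self.lock = TokenLock._locks.setdefault(self.token, threading.Lock())

    def __enter__(self):
        return self.lock.__enter__()

    def __exit__(self, *args):
        return self.lock.__exit__(*args)

    def __getstate__(self):
        return self.token  # only the token crosses the wire

    def __setstate__(self, token):
        self.__init__(token)  # rebuild a process-local lock for the token

Within one worker process, all copies sharing a token contend on the same lock, and the unpicklable lock object itself never has to be serialized.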

@rabernat
Author

rabernat commented Jan 3, 2017

It works with SerializableLock. Thanks @mrocklin!

Is there a way to make this the default?

@rabernat
Author

rabernat commented Jan 3, 2017

P.S. I had to update to dask 0.13.0 in order to be able to import SerializableLock.

@mrocklin
Member

mrocklin commented Jan 3, 2017

We could make it the default, yes. I suppose it doesn't have much overhead over standard locks.

@mrocklin
Member

mrocklin commented Jan 3, 2017

This just came in. You could ping there and see if he's interested in implementing this as well: dask/knit#60

@rabernat
Author

rabernat commented Jan 3, 2017

So with SerializableLock it works at the dask level. But now I would like to wrap this array in an xarray Dataset and call to_netcdf. Unfortunately that function does not allow me to pass a custom lock. It appears that calls to dask.array.store automatically get passed a threading.Lock() object:
https://github.com/pydata/xarray/blob/master/xarray/backends/common.py#L153
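
For illustration, the pattern there and the change being discussed look roughly like this (a simplified sketch, not xarray's actual code):

import threading

import dask.array as da
from dask.utils import SerializableLock

# Roughly the current situation: a module-level threading.Lock ends up
# baked into every store task, and it cannot be pickled to workers.
GLOBAL_LOCK = threading.Lock()

def sync(sources, targets):
    da.store(sources, targets, lock=GLOBAL_LOCK)

# The fix under discussion is simply to use a lock that survives pickling:
# GLOBAL_LOCK = SerializableLock()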

Should I raise this issue over at xarray then?

(I must admit I don't see the relevance of dask/knit#60 to this issue.)

@mrocklin
Member

mrocklin commented Jan 3, 2017

Sorry, wrong copy-paste: dask/dask#1879

@mrocklin
Member

mrocklin commented Jan 3, 2017

Or you could implement this yourself. It should be an easy change, and it'd be good to have your name in the authors list :) (or I can do it, but that's boring)

@mrocklin
Member

mrocklin commented Jan 3, 2017

We built SerializableLock with XArray in mind. I think they have started using it in some cases as well.

cc @shoyer

@rabernat
Author

rabernat commented Jan 3, 2017

Ah, it looks like there is already an xarray PR in progress that would address this: pydata/xarray#1179

@pelson
Contributor

pelson commented Jan 16, 2017

Just ran into this too. Is it feasible to extend cloudpickle to dispatch thread.lock, rather than retrofitting SerializableLock usage?
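
For concreteness, "dispatching" thread.lock would mean registering a reducer so that pickling a lock recreates a fresh, unlocked lock on the receiving side. A standard-library sketch of that, purely illustrative:

import copyreg  # copy_reg on Python 2
import threading

def _reduce_lock(lock):
    # Lock state cannot meaningfully cross processes, so the best a
    # reducer can do is produce a brand-new, unlocked lock on unpickling.
    return (threading.Lock, ())

copyreg.pickle(type(threading.Lock()), _reduce_lock)

With this registered, pickle.dumps(threading.Lock()) succeeds, but the round-tripped lock shares no state with the original, which is the safety concern raised below.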

@mrocklin
Member

I think cloudpickle should probably still raise an error when trying to serialize a lock. That is usually an unsafe thing to do.

@mrocklin
Member

@pelson would the solution in either of the two PRs suffice for you? I'm happy to spend some time to make this workable.

@pelson
Contributor

pelson commented Jan 16, 2017

As it happens, I'm not sure it was exactly the same issue (as in, the same line of code within distributed).
I added a print statement into cloudpickle (around L604) along the lines of:

        if state is not None:
            print('STATE:', state)
            save(state)
            write(pickle.BUILD)

And I was able to determine that it was a completely different thread lock I had hold of:

STATE: {'name': 'mds-logger', 'parent': <logging.RootLogger object at 0x7f4e15e50310>, 'handlers': [], 'level': 0, 'disabled': 0, 'manager': <logging.Manager object at 0x7f4e15e50190>, 'propagate': 1, 'filters': []}
STATE: {'name': 'root', 'parent': None, 'handlers': [<logging.StreamHandler object at 0x7f4defcbd090>], 'level': 20, 'disabled': 0, 'propagate': 1, 'filters': []}
STATE: {'stream': <open file '<stderr>', mode 'w' at 0x7f4e15fa91e0>, 'level': 0, 'lock': <_RLock owner=None count=0>, '_name': None, 'filters': [], 'formatter': <logging.Formatter object at 0x7f4defcbd1d0>}
STATE: {'_Verbose__verbose': False, '_RLock__owner': None, '_RLock__block': <thread.lock object at 0x7f4df056e8b0>, '_RLock__count': 0}

I got rid of my global logger and the issue disappeared. This is really a cloudpickle issue, not a distributed one.
For completeness, I have now tracked it down to the following:

>>> from cloudpickle import dumps
>>> import sys

>>> dumps(sys.stdout)
... fine

>>> dumps(sys.stderr)
... fine

>>> import logging
>>> logger = logging.getLogger('test')
>>> dumps(logger)
... fine

>>> logger.info('test')
>>> dumps(logger)
... fine

>>> logger.addHandler(logging.StreamHandler())
>>> dumps(logger)
...
TypeError: can't pickle thread.lock objects

💣

Sorry for the noise on this issue. Happy to move this over to cloudpickle if that would be helpful.
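
A common workaround for the logger case, assuming the function is defined in __main__ (where cloudpickle serializes referenced globals by value), is to resolve the logger by name inside the task instead of capturing a handler-bearing logger in the closure. A sketch, reusing the logger name from the STATE dump above:

import logging

# Problematic: a module-level logger with a StreamHandler (and its RLock)
# is captured as a global of the function and pickled along with it.
logger = logging.getLogger('mds-logger')

def task_bad(x):
    logger.info('processing %s', x)
    return x + 1

# Safer: look the logger up inside the task; only the name string
# travels with the function, and each worker builds its own logger.
def task_good(x):
    log = logging.getLogger('mds-logger')
    log.info('processing %s', x)
    return x + 1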

@jakirkham
Member

Also saw similar issues. FWIW dill seems to have no problem pickling locks from threading, but cloudpickle does (cloudpipe/cloudpickle#81). Though I'm unclear on whether one should ever serialize locks from threading, as opposed to using a lock designed to be shared between processes, for instance.
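
For reference, that asymmetry is easy to reproduce (behavior as of the versions discussed in this thread; both libraries may have changed since):

import threading

import cloudpickle
import dill

lock = threading.Lock()

dill.dumps(lock)  # succeeds: dill registers a reducer for lock types

try:
    cloudpickle.dumps(lock)
except TypeError as e:
    print(e)  # can't pickle thread.lock objects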

@mrocklin
Member

@rabernat is your original issue resolved?

@rabernat
Author

I believe it is not resolved because dask/dask#1881 has not been merged yet. It is awaiting a test which I have not had time to figure out how to write.

@returncode13

Hi @mrocklin, is there an update on this issue?
Should we still use SerializableLock?

@mrocklin
Member

mrocklin commented Apr 25, 2019 via email
