
"distributed.protocol.core - CRITICAL - Failed to Serialize" with PyArrow 0.13 #2597

Closed
johnbensnyder opened this issue Apr 6, 2019 · 56 comments
Labels: bug (Something is broken), help wanted

Comments

@johnbensnyder

Reading from Parquet is failing with PyArrow 0.13. Downgrading to PyArrow 0.12.1 seems to fix the problem. I've only encountered this when using the distributed client. Using a Dask dataframe by itself does not appear to be affected.

For example,

from distributed import LocalCluster, Client
import dask.dataframe as dd
client = Client(LocalCluster(diagnostics_port=('0.0.0.0', 8889), n_workers = 4))
ddf = dd.read_parquet('DATA/parquet/', engine = 'pyarrow')
ddf.set_index('index')

Gives

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 54, in dumps
    for key, value in data.items()
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 55, in <dictcomp>
    if type(value) is Serialize}
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 163, in serialize
    raise TypeError(msg, str(x)[:10000])
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/batched.py", line 94, in _background_send
    on_error='raise')
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 224, in write
    'recipient': self._peer_addr})
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/utils.py", line 50, in to_frames
    res = yield offload(_to_frames)
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/opt/conda/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/opt/conda/lib/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/utils.py", line 43, in _to_frames
    context=context))
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 54, in dumps
    for key, value in data.items()
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 55, in <dictcomp>
    if type(value) is Serialize}
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 163, in serialize
    raise TypeError(msg, str(x)[:10000])

Similarly,

from distributed import LocalCluster, Client
import dask.dataframe as dd
client = Client(LocalCluster(diagnostics_port=('0.0.0.0', 8889), n_workers = 4))
ddf = dd.read_parquet('DATA/parquet/', engine = 'pyarrow')
ddf.compute()

Causes this error

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
/opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
     37     try:
---> 38         result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     39         if len(result) < 1000:

AttributeError: Can't pickle local object 'ParquetDataset._get_open_file_func.<locals>.open_file'

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
<ipython-input-3-a90291ebde1e> in <module>
      1 ft_inp_ddf = dd.read_parquet('DATA/ft_14_15_inputs_parquet/', engine = 'pyarrow')
----> 2 ft_inp_ddf.compute()

/opt/conda/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

/opt/conda/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    396     keys = [x.__dask_keys__() for x in collections]
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398     results = schedule(dsk, keys, **kwargs)
    399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 

/opt/conda/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2318             retries=retries,
   2319             user_priority=priority,
-> 2320             actors=actors,
   2321         )
   2322         packed = pack_data(keys, futures)

/opt/conda/lib/python3.6/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
   2259 
   2260             self._send_to_scheduler({'op': 'update-graph',
-> 2261                                      'tasks': valmap(dumps_task, dsk3),
   2262                                      'dependencies': dependencies,
   2263                                      'keys': list(flatkeys),

/opt/conda/lib/python3.6/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

/opt/conda/lib/python3.6/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

/opt/conda/lib/python3.6/site-packages/distributed/worker.py in dumps_task(task)
   2769         elif not any(map(_maybe_complex, task[1:])):
   2770             return {'function': dumps_function(task[0]),
-> 2771                     'args': warn_dumps(task[1:])}
   2772     return to_serialize(task)
   2773 

/opt/conda/lib/python3.6/site-packages/distributed/worker.py in warn_dumps(obj, dumps, limit)
   2778 def warn_dumps(obj, dumps=pickle.dumps, limit=1e6):
   2779     """ Dump an object to bytes, warn if those bytes are large """
-> 2780     b = dumps(obj)
   2781     if not _warn_dumps_warned[0] and len(b) > limit:
   2782         _warn_dumps_warned[0] = True

/opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
     49     except Exception:
     50         try:
---> 51             return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     52         except Exception as e:
     53             logger.info("Failed to serialize %s. Exception: %s", x, e)

/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
    959     try:
    960         cp = CloudPickler(file, protocol=protocol)
--> 961         cp.dump(obj)
    962         return file.getvalue()
    963     finally:

/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
    265         self.inject_addons()
    266         try:
--> 267             return Pickler.dump(self, obj)
    268         except RuntimeError as e:
    269             if 'recursion' in e.args[0]:

/opt/conda/lib/python3.6/pickle.py in dump(self, obj)
    407         if self.proto >= 4:
    408             self.framer.start_framing()
--> 409         self.save(obj)
    410         self.write(STOP)
    411         self.framer.end_framing()

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

/opt/conda/lib/python3.6/pickle.py in save_tuple(self, obj)
    749         write(MARK)
    750         for element in obj:
--> 751             save(element)
    752 
    753         if id(obj) in memo:

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    519 
    520         # Save the reduce() output and finally memoize the object
--> 521         self.save_reduce(obj=obj, *rv)
    522 
    523     def persistent_id(self, obj):

/opt/conda/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    632 
    633         if state is not None:
--> 634             save(state)
    635             write(BUILD)
    636 

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

/opt/conda/lib/python3.6/pickle.py in save_dict(self, obj)
    819 
    820         self.memoize(obj)
--> 821         self._batch_setitems(obj.items())
    822 
    823     dispatch[dict] = save_dict

/opt/conda/lib/python3.6/pickle.py in _batch_setitems(self, items)
    845                 for k, v in tmp:
    846                     save(k)
--> 847                     save(v)
    848                 write(SETITEMS)
    849             elif n:

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
    398             # func is nested
    399             if lookedup_by_name is None or lookedup_by_name is not obj:
--> 400                 self.save_function_tuple(obj)
    401                 return
    402 

/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
    592         if hasattr(func, '__qualname__'):
    593             state['qualname'] = func.__qualname__
--> 594         save(state)
    595         write(pickle.TUPLE)
    596         write(pickle.REDUCE)  # applies _fill_function on the tuple

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

/opt/conda/lib/python3.6/pickle.py in save_dict(self, obj)
    819 
    820         self.memoize(obj)
--> 821         self._batch_setitems(obj.items())
    822 
    823     dispatch[dict] = save_dict

/opt/conda/lib/python3.6/pickle.py in _batch_setitems(self, items)
    845                 for k, v in tmp:
    846                     save(k)
--> 847                     save(v)
    848                 write(SETITEMS)
    849             elif n:

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

/opt/conda/lib/python3.6/pickle.py in save_list(self, obj)
    779 
    780         self.memoize(obj)
--> 781         self._batch_appends(obj)
    782 
    783     dispatch[list] = save_list

/opt/conda/lib/python3.6/pickle.py in _batch_appends(self, items)
    806                 write(APPENDS)
    807             elif n:
--> 808                 save(tmp[0])
    809                 write(APPEND)
    810             # else tmp is empty, and we're done

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    519 
    520         # Save the reduce() output and finally memoize the object
--> 521         self.save_reduce(obj=obj, *rv)
    522 
    523     def persistent_id(self, obj):

/opt/conda/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    632 
    633         if state is not None:
--> 634             save(state)
    635             write(BUILD)
    636 

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

/opt/conda/lib/python3.6/pickle.py in save_dict(self, obj)
    819 
    820         self.memoize(obj)
--> 821         self._batch_setitems(obj.items())
    822 
    823     dispatch[dict] = save_dict

/opt/conda/lib/python3.6/pickle.py in _batch_setitems(self, items)
    845                 for k, v in tmp:
    846                     save(k)
--> 847                     save(v)
    848                 write(SETITEMS)
    849             elif n:

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    494             reduce = getattr(obj, "__reduce_ex__", None)
    495             if reduce is not None:
--> 496                 rv = reduce(self.proto)
    497             else:
    498                 reduce = getattr(obj, "__reduce__", None)

/opt/conda/lib/python3.6/site-packages/pyarrow/_parquet.cpython-36m-x86_64-linux-gnu.so in pyarrow._parquet.ParquetSchema.__reduce_cython__()

TypeError: no default __reduce__ due to non-trivial __cinit__
martindurant added the bug label on Apr 6, 2019
@martindurant
Member

It appears that, in the absence of a serialization mechanism directly in pyarrow, we'll have to write a dask-specific one for these schema objects, at least for now. I can have a go early this coming week, if no one beats me to it. In the meantime @johnbensnyder, you can try to downgrade to pyarrow 0.12, or use fastparquet.
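
For reference, distributed's hook for custom per-class serializers looks roughly like the sketch below. It round-trips an arrow-level pyarrow.Schema through its IPC representation purely as an illustration of the mechanism; the ParquetSchema object that actually fails here is C++-managed and may need different treatment, and the pa.ipc.read_schema call assumes a pyarrow version where it exists.

# A minimal sketch of a dask-specific serializer, registered via distributed's
# dask_serialize/dask_deserialize dispatch. Illustration only, not a fix for
# the ParquetSchema problem discussed above.
import pyarrow as pa
from distributed.protocol import dask_serialize, dask_deserialize


@dask_serialize.register(pa.Schema)
def serialize_arrow_schema(schema):
    header = {}
    frames = [schema.serialize()]  # IPC-encoded schema as a single frame
    return header, frames


@dask_deserialize.register(pa.Schema)
def deserialize_arrow_schema(header, frames):
    return pa.ipc.read_schema(pa.py_buffer(frames[0]))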

@muammar
Contributor

muammar commented Apr 6, 2019

Sorry to add noise to this issue, but is this also similar to this -> #2581 (comment)? If so, I would like to try to solve it for pytorch, too.

@martindurant
Member

@muammar, that issue is indeed the same type of thing happening, but not directly related to this one, as it's a completely different sort of object that is causing the problem.

@martindurant
Member

@johnbensnyder , I have not managed to reproduce this locally; could you specify which versions you are using and how you generated the data, please?

@muammar
Contributor

muammar commented Apr 8, 2019

@muammar, that issue is indeed the same type of thing happening, but not directly related to this one, as it's a completely different sort of object that is causing the problem.

It is happening for me when I use the latest git version and install distributed with python3 setup.py install. If I install the stable distributed version it works.

@martindurant
Member

OK, thanks, updating to current master did indeed reveal the situation you are seeing.

At first glance, I do not know how to fix it. The new pyarrow ParquetDataset includes an opaque parquet schema instance (a C++-managed object) as an attribute, which has no de/serialise mechanism on the Python side; apparently an arrow schema instance is not enough here anymore (I assume conversion to/from thrift is implemented within C++).
It also holds onto references to open files in _open_file_func, _get_open_file_func and the pieces' open_file_func, which will not serialise if they point at a local file.

I am including this as information for someone who might know how to fix these problems, perhaps on the (py)arrow side.

@mrocklin
Member

mrocklin commented Apr 8, 2019

My guess is that if you want to engage the pyarrow devs, you should raise a JIRA on their issue tracker. I recommend trying to reproduce the issue using only cloudpickle and pyarrow and then raising it there.
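
A stripped-down reproduction along those lines might look like the following (a sketch: it assumes a local Parquet directory at 'DATA/parquet/', the path from the original report, and pyarrow 0.13):

# No dask involved: just cloudpickle and pyarrow on a local Parquet dataset.
import cloudpickle
import pyarrow.parquet as pq

dataset = pq.ParquetDataset('DATA/parquet/')
try:
    cloudpickle.dumps(dataset)          # fails on pyarrow 0.13
    cloudpickle.dumps(dataset.schema)   # the ParquetSchema appears to be the culprit
except Exception as exc:
    print("serialization failed:", exc)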

@martindurant
Member

@mrocklin
Member

mrocklin commented Apr 9, 2019

Thanks for reporting upstream @martindurant!

@mrocklin
Member

OK, so we haven't gotten any response on the arrow issue tracker. Any thoughts on how we should handle this? @martindurant if you have any inclination, I would love it if you were willing to handle/track this problem and find some solution (technical, social, or otherwise).

@martindurant
Member

I have no technical solution from our side, and the cython code for this looks... rather complicated.

My initial hunch points to this changeset apache/arrow@f2fb02b

By experimenting, I could get serialisation of the ParquetDataset working by removing the schema attribute from it and from its pieces; but the dangling function defs introduced in this diff still fouled pickle, if not cloudpickle.
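
Roughly, the experiment was of this shape (a probe, not a fix; the path is assumed and the attribute names are taken from the description above and vary between pyarrow versions):

# Strip the attributes that hold the C++-managed schema or open-file closures,
# then see whether the dataset pickles.
import cloudpickle
import pyarrow.parquet as pq

dataset = pq.ParquetDataset('DATA/parquet/')
for obj in [dataset] + list(dataset.pieces):
    for attr in ('schema', '_open_file_func', '_get_open_file_func', 'open_file_func'):
        if hasattr(obj, attr):
            try:
                setattr(obj, attr, None)
            except AttributeError:
                pass  # properties without setters cannot be cleared this way

cloudpickle.dumps(dataset)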

Other connected commits:
apache/arrow@f70dbd1 (at least renames the metadata stuff)
apache/arrow@86f480a (main one for the RangeIndex implementation)

I don't know who at arrow can help (@kszucs perhaps?)

@mrocklin
Member

Yes, some suggestions on how we might proceed:

  1. As you've done, track down where things broke and maybe go to that PR and comment there, and see if you can get folks there to help engage
  2. Also as you've done, ask some people that you know who have deep knowledge of the Arrow codebase and community (you might also ping Uwe or Antoine if @kszucs doesn't reply)
  3. Ask for help on the Arrow dev mailing list. We probably need to know their bugfix release schedule anyway to figure out how to handle this
  4. Write a patch to Arrow that fixes the issue to see if that kickstarts things
  5. ...

These are the things that I might do when trying to resolve this problem.

@martindurant
Member

@pitrou @xhochy any thoughts?

@kszucs
Contributor

kszucs commented Apr 15, 2019

Sorry, I missed the notification. I can take a look tomorrow, if that's OK with you.

@pitrou
Member

pitrou commented Apr 15, 2019

(I can also handle this)

@kszucs
Contributor

kszucs commented Apr 16, 2019

apache/arrow#4156 should resolve the pickling issue for ParquetDataset and ParquetPiece.

Note that this is a new feature; pickling wasn't previously implemented for these objects, see https://issues.apache.org/jira/browse/ARROW-5144
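
Once a pyarrow build containing that patch is installed, a quick round-trip check would be something like the following (the dataset path is assumed):

import pickle
import pyarrow.parquet as pq

dataset = pq.ParquetDataset('DATA/parquet/')
restored = pickle.loads(pickle.dumps(dataset))   # should no longer raise
print(type(restored), len(restored.pieces))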

@usmcamp0811

I am having a very similar problem using bcolz. The system is offline, so I can't copy over the error message.

Versions in use are:

dask==1.2.0
distributed==1.27.0
bcolz==1.2.1

I haven't been able to find any version combo yet that works...

@martindurant
Member

^ this is completely without pyarrow 0.13?

@usmcamp0811

usmcamp0811 commented Apr 26, 2019

Correct, I was not using pyarrow at all. I did try installing the version of pyarrow mentioned above as working, with no luck. I was able to hack around it by not using the dd.from_bcolz method to construct the DataFrame, and instead just wrapping the bcolz.todataframe method with delayed; it seems to be working right now... and as I write that, my job just got killed for running out of memory...
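
The workaround looked roughly like this (a sketch with a made-up path, loading the whole ctable as a single partition; a real dataset would likely be split into several delayed slices):

# Skip dd.from_bcolz; build the dask dataframe from a delayed call to
# bcolz's todataframe instead.
import bcolz
import dask
import dask.dataframe as dd

@dask.delayed
def load_ctable(path):
    return bcolz.open(path, mode='r').todataframe()

ddf = dd.from_delayed([load_ctable('data.bcolz')])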

@martindurant
Member

bcolz is not that widely used, so although there have certainly been some comm changes in distributed, I suspect this might just be a coincidence. Still, it would be good to work out what in bcolz doesn't serialise, when this started happening, and whether it was a change in bcolz, in distributed, or in something else, like cloudpickle.

@mrocklin
Member

mrocklin commented Apr 26, 2019 via email

@pankalos

pankalos commented Jun 7, 2019

I have a Dask DataFrame, and when I call map_partitions on it and, inside every partition, call cascaded_union from shapely on a list of Points, it gives me the very same error:
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last): ...
Is this the same issue? Does anyone know how to call cascaded_union in each partition?

@TomAugspurger
Member

@pankalos most likely it's a similar, but distinct, issue from this one. You may want to read through https://distributed.dask.org/en/latest/serialization.html and experiment with why a list of shapely points apparently can't be serialized.
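
A quick way to run that experiment outside of any cluster (assuming shapely is installed) is to feed a small list of geometries straight to the serialization layers:

import cloudpickle
from shapely.geometry import Point
from distributed.protocol import serialize, deserialize

points = [Point(0, 0), Point(1, 1), Point(2, 2)]

cloudpickle.dumps(points)            # does plain cloudpickle cope?
header, frames = serialize(points)   # does distributed's protocol cope?
print(deserialize(header, frames))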

@marberi

marberi commented Jul 9, 2019

@pankalos @TomAugspurger For reference, I had the same problem as @pankalos, and downgrading pyarrow to 0.12 fixed it.

@martindurant
Member

This ought to now work with pyarrow 0.14, if someone would like to try.

@sephib

sephib commented Jul 11, 2019

Hi,
I've updated to version:

pyarrow 0.14
arrow-cpp 0.14

and I'm able to serialize and read parquet.
Thx!

@shahaba

shahaba commented Jul 23, 2019

I have a somewhat similar issue to @pankalos, but with the dask DataFrame read_csv function.

The weird thing is that I've used dask (distributed) to create the series of .csv files, but when I try to read them back in (in distributed mode) I get the following error:

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "c:\users\...\python\python36\lib\site-packages\distributed\protocol\core.py", line 46, in dumps
    for key, value in data.items()
  File "c:\users\...\programs\python\python36\lib\site-packages\distributed\protocol\core.py", line 47, in <dictcomp>
    if type(value) is Serialize
  File "c:\users\...\programs\python\python36\lib\site-packages\distributed\protocol\serialize.py", line 164, in serialize
    raise TypeError(msg, str(x)[:10000])

TypeError: ('Could not serialize object of type tuple.', "(<function check_meta at 0x000001F48B4A1EA0>, (<function apply at 0x000001F48B1C4400>, <function pandas_read_text at 0x000001F48B50D158>, [<function _make_parser_function.<locals>.parser_f at 0x000001F4E983D6A8>, (<function read_block_from_file at 0x000001F48B3949D8>, <dask.bytes.core.OpenFile object at 0x000001F48C5E3D30>, 0, 64000000, b'\\n'), etc...

I loaded Dask using the following:

cluster = LocalCluster()
client = Client(cluster, serializers=['dask'])
client

Versions:

dask = 2.0.0
python = 3.6.4

'packages': {'required': (('dask', '2.0.0'),
('distributed', '2.0.1'),
('msgpack', '0.6.1'),
('cloudpickle', '1.2.1'),
('tornado', '5.1.1'),
('toolz', '0.9.0')),
'optional': (('numpy', '1.16.4'),
('pandas', '0.25.0'),
('bokeh', '1.0.4'),
('lz4', None),
('dask_ml', '1.0.0'),
('blosc', None))}},

Is this similar? This is a new issue for me; I've never had this problem before. Running with processes=False does make it work, but then I lose the performance gain I am looking for.

Any thoughts?

@sephib

sephib commented Jul 29, 2019

Hi,
We are also getting the error with dd.read_csv.
We created a simple csv file with a list of integers and tried to read it in using:

df = dd.read_csv('hdfs:///tmp/demofile.csv')

We tried to set dtype=object but are still getting the same error.

We did not manage to get additional information using %debug.

@martindurant
Member

I can confirm that the HDFS file system instances are pickleable, and so are open-file instances that reference them. I made a dataframe from csv with a file on HDFS, as in the example, and it pickles fine with cloudpickle, at least on my current setup. In short, I cannot reproduce this problem.

In this issue, we still have no idea what it is that is failing to serialise, and since I cannot reproduce, I'm afraid it's down to the rest of you to try and find out!

@sephib

sephib commented Aug 1, 2019

After updating the OS and re-running a clean installation in a new conda environment, I was not able to reproduce the error.

@tsonic

tsonic commented Aug 2, 2019

I have a similar issue. Downgrading pandas from 0.25 to 0.24.2 worked for me.

@AlexisMignon

AlexisMignon commented Aug 13, 2019

I can confirm that the problem seems to be due to pandas 0.25, since I had a similar issue using the Azure connection and as such had no dependency on pyarrow. The problem was solved as indicated by @tsonic.

@martindurant
Member

@TomAugspurger, could there be something in the new pandas that is not serialisable?
@AlexisMignon, @tsonic, we would appreciate a sample of your data, so we can reproduce.

@TomAugspurger
Member

I'll take a look if someone has a reproducible example: http://matthewrocklin.com/blog/work/2018/02/28/minimal-bug-reports

@AlexisMignon

AlexisMignon commented Aug 14, 2019

The following code (in a test.py file) raises the problem for me:

import dask.dataframe as dd
from dask.distributed import Client
import pandas as pd

if __name__ == "__main__":

        df = pd.DataFrame(
            data=[("key_%i" % (i // 2), i) for i in range(100000)],
            columns=["key", "value"]
        )

        df.to_csv("./data.csv", index=False)

        c = Client("localhost:8786")
        dd_data = dd.read_csv("./data.csv", blocksize=100000)
        dd_data.groupby("key")["value"].sum().compute()

Launched by doing:

python3 -m venv venv
source venv/bin/activate
pip install pandas==0.25 dask==2.1.0 distributed==2.1.0
dask-scheduler &
dask-worker localhost:8786 &
python test.py

The same thing with pandas==0.24.2 works.

@martindurant
Member

Works with dask 2.2.0+38.g266c314 (master) and pandas 0.25.0

@hongzmsft

I am using conda-forge packages. Pandas 0.25.0 causes the issue shown in @AlexisMignon's comment for dask+dask-distributed 2.3.0. Downgrading pandas to 0.24.2, which also causes dask+dask-distributed to downgrade to 2.2.0, works without the TypeError.

@TomAugspurger
Member

@hongzmsft do you have a reproducible example? I think we're still looking for one.

Alternatively, does the example from #2597 (comment) work for you?

@hongzmsft

hongzmsft commented Aug 20, 2019

@TomAugspurger I am using the example from #2597 (comment). The program runs fine with dask=2.2.0, distributed=2.2.0, pandas=0.24.2, python=3.6*. It also runs fine with dask=2.3.0, distributed=2.3.0, pandas=0.25.0, python=3.7.0, but raises a TypeError as reported in this issue with dask=2.3.0, distributed=2.3.0, pandas=0.25.0, python=3.6*.

Here is a docker file to reproduce the issue:

FROM debian:buster-slim

RUN apt-get update && \
    apt-get install -qy wget && \
    wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh && \
    bash Miniconda3-latest-Linux-x86_64.sh -b -p /opt/conda && \
    rm Miniconda3-latest-Linux-x86_64.sh

COPY test.py /

RUN chmod +x /test.py

ENTRYPOINT ["/test.py"]

RUN /opt/conda/bin/conda install -c conda-forge dask=2.3* distributed=2.3* pandas=0.25.0 python=3.6*

A slightly modified test.py from #2597 (comment):

#!/opt/conda/bin/python

import dask.dataframe as dd
from dask.distributed import Client
import pandas as pd

if __name__ == "__main__":

        df = pd.DataFrame(
            data=[("key_%i" % (i // 2), i) for i in range(100000)],
            columns=["key", "value"]
        )

        df.to_csv("./data.csv", index=False)

        c = Client(n_workers=2)
        dd_data = dd.read_csv("./data.csv", blocksize=100000)
        dd_data.groupby("key")["value"].sum().compute()

The error is like:

TypeError: ('Could not serialize object of type list.', "[<function _make_parser_function.<locals>.parser_f at 0x7f3c3f884ea0>, (<function read_block_from_file at 0x7f3c3f367d90>, <OpenFile '/data.csv'>, 0, 100000, b'\\n'), b'key,value\\n', (<class 'dict'>, []), (<class 'dict'>, [['key', dtype('O')], ['value', dtype('int64')]]), ['key', 'value']]")

The error seems slightly different from what the original issue is about, but consistent with other responses.

@jrbourbeau
Member

Thanks for the nice example @hongzmsft and @AlexisMignon! I'm able to reproduce the TypeError on Python 3.6 and, as @hongzmsft pointed out, the error does not occur on Python 3.7.

After some digging, it looks like this serialization issue has to do with cloudpickle's handling of type annotations (which are used in pd.read_csv) on Python 3.4-3.6. There's already an upstream issue in cloudpickle for this (ref cloudpipe/cloudpickle#298) and, even better, there's an open PR with a fix (cloudpipe/cloudpickle#299)!
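
To illustrate the class of failure (a hedged sketch: whether this actually raises depends on the cloudpickle release and Python minor version in use), pickling a dynamically defined, annotated function exercises the same code path that the pandas 0.25 read_csv wrapper hits:

# On Python 3.6 with a cloudpickle release predating cloudpipe/cloudpickle#299,
# dumping a locally defined function that carries type annotations can fail;
# with the fix, or on Python 3.7+, it succeeds.
import cloudpickle

def make_reader():
    def reader(path: str, blocksize: int = 100000) -> str:
        return path
    return reader

cloudpickle.dumps(make_reader())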

@jrbourbeau
Member

I can confirm that, with the change in cloudpipe/cloudpickle#299, the example in #2597 (comment) runs successfully on Python 3.6

@gcoimbra

gcoimbra commented Sep 9, 2019

I can confirm that, with the change in cloudpipe/cloudpickle#299, the example in #2597 (comment) runs successfully on Python 3.6

Could you please elaborate on how you implemented that change in cloudpickle locally?

I've tried manually replacing cloudpickle.py and cloudpickle_py from cloudpipe/cloudpickle#299 in my site-packages, since the version that patches that bug isn't available yet, but it didn't work.

@jrbourbeau
Member

jrbourbeau commented Sep 9, 2019

Once there's a new release of cloudpickle you'll be able to upgrade to the latest version using pip or conda as you normally would (e.g. pip install cloudpickle --upgrade). In the meantime, you can use pip to install the development version of cloudpickle directly from GitHub with:

pip install git+https://github.com/cloudpipe/cloudpickle

Hopefully that helps @gcoimbra

@gcoimbra

I had to upgrade PyArrow from 0.12.1 to 0.14 because dask asked me to. But it worked!
Thanks a lot @jrbourbeau!

@ogrisel
Contributor

ogrisel commented Sep 11, 2019

cloudpickle 1.2.2 was released.

@gcoimbra

gcoimbra commented Sep 11, 2019

The problem came back even with cloudpickle 1.2.2. It happens with both PyArrow and fastparquet. Should I post this to dask/dask#5317?

Traceback (most recent call last):
  File "/home/green/.venvs/agt/lib/python3.6/site-packages/distributed/comm/utils.py", line 29, in _to_frames
    msg, serializers=serializers, on_error=on_error, context=context
  File "/home/green/.venvs/agt/lib/python3.6/site-packages/distributed/protocol/core.py", line 44, in dumps
    for key, value in data.items()
  File "/home/green/.venvs/agt/lib/python3.6/site-packages/distributed/protocol/core.py", line 45, in <dictcomp>
    if type(value) is Serialize
  File "/home/green/.venvs/agt/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 167, in serialize
    for obj in x
  File "/home/green/.venvs/agt/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 167, in <listcomp>
    for obj in x
  File "/home/green/.venvs/agt/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 167, in serialize
    for obj in x
  File "/home/green/.venvs/agt/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 167, in <listcomp>
    for obj in x
  File "/home/green/.venvs/agt/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 210, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type list.', "[<function _make_parser_function.<locals>.parser_f at 0x7fb4e13e06a8>, (<function read_block_from_file at 0x7fb4d5eecd90>, <OpenFile '/hdd/home/green/CDC/6Periodo/NESPED/AGT/datasets/towers.csv'>, 2240000000, 64000000, b'\\n'), b'radio,mcc,net,area,cell,unit,lon,lat,range,samples,changeable,created,updated,averageSignal\\n', (<class 'dict'>, [['dtype', (<class 'dict'>, [['radio', 'category'], ['mcc', <class 'numpy.uint16'>], ['net', <class 'numpy.uint8'>], ['area', <class 'numpy.uint16'>], ['cell', <class 'numpy.uint32'>], ['lat', <class 'numpy.float32'>], ['lon', <class 'numpy.float32'>], ['range', <class 'numpy.uint32'>], ['samples', <class 'numpy.uint64'>], ['changeable', <class 'bool'>], ['created', <class 'numpy.uint32'>], ['updated', <class 'numpy.uint32'>], ['averageSignal', <class 'numpy.uint64'>]])], ['usecols', dict_keys(['radio', 'mcc', 'net', 'area', 'cell', 'lat', 'lon', 'range', 'samples', 'changeable', 'created', 'updated', 'averageSignal'])]]), (<class 'dict'>, [['radio', 'category'], ['mcc', dtype('uint16')], ['net', dtype('uint8')], ['area', dtype('uint16')], ['cell', dtype('uint32')], ['lon', dtype('float32')], ['lat', dtype('float32')], ['range', dtype('uint32')], ['samples', dtype('uint64')], ['changeable', dtype('bool')], ['created', dtype('uint32')], ['updated', dtype('uint32')], ['averageSignal', dtype('uint64')]]), ['radio', 'mcc', 'net', 'area', 'cell', 'lon', 'lat', 'range', 'samples', 'changeable', 'created', 'updated', 'averageSignal']]")

The error message is longer because it happens on other threads too.

@martindurant
Member

@gcoimbra, as before, it would be really useful to us if you were to go through the values in that list, which are probably also in dict(df.dask), to see which thing in there fails with cloudpickle; I assume it's the _make_parser_function thing, which comes from pandas' read_csv. Is it something else with fastparquet?
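
A small diagnostic loop of that kind (where ddf stands for the dataframe whose compute() is failing) would be:

# Walk the task graph and report which values cloudpickle chokes on.
import cloudpickle

for key, value in dict(ddf.dask).items():
    try:
        cloudpickle.dumps(value)
    except Exception as exc:
        print(key, type(value), exc)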

@gcoimbra

@martindurant

It seems that I was causing the problem. It doesn't seem to be with PyArrow or fastparquet after all: it happens when I read a csv with dask.dataframe.read_csv using the usecols optional argument and pass a dict_keys object instead of a list. Dask then tries to serialize the following object (which can be seen in my previous post):

['usecols', dict_keys(...)]

Removing usecols, or wrapping the keys in list() first, fixes the problem. I'm sorry for the trouble.
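
For reference, the working form looks like this (the dtype dict and file name are placeholders):

import dask.dataframe as dd

dtypes = {'radio': 'category', 'mcc': 'uint16', 'net': 'uint8'}
ddf = dd.read_csv('towers.csv', dtype=dtypes,
                  usecols=list(dtypes.keys()))  # a real list, not dict_keys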

Do you want me to try to fix the problem and submit a pull request?

@martindurant
Member

I'm not sure there's anything to fix: the (pandas) docstring says that usecols must be list-like.

@martindurant
Member

In any case, I'll close this, since we now know what's going on.

@bhishanpdl

bhishanpdl commented Jul 10, 2020

I got the same error on Jul 10, 2020
MacPro 2017, macOS Catalina, with a miniconda env

How do I solve the error? Is this because I have a single laptop, and for distributed we need a cluster of multiple computers?

# imports
import numpy as np
import pandas as pd

import dask
import dask.dataframe as dd
import dask.array as da
import dask_ml
import pyarrow

print([(x.__name__,x.__version__) for x in 
       [np,pd, dask, dask_ml,pyarrow]])
[('numpy', '1.18.5'), ('pandas', '1.0.5'), ('dask', '2.20.0'), ('dask_ml', '1.5.0'), ('pyarrow', '0.17.1')]

# data
a = da.random.normal(size=(2000, 2000), chunks=(1000, 1000)) # data
res = a.dot(a.T).mean(axis=0) # operation
res = res.persist()  # start computation in the background

# code
from dask.distributed import Client, progress

client = Client()  # use dask.distributed by default
progress(res)      # watch progress
res.compute()      # convert to final result when done if desired

Error

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/protocol/core.py", line 106, in loads
    header = msgpack.loads(header, use_list=False, **msgpack_opts)
  File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 135, in unpackb
    ret = unpacker._unpack()
  File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 676, in _unpack
    ret[key] = self._unpack(EX_CONSTRUCT)
  File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 672, in _unpack
    "%s is not allowed for map key" % str(type(key))
ValueError: <class 'tuple'> is not allowed for map key
distributed.core - ERROR - <class 'tuple'> is not allowed for map key
Traceback (most recent call last):
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/core.py", line 456, in handle_stream
    msgs = await comm.read()
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/comm/tcp.py", line 212, in read
    frames, deserialize=self.deserialize, deserializers=deserializers
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/comm/utils.py", line 69, in from_frames
    res = _from_frames()
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/comm/utils.py", line 55, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/protocol/core.py", line 106, in loads
    header = msgpack.loads(header, use_list=False, **msgpack_opts)
  File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 135, in unpackb
    ret = unpacker._unpack()
  File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 676, in _unpack
    ret[key] = self._unpack(EX_CONSTRUCT)
  File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 672, in _unpack
    "%s is not allowed for map key" % str(type(key))
ValueError: <class 'tuple'> is not allowed for map key
distributed.core - ERROR - <class 'tuple'> is not allowed for map key
Traceback (most recent call last):
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/core.py", line 412, in handle_comm
    result = await result
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/scheduler.py", line 2491, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/core.py", line 456, in handle_stream
    msgs = await comm.read()
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/comm/tcp.py", line 212, in read
    frames, deserialize=self.deserialize, deserializers=deserializers
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/comm/utils.py", line 69, in from_frames
    res = _from_frames()
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/comm/utils.py", line 55, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/protocol/core.py", line 106, in loads
    header = msgpack.loads(header, use_list=False, **msgpack_opts)
  File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 135, in unpackb
    ret = unpacker._unpack()
  File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 676, in _unpack
    ret[key] = self._unpack(EX_CONSTRUCT)
  File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 672, in _unpack
    "%s is not allowed for map key" % str(type(key))
ValueError: <class 'tuple'> is not allowed for map key
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
<ipython-input-7-4d5bc96d9bb1> in <module>
      1 progress(res)      # watch progress
      2 
----> 3 res.compute()      # convert to final result when done if desired

~/opt/miniconda3/envs/dsk/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    164         dask.base.compute
    165         """
--> 166         (result,) = compute(self, traverse=False, **kwargs)
    167         return result
    168 

~/opt/miniconda3/envs/dsk/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    442         postcomputes.append(x.__dask_postcompute__())
    443 
--> 444     results = schedule(dsk, keys, **kwargs)
    445     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    446 

~/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2590                     should_rejoin = False
   2591             try:
-> 2592                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2593             finally:
   2594                 for f in futures.values():

~/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1888                 direct=direct,
   1889                 local_worker=local_worker,
-> 1890                 asynchronous=asynchronous,
   1891             )
   1892 

~/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    766         else:
    767             return sync(
--> 768                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    769             )
    770 

~/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    343     if error[0]:
    344         typ, exc, tb = error[0]
--> 345         raise exc.with_traceback(tb)
    346     else:
    347         return result[0]

~/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/utils.py in f()
    327             if callback_timeout is not None:
    328                 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 329             result[0] = yield future
    330         except Exception as exc:
    331             error[0] = sys.exc_info()

~/opt/miniconda3/envs/dsk/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

CancelledError: 

@gcoimbra

I'm not sure there's anything to fix: the (pandas) docstring says that usecols must be list-like.

Can't we warn users with a ValueError exception?

@martindurant
Member

I don't think we're generally in a position to check the types of arguments that we pass on to other functions.
