[Python] ParquetDataset and ParquetPiece not serializable #21625

Closed
asfimport opened this issue Apr 8, 2019 · 20 comments

Since 0.13.0, ParquetDataset and ParquetDatasetPiece instances are no longer serializable, which means that dask.distributed cannot pass them between processes in order to load parquet in parallel.

Example:

>>> import cloudpickle
>>> import pyarrow.parquet as pq
>>> pf = pq.ParquetDataset('nation.impala.parquet')
>>> cloudpickle.dumps(pf)
~/anaconda/envs/py36/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
    893     try:
    894         cp = CloudPickler(file, protocol=protocol)
--> 895         cp.dump(obj)
    896         return file.getvalue()
    897     finally:

~/anaconda/envs/py36/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
    266         self.inject_addons()
    267         try:
--> 268             return Pickler.dump(self, obj)
    269         except RuntimeError as e:
    270             if 'recursion' in e.args[0]:

~/anaconda/envs/py36/lib/python3.6/pickle.py in dump(self, obj)
    407         if self.proto >= 4:
    408             self.framer.start_framing()
--> 409         self.save(obj)
    410         self.write(STOP)
    411         self.framer.end_framing()

~/anaconda/envs/py36/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    519
    520         # Save the reduce() output and finally memoize the object
--> 521         self.save_reduce(obj=obj, *rv)
    522
    523     def persistent_id(self, obj):

~/anaconda/envs/py36/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    632
    633         if state is not None:
--> 634             save(state)
    635             write(BUILD)
    636

~/anaconda/envs/py36/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478

~/anaconda/envs/py36/lib/python3.6/pickle.py in save_dict(self, obj)
    819
    820         self.memoize(obj)
--> 821         self._batch_setitems(obj.items())
    822
    823     dispatch[dict] = save_dict

~/anaconda/envs/py36/lib/python3.6/pickle.py in _batch_setitems(self, items)
    845                 for k, v in tmp:
    846                     save(k)
--> 847                     save(v)
    848                 write(SETITEMS)
    849             elif n:

~/anaconda/envs/py36/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    494             reduce = getattr(obj, "__reduce_ex__", None)
    495             if reduce is not None:
--> 496                 rv = reduce(self.proto)
    497             else:
    498                 reduce = getattr(obj, "__reduce__", None)

~/anaconda/envs/py36/lib/python3.6/site-packages/pyarrow/_parquet.cpython-36m-darwin.so in pyarrow._parquet.ParquetSchema.__reduce_cython__()

TypeError: no default __reduce__ due to non-trivial __cinit__

The indicated schema instance is also referenced by the ParquetDatasetPiece objects.
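
For reference, a minimal user-side workaround sketch (not part of the original report, and written against the legacy ParquetDataset API of this era) is to avoid pickling the dataset and pieces altogether: ship plain path strings and rebuild the dataset on the receiving side.

import pickle
import pyarrow.parquet as pq

dataset = pq.ParquetDataset('nation.impala.parquet')

# Plain strings pickle fine; the schema and pieces are re-derived on load.
paths = [piece.path for piece in dataset.pieces]
payload = pickle.dumps(paths)

rebuilt = pq.ParquetDataset(pickle.loads(payload))
table = rebuilt.read()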

ref: dask/distributed#2597

Environment: osx python36/conda cloudpickle 0.8.1
arrow-cpp 0.13.0 py36ha71616b_0 conda-forge
pyarrow 0.13.0 py36hb37e6aa_0 conda-forge

Reporter: Martin Durant / @martindurant
Assignee: Krisztian Szucs / @kszucs


Note: This issue was originally created as ARROW-5144. Please see the migration documentation for further details.

Matthew Rocklin / @mrocklin:
Hi everyone,

This is pretty critical for Dask usage. Anyone trying to use PyArrow 0.13 in a Dask workflow will break pretty hard here. This isn't something that we can work around easily on our side. It would be great to know if this is likely to be resolved quickly, or if we should warn users strongly away from 0.13.

In general, I recommend serialization tests for any project looking to interact with distributed computing libraries in Python. Often this consists of tests like the following for any type that you think a parallel computing framework might want to interact with.

import pickle

def test_serialization():
    obj = MyObj()  # MyObj stands for whatever type the framework needs to move around
    obj2 = pickle.loads(pickle.dumps(obj))

    assert obj == obj2
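
A sketch of what such a test could look like for the types in this issue (the file name is the one from the report above; comparing piece paths is an assumption, since the legacy classes do not necessarily define equality):

import pickle
import pyarrow.parquet as pq

def test_parquet_dataset_roundtrip():
    dataset = pq.ParquetDataset('nation.impala.parquet')
    restored = pickle.loads(pickle.dumps(dataset))
    # Compare piece paths as a stand-in for full structural equality.
    assert [p.path for p in restored.pieces] == [p.path for p in dataset.pieces]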

Dave Hirschfeld / @dhirschfeld:
As a user of pyarrow and dask.distributed together this issue will prevent me from upgrading to 0.13 which I'm otherwise very keen to do.

Wes McKinney / @wesm:
I'm on vacation until April 22. Are you able to submit a PR?

Antoine Pitrou / @pitrou:
Is it just with cloudpickle or also with plain pickle?

Martin Durant / @martindurant:
Cloudpickle actually does a better job than ordinary pickle, which also has trouble with the opener function closure attached to the dataset instance and the pieces.
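
A minimal illustration of that difference (an assumed toy example, not taken from pyarrow or dask): plain pickle serializes functions by reference and therefore rejects locally defined closures such as an opener, while cloudpickle serializes them by value.

import pickle
import cloudpickle

def make_opener(path):
    def opener():
        return open(path, 'rb')
    return opener

opener = make_opener('nation.impala.parquet')

cloudpickle.dumps(opener)    # works: the closure is serialized by value
try:
    pickle.dumps(opener)     # fails: pickle only references top-level functions by name
except (pickle.PicklingError, AttributeError) as exc:
    print(exc)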

Antoine Pitrou / @pitrou:
@martindurant At which point did this work? I've tried all PyArrow versions from 0.7 to 0.13 and serializing a ParquetDataset always fails with the same error you're reporting.

Krisztian Szucs / @kszucs:
@pitrou I'm starting to implement the reducers to make cloudpickle work.
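
For readers unfamiliar with the term, "reducers" here means telling pickle how to rebuild an object from its constructor arguments. The actual fix adds __reduce__ methods in the Cython sources; the sketch below only illustrates the general mechanism, using copyreg and a hypothetical stand-in class rather than the real ParquetDatasetPiece.

import copyreg
import pickle

class Piece:  # hypothetical stand-in for ParquetDatasetPiece
    def __init__(self, path, row_group=None, partition_keys=None):
        self.path = path
        self.row_group = row_group
        self.partition_keys = partition_keys or []

def _reduce_piece(piece):
    # Return (callable, args); pickle calls Piece(*args) when unpickling.
    return Piece, (piece.path, piece.row_group, piece.partition_keys)

copyreg.pickle(Piece, _reduce_piece)

restored = pickle.loads(pickle.dumps(Piece('part.0.parquet')))
assert restored.path == 'part.0.parquet'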

Krisztian Szucs / @kszucs:
I also don't understand how it could have been working without proper __reduce__ methods (at the very least, pickling fails with 0.12 too).

Antoine Pitrou / @pitrou:
That's what I said above: pickling fails from 0.7 to 0.13 at least.

Krisztian Szucs / @kszucs:
So this seems like a new feature rather than a bug. I suppose something must have changed on the caller/dask side.

Martin Durant / @martindurant:
Well this is confusing! We don't explicitly have pyarrow-parquet-in-distributed tests (apparently we should), but people have certainly been reading parquet for some time. Could they all have been using fastparquet? That seems unlikely, especially in HDFS contexts.

Matthew Rocklin / @mrocklin:
Most objects in Python are serializable by default. My guess is that this object recently gained a non-serializable attribute, perhaps like an open file?
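
A toy demonstration of that guess (assumed, not taken from the pyarrow sources): an otherwise ordinary object stops being picklable the moment it holds an open file handle.

import pickle
import tempfile

class Holder:
    def __init__(self, path):
        self.path = path
        self.fh = open(path, 'rb')   # the non-picklable attribute

with tempfile.NamedTemporaryFile() as tmp:
    try:
        pickle.dumps(Holder(tmp.name))
    except TypeError as exc:
        print(exc)   # e.g. "cannot pickle '_io.BufferedReader' object"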

Antoine Pitrou / @pitrou:
@mrocklin Can you post a snippet that used to work and that doesn't work anymore?

Sarah Bird:
This might not be what you were looking for, @pitrou, but this is what breaks consistently for me from 0.12.1 to 0.13.0:

import dask.dataframe as dd
from dask.distributed import Client

Client()
df = dd.read_parquet('my_data.parquet', engine='pyarrow')
df.head()

(dask 1.2.0, distributed 1.27.0)

Let me know if I can better help.

Krisztian Szucs / @kszucs:
[~birdsarah] with any parquet file?

Sarah Bird:
@kszucs this is with variations on my dataset. I have attached one piece which is sufficient to reproduce the error. It is web crawl data. The dtypes are:

argument_0                      object
argument_1                      object
argument_2                      object
argument_3                      object
argument_4                      object
argument_5                      object
argument_6                      object
argument_7                      object
arguments                       object
arguments_len                    int64
call_stack                      object
crawl_id                         int32
document_url                    object
func_name                       object
in_iframe                         bool
operation                       object
script_col                       int64
script_line                      int64
script_loc_eval                 object
script_url                      object
symbol                          object
time_stamp         datetime64[ns, UTC]
top_level_url                   object
value_1000                      object
value_len                        int64
visit_id                         int64
dtype: object

My traceback is:

distributed.protocol.pickle - INFO - Failed to serialize (<function safe_head at 0x7f3d57f2c7b8>, (<function _read_pyarrow_parquet_piece at 0x7f3d57ef9268>, <dask.bytes.local.LocalFileSystem object at 0x7f3db58
ea4e0>, ParquetDatasetPiece('javascript_10percent_value_1000_only.parquet/part.0.parquet', row_group=None, partition_keys=[]), ['argument_0', 'argument_1', 'argument_2', 'argument_3', 'argument_4', 'argument_5'
, 'argument_6', 'argument_7', 'arguments', 'arguments_len', 'call_stack', 'crawl_id', 'document_url', 'func_name', 'in_iframe', 'operation', 'script_col', 'script_line', 'script_loc_eval', 'script_url', 'symbol
', 'time_stamp', 'top_level_url', 'value_1000', 'value_len', 'visit_id'], [], False, None, []), 5). Exception: no default __reduce__ due to non-trivial __cinit__
distributed.protocol.core - CRITICAL - Failed to Serialize                                                             
Traceback (most recent call last):                                                                                         
  File "/home/bird/miniconda3/envs/pyarrowtest/lib/python3.7/site-packages/distributed/protocol/core.py", line 54, in dumps
    for key, value in data.items()                                                                                                             
  File "/home/bird/miniconda3/envs/pyarrowtest/lib/python3.7/site-packages/distributed/protocol/core.py", line 55, in <dictcomp>
    if type(value) is Serialize}                                                                                                                                                                                 
  File "/home/bird/miniconda3/envs/pyarrowtest/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 164, in serialize
    raise TypeError(msg, str(x)[:10000])                                                                                                                                                                          
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x7f3d57f2c7b8>, (<function _read_pyarrow_parquet_piece at 0x7f3d57ef9268>, <dask.bytes.local.LocalFileSystem object at 0x7f3db5
8ea4e0>, ParquetDatasetPiece('javascript_10percent_value_1000_only.parquet/part.0.parquet', row_group=None, partition_keys=[]), ['argument_0', 'argument_1', 'argument_2', 'argument_3', 'argument_4', 'argument_5
', 'argument_6', 'argument_7', 'arguments', 'arguments_len', 'call_stack', 'crawl_id', 'document_url', 'func_name', 'in_iframe', 'operation', 'script_col', 'script_line', 'script_loc_eval', 'script_url', 'symbo
l', 'time_stamp', 'top_level_url', 'value_1000', 'value_len', 'visit_id'], [], False, None, []), 5)")                      
distributed.comm.utils - INFO - Unserializable Message: [{'op': 'update-graph', 'tasks': {"('head-1-5-read-parquet-daaccee11e9cff29ad1ee5622ffd6c69', 0)": <Serialize: ('read-parquet-head-1-5-read-parquet-daacce
e11e9cff29ad1ee5622ffd6c69', 0)>, "('read-parquet-head-1-5-read-parquet-daaccee11e9cff29ad1ee5622ffd6c69', 0)": <Serialize: (<function safe_head at 0x7f3d57f2c7b8>, (<function _read_pyarrow_parquet_piece at 0x7
f3d57ef9268>, <dask.bytes.local.LocalFileSystem object at 0x7f3db58ea4e0>, ParquetDatasetPiece('javascript_10percent_value_1000_only.parquet/part.0.parquet', row_group=None, partition_keys=[]), ['argument_0', '
argument_1', 'argument_2', 'argument_3', 'argument_4', 'argument_5', 'argument_6', 'argument_7', 'arguments', 'arguments_len', 'call_stack', 'crawl_id', 'document_url', 'func_name', 'in_iframe', 'operation', 's
cript_col', 'script_line', 'script_loc_eval', 'script_url', 'symbol', 'time_stamp', 'top_level_url', 'value_1000', 'value_len', 'visit_id'], [], False, None, []), 5)>}, 'dependencies': {"('head-1-5-read-parquet
-daaccee11e9cff29ad1ee5622ffd6c69', 0)": ["('read-parquet-head-1-5-read-parquet-daaccee11e9cff29ad1ee5622ffd6c69', 0)"], "('read-parquet-head-1-5-read-parquet-daaccee11e9cff29ad1ee5622ffd6c69', 0)": []}, 'keys'
: ["('head-1-5-read-parquet-daaccee11e9cff29ad1ee5622ffd6c69', 0)"], 'restrictions': {}, 'loose_restrictions': None, 'priority': {"('read-parquet-head-1-5-read-parquet-daaccee11e9cff29ad1ee5622ffd6c69', 0)": 0,
 "('head-1-5-read-parquet-daaccee11e9cff29ad1ee5622ffd6c69', 0)": 1}, 'user_priority': 0, 'resources': None, 'submitting_task': None, 'retries': None, 'fifo_timeout': '60s', 'actors': None}]                    
distributed.comm.utils - ERROR - ('Could not serialize object of type tuple.', "(<function safe_head at 0x7f3d57f2c7b8>, (<function _read_pyarrow_parquet_piece at 0x7f3d57ef9268>, <dask.bytes.local.LocalFileSys
tem object at 0x7f3db58ea4e0>, ParquetDatasetPiece('javascript_10percent_value_1000_only.parquet/part.0.parquet', row_group=None, partition_keys=[]), ['argument_0', 'argument_1', 'argument_2', 'argument_3', 'ar
gument_4', 'argument_5', 'argument_6', 'argument_7', 'arguments', 'arguments_len', 'call_stack', 'crawl_id', 'document_url', 'func_name', 'in_iframe', 'operation', 'script_col', 'script_line', 'script_loc_eval'
, 'script_url', 'symbol', 'time_stamp', 'top_level_url', 'value_1000', 'value_len', 'visit_id'], [], False, None, []), 5)")
Traceback (most recent call last):
  File "/home/bird/miniconda3/envs/pyarrowtest/lib/python3.7/site-packages/distributed/batched.py", line 94, in _background_send
    on_error='raise')
  File "/home/bird/miniconda3/envs/pyarrowtest/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/home/bird/miniconda3/envs/pyarrowtest/lib/python3.7/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/home/bird/miniconda3/envs/pyarrowtest/lib/python3.7/site-packages/distributed/comm/tcp.py", line 224, in write
    'recipient': self._peer_addr})
  File "/home/bird/miniconda3/envs/pyarrowtest/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/home/bird/miniconda3/envs/pyarrowtest/lib/python3.7/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/home/bird/miniconda3/envs/pyarrowtest/lib/python3.7/site-packages/distributed/comm/utils.py", line 50, in to_frames
    res = yield offload(_to_frames)
  File "/home/bird/miniconda3/envs/pyarrowtest/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/home/bird/miniconda3/envs/pyarrowtest/lib/python3.7/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/home/bird/miniconda3/envs/pyarrowtest/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/bird/miniconda3/envs/pyarrowtest/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/bird/miniconda3/envs/pyarrowtest/lib/python3.7/site-packages/distributed/comm/utils.py", line 43, in _to_frames
    context=context))
  File "/home/bird/miniconda3/envs/pyarrowtest/lib/python3.7/site-packages/distributed/protocol/core.py", line 54, in dumps
    for key, value in data.items()
  File "/home/bird/miniconda3/envs/pyarrowtest/lib/python3.7/site-packages/distributed/protocol/core.py", line 55, in <dictcomp>
    if type(value) is Serialize}
  File "/home/bird/miniconda3/envs/pyarrowtest/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 164, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x7f3d57f2c7b8>, (<function _read_pyarrow_parquet_piece at 0x7f3d57ef9268>, <dask.bytes.local.LocalFileSystem object at 0x7f3db5
8ea4e0>, ParquetDatasetPiece('javascript_10percent_value_1000_only.parquet/part.0.parquet', row_group=None, partition_keys=[]), ['argument_0', 'argument_1', 'argument_2', 'argument_3', 'argument_4', 'argument_5
', 'argument_6', 'argument_7', 'arguments', 'arguments_len', 'call_stack', 'crawl_id', 'document_url', 'func_name', 'in_iframe', 'operation', 'script_col', 'script_line', 'script_loc_eval', 'script_url', 'symbo
l', 'time_stamp', 'top_level_url', 'value_1000', 'value_len', 'visit_id'], [], False, None, []), 5)")

part.0.parquet

Sarah Bird:
I should add: I can also reproduce the cloudpickle error from the original report with the same file, but that error is invariant between pyarrow 0.12.1 and 0.13.0. The distributed error above is what changes for me.

Krisztian Szucs / @kszucs:
Issue resolved by pull request #4156.

Wes McKinney / @wesm:
@mrocklin We might be able to do a 0.13.1 release including this and some other fixes in a week or two. If that would be helpful let us know.

FWIW, ParquetDatasetPiece is regarded as an implementation detail and wasn't necessarily intended to be serializable, so we will keep this in mind for the future.

Matthew Rocklin / @mrocklin:
That would be helpful, yes. We're currently raising an error in master telling people to downgrade. We get bug reports about this issue most days, it seems.

asfimport added this to the 0.14.0 milestone Jan 11, 2023