Support arrow Table/RecordBatch types #2103

Closed

dhirschfeld opened this issue Jul 8, 2018 · 29 comments

@dhirschfeld
Contributor

xref: #614

I'm sure this is possible with #606, so this issue is mostly just to document my attempt to get it working. Out of the box (1.22.0+9.gdb758d0f), attempting to pass an arrow Table or RecordBatch results in a TypeError:

TypeError: no default __reduce__ due to non-trivial __cinit__
from distributed import Client
import pandas as pd
import pyarrow as pa

client = Client()

df = pd.DataFrame({'A': list('abc'), 'B': [1,2,3]})
tbl = pa.Table.from_pandas(df, preserve_index=False)

def echo(arg):
    return arg
>>> client.submit(echo, df).result().equals(df)
True
>>> client.submit(echo, tbl).result()
distributed.protocol.pickle - INFO - Failed to serialize (pyarrow.Table
A: string
B: int64
metadata
--------
{b'pandas': b'{"index_columns": [], "column_indexes": [], "columns": [{"name":'
            b' "A", "field_name": "A", "pandas_type": "unicode", "numpy_type":'
            b' "object", "metadata": null}, {"name": "B", "field_name": "B", "'
            b'pandas_type": "int64", "numpy_type": "int64", "metadata": null}]'
            b', "pandas_version": "0.23.1"}'},). Exception: no default __reduce__ due to non-trivial __cinit__
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
C:\Miniconda3\lib\site-packages\distributed\protocol\pickle.py in dumps(x)
     37     try:
---> 38         result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     39         if len(result) < 1000:

C:\Miniconda3\lib\site-packages\pyarrow\lib.cp36-win_amd64.pyd in pyarrow.lib.RecordBatch.__reduce_cython__()

TypeError: no default __reduce__ due to non-trivial __cinit__

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
<ipython-input-48-4e8e2ea90e79> in <module>()
----> 1 client.submit(echo, tbl).result()

C:\Miniconda3\lib\site-packages\distributed\client.py in submit(self, func, *args, **kwargs)
   1236                                          resources={skey: resources} if resources else None,
   1237                                          retries=retries,
-> 1238                                          fifo_timeout=fifo_timeout)
   1239 
   1240         logger.debug("Submit %s(...), %s", funcname(func), key)

C:\Miniconda3\lib\site-packages\distributed\client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout)
   2093 
   2094             self._send_to_scheduler({'op': 'update-graph',
-> 2095                                      'tasks': valmap(dumps_task, dsk3),
   2096                                      'dependencies': dependencies,
   2097                                      'keys': list(flatkeys),

C:\Miniconda3\lib\site-packages\cytoolz\dicttoolz.pyx in cytoolz.dicttoolz.valmap()

C:\Miniconda3\lib\site-packages\cytoolz\dicttoolz.pyx in cytoolz.dicttoolz.valmap()

C:\Miniconda3\lib\site-packages\distributed\worker.py in dumps_task(task)
    799         elif not any(map(_maybe_complex, task[1:])):
    800             return {'function': dumps_function(task[0]),
--> 801                     'args': warn_dumps(task[1:])}
    802     return to_serialize(task)
    803 

C:\Miniconda3\lib\site-packages\distributed\worker.py in warn_dumps(obj, dumps, limit)
    808 def warn_dumps(obj, dumps=pickle.dumps, limit=1e6):
    809     """ Dump an object to bytes, warn if those bytes are large """
--> 810     b = dumps(obj)
    811     if not _warn_dumps_warned[0] and len(b) > limit:
    812         _warn_dumps_warned[0] = True

C:\Miniconda3\lib\site-packages\distributed\protocol\pickle.py in dumps(x)
     49     except Exception:
     50         try:
---> 51             return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     52         except Exception as e:
     53             logger.info("Failed to serialize %s. Exception: %s", x, e)

C:\Miniconda3\lib\site-packages\cloudpickle\cloudpickle.py in dumps(obj, protocol)
    893     try:
    894         cp = CloudPickler(file, protocol=protocol)
--> 895         cp.dump(obj)
    896         return file.getvalue()
    897     finally:

C:\Miniconda3\lib\site-packages\cloudpickle\cloudpickle.py in dump(self, obj)
    266         self.inject_addons()
    267         try:
--> 268             return Pickler.dump(self, obj)
    269         except RuntimeError as e:
    270             if 'recursion' in e.args[0]:

C:\Miniconda3\lib\pickle.py in dump(self, obj)
    407         if self.proto >= 4:
    408             self.framer.start_framing()
--> 409         self.save(obj)
    410         self.write(STOP)
    411         self.framer.end_framing()

C:\Miniconda3\lib\pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

C:\Miniconda3\lib\pickle.py in save_tuple(self, obj)
    734         if n <= 3 and self.proto >= 2:
    735             for element in obj:
--> 736                 save(element)
    737             # Subtle.  Same as in the big comment below.
    738             if id(obj) in memo:

C:\Miniconda3\lib\pickle.py in save(self, obj, save_persistent_id)
    494             reduce = getattr(obj, "__reduce_ex__", None)
    495             if reduce is not None:
--> 496                 rv = reduce(self.proto)
    497             else:
    498                 reduce = getattr(obj, "__reduce__", None)

C:\Miniconda3\lib\site-packages\pyarrow\lib.cp36-win_amd64.pyd in pyarrow.lib.RecordBatch.__reduce_cython__()

TypeError: no default __reduce__ due to non-trivial __cinit__
@mrocklin
Member

mrocklin commented Jul 8, 2018

Interesting.

I can reproduce this with just pickle. I recommend raising this as an issue with Arrow.

In [1]: import pandas as pd
   ...: import pyarrow as pa
   ...: df = pd.DataFrame({'A': list('abc'), 'B': [1,2,3]})
   ...: tbl = pa.Table.from_pandas(df, preserve_index=False)
   ...: 
   ...: 

In [2]: import pickle

In [3]: b = pickle.dumps(tbl)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-3-264061ce888e> in <module>()
----> 1 b = pickle.dumps(tbl)

~/Software/anaconda/envs/arrow/lib/python3.6/site-packages/pyarrow/lib.cpython-36m-x86_64-linux-gnu.so in pyarrow.lib.Table.__reduce_cython__()

TypeError: no default __reduce__ due to non-trivial __cinit__

Alternatively, if you wanted, you could also help implement a custom serialization solution for Arrow in Dask (see docs here), which would be useful for avoiding memory copies during transfer, though this is probably a minor performance concern in the common case. Getting other projects to implement the pickle protocol is probably the first step.
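A hedged sketch of that first step (illustration only, not from this thread and not how the issue was ultimately resolved): without modifying pyarrow itself, pickling can usually be enabled for a third-party type by registering a reduce function through the standard-library copyreg module, here reusing the Arrow IPC stream format that appears later in the thread. The helper names (_table_to_bytes, _table_from_bytes, _reduce_table) are invented for illustration.

import copyreg
import pyarrow as pa

def _table_to_bytes(table):
    # Write the Table out as an Arrow IPC stream and return the raw bytes.
    sink = pa.BufferOutputStream()
    writer = pa.RecordBatchStreamWriter(sink, table.schema)
    for batch in table.to_batches():
        writer.write_batch(batch)
    writer.close()
    return sink.get_result().to_pybytes()  # sink.getvalue() in newer pyarrow

def _table_from_bytes(blob):
    # Read the IPC stream back into a Table.
    reader = pa.RecordBatchStreamReader(pa.BufferReader(blob))
    return reader.read_all()

def _reduce_table(table):
    # pickle calls _table_from_bytes(payload) to rebuild the Table on load.
    return _table_from_bytes, (_table_to_bytes(table),)

copyreg.pickle(pa.Table, _reduce_table)

With that registered, pickle.dumps(tbl) / pickle.loads(b) should round-trip the Table, since the pickler consults copyreg's dispatch table before falling back to __reduce_ex__.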

@dhirschfeld
Contributor Author

dhirschfeld commented Jul 8, 2018

The docs seem pretty clear and straightforward; however, I haven't (yet) gotten it to work:

from distributed.protocol.serialize import register_serialization
import pyarrow as arw


def serialize(batch):
    sink = arw.BufferOutputStream()
    writer = arw.RecordBatchStreamWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()
    buf = sink.get_result()
    return buf.to_pybytes()

def deserialize(blob):
    reader = arw.RecordBatchStreamReader(arw.BufferReader(blob))
    return reader.read_next_batch()

register_serialization(arw.RecordBatch, serialize, deserialize)
>>> batch = arw.RecordBatch.from_pandas(df)
>>> batch
<pyarrow.lib.RecordBatch at 0x9fd9ae8>
>>> isinstance(batch, arw.RecordBatch)
True
>>> batch.equals(deserialize(serialize(batch)))
True
>>> client.submit(echo, batch).result()
distributed.protocol.pickle - INFO - Failed to serialize (<pyarrow.lib.RecordBatch object at 0x0000000009FD9AE8>,). Exception: no default __reduce__ due to non-trivial __cinit__
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
C:\Miniconda3\lib\site-packages\distributed\protocol\pickle.py in dumps(x)
     37     try:
---> 38         result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     39         if len(result) < 1000:

C:\Miniconda3\lib\site-packages\pyarrow\lib.cp36-win_amd64.pyd in pyarrow.lib.RecordBatch.__reduce_cython__()

TypeError: no default __reduce__ due to non-trivial __cinit__

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
<ipython-input-111-39ca69d5a2b4> in <module>()
----> 1 client.submit(echo, batch).result()

C:\Miniconda3\lib\site-packages\distributed\client.py in submit(self, func, *args, **kwargs)
   1236                                          resources={skey: resources} if resources else None,
   1237                                          retries=retries,
-> 1238                                          fifo_timeout=fifo_timeout)
   1239 
   1240         logger.debug("Submit %s(...), %s", funcname(func), key)

C:\Miniconda3\lib\site-packages\distributed\client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout)
   2093 
   2094             self._send_to_scheduler({'op': 'update-graph',
-> 2095                                      'tasks': valmap(dumps_task, dsk3),
   2096                                      'dependencies': dependencies,
   2097                                      'keys': list(flatkeys),

C:\Miniconda3\lib\site-packages\cytoolz\dicttoolz.pyx in cytoolz.dicttoolz.valmap()

C:\Miniconda3\lib\site-packages\cytoolz\dicttoolz.pyx in cytoolz.dicttoolz.valmap()

C:\Miniconda3\lib\site-packages\distributed\worker.py in dumps_task(task)
    799         elif not any(map(_maybe_complex, task[1:])):
    800             return {'function': dumps_function(task[0]),
--> 801                     'args': warn_dumps(task[1:])}
    802     return to_serialize(task)
    803 

C:\Miniconda3\lib\site-packages\distributed\worker.py in warn_dumps(obj, dumps, limit)
    808 def warn_dumps(obj, dumps=pickle.dumps, limit=1e6):
    809     """ Dump an object to bytes, warn if those bytes are large """
--> 810     b = dumps(obj)
    811     if not _warn_dumps_warned[0] and len(b) > limit:
    812         _warn_dumps_warned[0] = True

C:\Miniconda3\lib\site-packages\distributed\protocol\pickle.py in dumps(x)
     49     except Exception:
     50         try:
---> 51             return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     52         except Exception as e:
     53             logger.info("Failed to serialize %s. Exception: %s", x, e)

C:\Miniconda3\lib\site-packages\cloudpickle\cloudpickle.py in dumps(obj, protocol)
    893     try:
    894         cp = CloudPickler(file, protocol=protocol)
--> 895         cp.dump(obj)
    896         return file.getvalue()
    897     finally:

C:\Miniconda3\lib\site-packages\cloudpickle\cloudpickle.py in dump(self, obj)
    266         self.inject_addons()
    267         try:
--> 268             return Pickler.dump(self, obj)
    269         except RuntimeError as e:
    270             if 'recursion' in e.args[0]:

C:\Miniconda3\lib\pickle.py in dump(self, obj)
    407         if self.proto >= 4:
    408             self.framer.start_framing()
--> 409         self.save(obj)
    410         self.write(STOP)
    411         self.framer.end_framing()

C:\Miniconda3\lib\pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

C:\Miniconda3\lib\pickle.py in save_tuple(self, obj)
    734         if n <= 3 and self.proto >= 2:
    735             for element in obj:
--> 736                 save(element)
    737             # Subtle.  Same as in the big comment below.
    738             if id(obj) in memo:

C:\Miniconda3\lib\pickle.py in save(self, obj, save_persistent_id)
    494             reduce = getattr(obj, "__reduce_ex__", None)
    495             if reduce is not None:
--> 496                 rv = reduce(self.proto)
    497             else:
    498                 reduce = getattr(obj, "__reduce__", None)

C:\Miniconda3\lib\site-packages\pyarrow\lib.cp36-win_amd64.pyd in pyarrow.lib.RecordBatch.__reduce_cython__()

TypeError: no default __reduce__ due to non-trivial __cinit__

...so it seems like my custom serialization functions are being ignored?

Am I missing something, or is this a bug? Maybe I need to somehow register the functions with the remote workers?
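(A hedged sketch for that last question, not from the thread: registration only takes effect in the process where it runs, so one way to apply it on every worker is client.run, as below. Note, though, that the rest of the discussion shows the exception is raised on the client before anything is sent to the workers, so this alone would not fix it.)

def register_arrow_serialization():
    # Runs inside each worker process; re-imports and re-registers there.
    # `serialize`/`deserialize` are the functions defined above.
    from distributed.protocol.serialize import register_serialization
    import pyarrow as arw
    register_serialization(arw.RecordBatch, serialize, deserialize)

client.run(register_arrow_serialization)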

@dhirschfeld
Contributor Author

I recommend raising this as an issue with Arrow

Looks like this is already being tracked:
https://issues.apache.org/jira/browse/ARROW-1715

@dhirschfeld
Contributor Author

With the SciPy sprints coming up I just thought I'd clarify the status of this issue: I believe it is highlighting a bug in the dask custom serialization.

Irrespective of the fact that arrow doesn't implement the pickle protocol, IIUC I should still be able to register custom serialization/deserialization functions; however:

register_serialization(arw.RecordBatch, serialize, deserialize)

...appears to have no effect.

I'd be glad to learn that I'm just doing something wrong, in which case I'd be happy to put in a PR to clarify the documentation.

@mrocklin
Member

Do you have an implementation for serialize and deserialize then? Are you able to publish them, perhaps here or as a PR?

@dhirschfeld
Contributor Author

dhirschfeld commented Jul 12, 2018

Published above, but it might have got lost in the traceback noise:

from distributed.protocol.serialize import register_serialization
import pyarrow as arw


def serialize(batch):
    sink = arw.BufferOutputStream()
    writer = arw.RecordBatchStreamWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()
    buf = sink.get_result()
    return buf.to_pybytes()

def deserialize(blob):
    reader = arw.RecordBatchStreamReader(arw.BufferReader(blob))
    return reader.read_next_batch()

register_serialization(arw.RecordBatch, serialize, deserialize)

@mrocklin
Member

mrocklin commented Jul 12, 2018 via email

@dhirschfeld
Contributor Author

Right - user error! The serialize function needs to return a (header, frames) tuple rather than raw bytes. Sorry for the noise.

It is right there in both the docs and the example :|

@mrocklin
Member

mrocklin commented Jul 12, 2018 via email

@dhirschfeld
Contributor Author

You mean add a file distributed/protocol/arrow.py and add it to the __init__.py?

Can do, if that's what you're after...
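For concreteness, a hedged sketch of what such a module could look like (an assumption that simply reuses the functions above; the implementation that actually landed in #2115 may differ):

# distributed/protocol/arrow.py  (sketch only)
from .serialize import register_serialization
import pyarrow as pa

def serialize_record_batch(batch):
    # Serialize a RecordBatch via the Arrow IPC stream format.
    sink = pa.BufferOutputStream()
    writer = pa.RecordBatchStreamWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()
    return {}, [sink.get_result().to_pybytes()]

def deserialize_record_batch(header, frames):
    reader = pa.RecordBatchStreamReader(pa.BufferReader(frames[0]))
    return reader.read_next_batch()

register_serialization(pa.RecordBatch, serialize_record_batch, deserialize_record_batch)

...plus something like a guarded `from . import arrow` (wrapped in try/except ImportError so pyarrow stays optional) in distributed/protocol/__init__.py.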

@dhirschfeld
Contributor Author

dhirschfeld commented Jul 12, 2018

Hmmm, still having problems:

import pyarrow as arw
import pandas as pd

df = pd.DataFrame({'A': list('abc'), 'B': [1,2,3]})
batch = arw.RecordBatch.from_pandas(df, preserve_index=False)

def echo(arg):
    return arg
from distributed.protocol.serialize import register_serialization

def serialize(batch):
    sink = arw.BufferOutputStream()
    writer = arw.RecordBatchStreamWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()
    buf = sink.get_result()
    header = {}
    frames = [buf.to_pybytes()]
    return header, frames

def deserialize(header, frames):
    blob = frames[0]
    reader = arw.RecordBatchStreamReader(arw.BufferReader(blob))
    return reader.read_next_batch()

register_serialization(arw.RecordBatch, serialize, deserialize)

The functions still work (roundtrip):

>>> batch.equals(deserialize(*serialize(batch)))
True

...but the client.submit call still seems to ignore the custom functions?

>>> client.submit(echo, batch, pure=False)
distributed.protocol.pickle - INFO - Failed to serialize (<pyarrow.lib.RecordBatch object at 0x000000000CDE1EF8>,). Exception: no default __reduce__ due to non-trivial __cinit__
<snip>
TypeError: no default __reduce__ due to non-trivial __cinit__

@dhirschfeld dhirschfeld reopened this Jul 12, 2018
@dhirschfeld
Contributor Author

Just as a sanity check I tried the Human example from the docs (sketched below for reference) and it works fine:

>>> human = Human('Alice')
>>> client.submit(echo, human, pure=False).result().name
'Alice'
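(For reference, the Human example from the custom-serialization docs is roughly the following; the function names here are illustrative.)

class Human(object):
    def __init__(self, name):
        self.name = name

def serialize_human(human):
    header = {}
    frames = [human.name.encode()]
    return header, frames

def deserialize_human(header, frames):
    return Human(frames[0].decode())

register_serialization(Human, serialize_human, deserialize_human)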

I thought maybe it was something to do with different environments between my desktop and the workers, but we're all running arrow=0.9.0:

>>> def version():
...     import pyarrow as arw
...     return arw.__version__

>>> version()
'0.9.0'

>>> client.run(version)
{'tcp://192.168.224.180:64230': '0.9.0',
 'tcp://192.168.224.196:54674': '0.9.0',
 'tcp://192.168.224.34:63531': '0.9.0',
 'tcp://192.168.226.190:62284': '0.9.0',
 'tcp://192.168.228.225:55086': '0.9.0',
 'tcp://192.168.230.94:60170': '0.9.0',
 'tcp://192.168.231.121:58951': '0.9.0',
 'tcp://192.168.232.202:55595': '0.9.0',
 'tcp://192.168.235.98:59804': '0.9.0',
 'tcp://192.168.236.232:60676': '0.9.0'}

@dhirschfeld
Contributor Author

Note:

The Human example also fails if it isn't picklable:

class Human(object):
    def __init__(self, name):
        self.name = name

    def __getstate__(self):
        1/0
>>> client.submit(echo, human, pure=False)
distributed.protocol.pickle - INFO - Failed to serialize (<__main__.Human object at 0x00000000054AD6A0>,). Exception: division by zero
Traceback (most recent call last):
  File "C:\Miniconda3\lib\site-packages\distributed\protocol\pickle.py", line 38, in dumps
    result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
  File "C:/Users/dhirschf/.PyCharm2018.1/config/scratches/scratch_35.py", line 38, in __getstate__
    1/0
ZeroDivisionError: division by zero
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "C:\Miniconda3\lib\site-packages\IPython\core\interactiveshell.py", line 2963, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-4-c602a524231a>", line 1, in <module>
    client.submit(echo, human, pure=False)
  File "C:\Miniconda3\lib\site-packages\distributed\client.py", line 1238, in submit
    fifo_timeout=fifo_timeout)
  File "C:\Miniconda3\lib\site-packages\distributed\client.py", line 2095, in _graph_to_futures
    'tasks': valmap(dumps_task, dsk3),
  File "cytoolz/dicttoolz.pyx", line 165, in cytoolz.dicttoolz.valmap
  File "cytoolz/dicttoolz.pyx", line 190, in cytoolz.dicttoolz.valmap
  File "C:\Miniconda3\lib\site-packages\distributed\worker.py", line 801, in dumps_task
    'args': warn_dumps(task[1:])}
  File "C:\Miniconda3\lib\site-packages\distributed\worker.py", line 810, in warn_dumps
    b = dumps(obj)
  File "C:\Miniconda3\lib\site-packages\distributed\protocol\pickle.py", line 51, in dumps
    return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
  File "C:\Miniconda3\lib\site-packages\cloudpickle\cloudpickle.py", line 895, in dumps
    cp.dump(obj)
  File "C:\Miniconda3\lib\site-packages\cloudpickle\cloudpickle.py", line 268, in dump
    return Pickler.dump(self, obj)
  File "C:\Miniconda3\lib\pickle.py", line 409, in dump
    self.save(obj)
  File "C:\Miniconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Miniconda3\lib\pickle.py", line 736, in save_tuple
    save(element)
  File "C:\Miniconda3\lib\pickle.py", line 496, in save
    rv = reduce(self.proto)
  File "C:/Users/dhirschf/.PyCharm2018.1/config/scratches/scratch_35.py", line 38, in __getstate__
    1/0
ZeroDivisionError: division by zero

@dhirschfeld
Contributor Author

dhirschfeld commented Jul 12, 2018

The line which is hit in both cases is:

File "C:\Miniconda3\lib\site-packages\distributed\client.py", line 2116, in _graph_to_futures
    'tasks': valmap(dumps_task, dsk3),

self._send_to_scheduler({'op': 'update-graph',
                         'tasks': valmap(dumps_task, dsk3),
                         'dependencies': dependencies,
                         'keys': list(flatkeys),
                         'restrictions': restrictions or {},
                         'loose_restrictions': loose_restrictions,
                         'priority': priority,
                         'user_priority': user_priority,
                         'resources': resources,
                         'submitting_task': getattr(thread_state, 'key', None),
                         'retries': retries,
                         'fifo_timeout': fifo_timeout})

...and dumps_task calls warn_dumps (L816) without overriding the dumps=pickle.dumps argument:

if istask(task):
    if task[0] is apply and not any(map(_maybe_complex, task[2:])):
        d = {'function': dumps_function(task[1]),
             'args': warn_dumps(task[2])}
        if len(task) == 4:
            d['kwargs'] = warn_dumps(task[3])
        return d
    elif not any(map(_maybe_complex, task[1:])):
        return {'function': dumps_function(task[0]),
                'args': warn_dumps(task[1:])}
return to_serialize(task)

def warn_dumps(obj, dumps=pickle.dumps, limit=1e6):
    """ Dump an object to bytes, warn if those bytes are large """
    b = dumps(obj)
    if not _warn_dumps_warned[0] and len(b) > limit:
        _warn_dumps_warned[0] = True
        s = str(obj)
        if len(s) > 70:
            s = s[:50] + ' ... ' + s[-15:]
        warnings.warn("Large object of size %s detected in task graph: \n"
                      "  %s\n"
                      "Consider scattering large objects ahead of time\n"
                      "with client.scatter to reduce scheduler burden and \n"
                      "keep data on workers\n\n"
                      "    future = client.submit(func, big_data)    # bad\n\n"
                      "    big_future = client.scatter(big_data)     # good\n"
                      "    future = client.submit(func, big_future)  # good"
                      % (format_bytes(len(b)), s))
    return b
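To illustrate the gap (a sketch; the exact header contents are an assumption): the registered functions are reachable through distributed's own serialize(), but the task path above calls pickle.dumps directly, which is what fails.

from distributed.protocol.serialize import serialize as dask_serialize  # aliased to avoid clashing with our own serialize()
import pickle

header, frames = dask_serialize(batch)  # goes through the registered arrow functions
pickle.dumps(batch)                     # TypeError: no default __reduce__ due to non-trivial __cinit__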

@dhirschfeld
Contributor Author

I've been poking around in the debugger, but I don't think I know the codebase well enough to suggest the correct fix.

While poking around I also found that scattering any object which has registered custom serialization fails with a long traceback:

AttributeError: 'Serialized' object has no attribute '__traceback__'
  File "C:/Users/dhirschf/.PyCharm2018.1/config/scratches/scratch_35.py", line 69, in 
    fut = client.scatter(obj)
  File "C:\Miniconda3\lib\site-packages\distributed\client.py", line 1772, in scatter
    asynchronous=asynchronous, hash=hash)
  File "C:\Miniconda3\lib\site-packages\distributed\client.py", line 652, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "C:\Miniconda3\lib\site-packages\distributed\utils.py", line 271, in sync
    six.reraise(*error[0])
  File "C:\Miniconda3\lib\site-packages\six.py", line 693, in reraise
    raise value
  File "C:\Miniconda3\lib\site-packages\distributed\utils.py", line 256, in f
    result[0] = yield future
  File "C:\Miniconda3\lib\site-packages\tornado\gen.py", line 1099, in run
    value = future.result()
  File "C:\Miniconda3\lib\site-packages\tornado\gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "C:\Miniconda3\lib\site-packages\distributed\client.py", line 1640, in _scatter
    timeout=timeout)
  File "C:\Miniconda3\lib\site-packages\tornado\gen.py", line 1099, in run
    value = future.result()
  File "C:\Miniconda3\lib\site-packages\tornado\gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "C:\Miniconda3\lib\site-packages\distributed\core.py", line 552, in send_recv_from_rpc
    result = yield send_recv(comm=comm, op=key, **kwargs)
  File "C:\Miniconda3\lib\site-packages\tornado\gen.py", line 1099, in run
    value = future.result()
  File "C:\Miniconda3\lib\site-packages\tornado\gen.py", line 1113, in run
    yielded = self.gen.send(value)
  File "C:\Miniconda3\lib\site-packages\distributed\core.py", line 446, in send_recv
    six.reraise(*clean_exception(**response))
  File "C:\Miniconda3\lib\site-packages\six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "C:\Miniconda3\lib\site-packages\distributed\core.py", line 321, in handle_comm
    result = yield result
  File "C:\Miniconda3\lib\site-packages\tornado\gen.py", line 1099, in run
    value = future.result()
  File "C:\Miniconda3\lib\site-packages\tornado\gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "C:\Miniconda3\lib\site-packages\distributed\scheduler.py", line 2155, in scatter
    report=False)
  File "C:\Miniconda3\lib\site-packages\tornado\gen.py", line 1099, in run
    value = future.result()
  File "C:\Miniconda3\lib\site-packages\tornado\gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "C:\Miniconda3\lib\site-packages\distributed\utils_comm.py", line 126, in scatter_to_workers
    for address, v in d.items()])
  File "C:\Miniconda3\lib\site-packages\tornado\gen.py", line 1099, in run
    value = future.result()
  File "C:\Miniconda3\lib\site-packages\tornado\gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "C:\Miniconda3\lib\site-packages\distributed\utils.py", line 208, in All
    result = yield tasks.next()
  File "C:\Miniconda3\lib\site-packages\tornado\gen.py", line 1099, in run
    value = future.result()
  File "C:\Miniconda3\lib\site-packages\tornado\gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "C:\Miniconda3\lib\site-packages\distributed\core.py", line 614, in send_recv_from_rpc
    result = yield send_recv(comm=comm, op=key, **kwargs)
  File "C:\Miniconda3\lib\site-packages\tornado\gen.py", line 1099, in run
    value = future.result()
  File "C:\Miniconda3\lib\site-packages\tornado\gen.py", line 1113, in run
    yielded = self.gen.send(value)
  File "C:\Miniconda3\lib\site-packages\distributed\core.py", line 446, in send_recv
    six.reraise(*clean_exception(**response))
  File "C:\Miniconda3\lib\site-packages\six.py", line 691, in reraise
    if value.__traceback__ is not tb:
AttributeError: 'Serialized' object has no attribute '__traceback__'

@mrocklin
Member

mrocklin commented Jul 12, 2018 via email

@dhirschfeld
Contributor Author

I can try that, but it doesn't seem quite right, as the custom serializers are never called in the first place - the exception happens before anything is even sent to the workers (AFAICS).

The call stack is:

Client.submit
Client._graph_to_futures
worker.dumps_task
worker.warn_dumps
pickle.dumps  <-- BOOM!

Anyway, will look at it with fresh eyes in the morning. Thanks for the help/suggestions so far!

@mrocklin
Member

mrocklin commented Jul 12, 2018 via email

@dhirschfeld
Contributor Author

That code works fine - I've seen it in my poking around. The problem is that this code path is never run: pickle.dumps is called explicitly from warn_dumps; serialize.dask_dumps is never called.

@mrocklin
Member

mrocklin commented Jul 12, 2018 via email

@dhirschfeld
Contributor Author

warn_dumps is called with no dumps argument, so pickle.dumps is used, which then blows up:

'args': warn_dumps(task[1:])}

def warn_dumps(obj, dumps=pickle.dumps, limit=1e6):
    """ Dump an object to bytes, warn if those bytes are large """
    b = dumps(obj)
@dhirschfeld
Contributor Author

Jumping up the stack, dumps_task is called from Client._graph_to_futures:

'tasks': valmap(dumps_task, dsk3),

...which in turn is called directly from Client.submit
futures = self._graph_to_futures(dsk, [skey], restrictions,
                                 loose_restrictions, priority={skey: 0},
                                 user_priority=priority,
                                 resources={skey: resources} if resources else None,
                                 retries=retries,
                                 fifo_timeout=fifo_timeout)

@dhirschfeld
Contributor Author

Is serialize.dask_dumps called if you send something like a NumPy array?

I've never seen the custom serialization code path actually taken.

@mrocklin
Member

mrocklin commented Jul 12, 2018 via email

@dhirschfeld
Contributor Author

Tried the scatter with no joy - #2103 (comment)

@mrocklin
Member

mrocklin commented Jul 12, 2018 via email

@mrocklin
Member

I recommend that you create a PR with the arrow implementation and a small failing test. Then I'll take a look and see if I can suggest something?
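For anyone picking this up, a hedged sketch of what such a small failing test might look like (the test name and the yield-based gen_cluster style are assumptions based on distributed's test suite at the time):

import pytest
pa = pytest.importorskip('pyarrow')

from distributed.utils_test import gen_cluster

@gen_cluster(client=True)
def test_roundtrip_record_batch(c, s, a, b):
    import pandas as pd
    df = pd.DataFrame({'A': list('abc'), 'B': [1, 2, 3]})
    batch = pa.RecordBatch.from_pandas(df, preserve_index=False)
    result = yield c.submit(lambda x: x, batch)  # currently fails: no default __reduce__ ...
    assert batch.equals(result)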

@dhirschfeld
Contributor Author

The numpy array goes through the pickle path too:

[debugger screenshot showing the numpy array taking the pickle path]

Will put in a PR in the morning and we can continue the discussion there. Thanks again for looking into it...

dhirschfeld pushed a commit to dhirschfeld/distributed that referenced this issue Jul 13, 2018
dhirschfeld pushed a commit to dhirschfeld/distributed that referenced this issue Jul 13, 2018
@dhirschfeld
Contributor Author

Closing as this has been implemented in #2115; further discussion can take place in #2110.
