
Distributed 2021.3.1 distributed.protocol.serialize.pickle_loads fails with IndexError: tuple index out of range #4645

Closed
gabicca opened this issue Mar 29, 2021 · 43 comments

Comments

@gabicca

gabicca commented Mar 29, 2021

What happened:

The following exception occurred with the latest version of distributed, in a test that previously passed:

header = {'compression': (None, None), 'num-sub-frames': 2, 'serializer': 'pickle', 'split-num-sub-frames': (1, 1), ...}
frames = [<memory at 0x1209deae0>, <memory at 0x1209dea10>]
    def pickle_loads(header, frames):
        x, buffers = frames[0], frames[1:]
        writeable = header["writeable"]
        for i in range(len(buffers)):
            mv = memoryview(buffers[i])
>           if writeable[i] == mv.readonly:
E           IndexError: tuple index out of range

"writeable" is an empty tuple in the above header.

What you expected to happen:

After digging a bit and comparing runs of the same test between 2021.3.0 and 2021.3.1, I found the following:

In version 2021.3.0
the input frames always has one element, hence buffers is always an empty list, so the for loop containing writeable[i] never runs; writeable is always an empty tuple.

In version 2021.3.1
the third time execution reaches this function, frames has 2 elements, hence buffers is not empty and the for loop runs; writeable is still an empty tuple, hence the code fails.
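
To make the failure mode concrete, here is a minimal standalone sketch with toy inputs; this is not our real data, and not distributed's actual call path, just the same pattern in plain Python:

header = {"writeable": ()}                    # empty tuple, as in the failing header
frames = [b"pickled-bytes", b"extra-buffer"]  # 2021.3.1 case: more than one frame

buffers = frames[1:]
writeable = header["writeable"]
try:
    for i in range(len(buffers)):
        mv = memoryview(buffers[i])
        if writeable[i] == mv.readonly:       # writeable is empty, so this raises
            pass
except IndexError as e:
    print(e)  # tuple index out of range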

I saw that there were substantial changes to distributed.protocol.core.loads, where frames is passed down in its "truncated" form (sub_frames) to the function which eventually breaks. I don't know if this is a newly introduced bug, or whether our code needs changing. I'm not familiar with the underlying mechanisms, so I'd appreciate it if someone could take a look.

Environment:

  • Dask version: 2021.3.1
  • Python version: 3.7.10
  • Operating System: MacOS Mojave (but also fails on linux-based gitlab runners)
  • Install method (conda, pip, source): pip
@jrbourbeau
Member

Thanks for raising an issue @gabicca. cc @jakirkham @madsbk

Are you able to provide a code snippet which reproduces the IndexError?

@gabicca
Author

gabicca commented Mar 29, 2021

Thanks for raising an issue @gabicca. cc @jakirkham @madsbk

Are you able to provide a code snippet which reproduces the IndexError?

To be honest, I don't understand the code well enough to put together a quick working example. But I will spend a bit more time on it tomorrow and try to come up with one. Please don't wait for me with this; I'll let you know how it goes.

@jakirkham
Member

Are you able to run with pdb @gabicca? It would be interesting to know what writeable is here

@gabicca
Author

gabicca commented Mar 30, 2021

Are you able to run with pdb @gabicca? It would be interesting to know what writeable is here

Hi @jakirkham , writeable is always an empty tuple, as I say in the description, hence the index error. The difference between the two executions, from what I can tell, was that while in the old version frames is always a single-element list, in the new version it sometimes has multiple elements. So in the first version the code never entered the for loop, because buffers was an empty list, while in the new version it entered the loop and failed with writeable being empty.

@gabicca
Author

gabicca commented Mar 30, 2021

I can't get a simple example together. I'm happy to hop on a call with someone if we can share connection details privately somehow.

@alejandrofiel

alejandrofiel commented Mar 30, 2021

Hi, the same error here with this code. Rolling back to 2021.3.0 solves the problem.

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=39, threads_per_worker=1)
client = Client(cluster)

ddf = dd.read_csv(f's3://xxxxxx/*.csv',
                      parse_dates=['test123'],
                      assume_missing=True,
                      storage_options={'config_kwargs': {'max_pool_connections': 20}})

df = ddf.compute()

Thanks!

edit:
I'll add that this is similar to #4625.
And the closest diff I saw was here: #4595

@Cedric-Magnan

Cedric-Magnan commented Mar 31, 2021

Hello, the same error happens when dask distributed reads multiple parquet files with the same schema (without any parquet metadata; I haven't tried with parquet metadata).

The following code works well in dask-2021.3.1 and dask-2021.3.0:

import dask.dataframe as dd

ddf = dd.read_parquet(f's3://xxxxxx/*.parquet.gzip')
df = ddf.compute()

The following code works in dask-2021.2.0 but not in dask-2021.3.1 and dask-2021.3.0 (because both versions install distributed-2021.3.1):

from dask.distributed import Client
import dask.dataframe as dd

client = Client()
ddf = dd.read_parquet(f's3://xxxxxx/*.parquet.gzip')
df = ddf.compute()

@mrocklin
Member

Hi Everyone,

Thank you for reporting this. We'll get a fix in soon and issue a bugfix release.

However, it would be really helpful to develop a test here to ensure that this problem doesn't recur in the future. To the extent that people are able to reduce their examples to make them more reproducible by others, that would be welcome. None of the test cases in the current test suite run into this problem, and so we're a bit blind at the moment.

For example, all of the stated examples talk about reading from S3. Does this problem occur if you're not reading data from S3? Does it require gzip? Does it require all of the keyword parameters that you're passing in? Does it go away if you remove a specific one of these? @Cedric-Magnan @alejandrofiel @gabicca you all are currently the best placed people to help us identify the challenge here. If you are able to reduce your problem to something that someone else can run that would be very helpful.

Given the information that you've provided so far, I've tried to reproduce this issue by reading CSV data from a public S3 dataset. This is the code I used for that.

from dask.distributed import Client
import dask.dataframe as dd
client = Client()

df = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv",
    storage_options={"anon": True},
)
df.head()

Does this fail for you by any chance?

  • If so, great. Can you share more about your environment to help us reproduce your setup? Maybe a pip freeze or conda list output. If you have the time to craft a minimal environment, even better
  • If not, then are you able to try to figure out what is different between what you're doing in your failing example and what this example is doing? Is it the dataset? Some keyword argument?

Thanks all

@mrocklin
Member

@williamBlazing I see that you've also reported something similar. If you or your team are able to help provide a reproducer that would be welcome.

@wmalpica

@williamBlazing I see that you've also reported something similar. If you or your team are able to help provide a reproducer that would be welcome.

@mrocklin our reproducer is here dask/dask#7490

@wmalpica

wmalpica commented Mar 31, 2021

To add a little more detail: I think there is something about the data that makes it crash, because we do similar things in our code all the time, but it was with this particular data that we got it to fail. At the same time, it's not related to the parquet files in our test data, because we observed the same issue when we read the parquet file using cudf, or when that data was generated as an output of BSQL. Here is a reproducer we have from when I thought the issue was in dask-cudf: rapidsai/cudf#7773

@jakirkham
Member

jakirkham commented Mar 31, 2021

We can manufacture a situation that causes the same error. Maybe this is good enough for testing purposes?

import numpy as np
from distributed.protocol import Serialized, dumps, loads, serialize


a = np.arange(5)

h, f = serialize(a)
h["writeable"] = []  # <--- will cause the error

dumps(Serialized(h, f))
loads(dumps(Serialized(h, f)))

@mrocklin
Member

Yeah, I think that it would be helpful to find a test triggered from the user API, rather than a whitebox test that is highly sensitive to our particular implementation

@mrocklin
Member

Also, if people have the time, can you verify that versions of distributed are identical across machines? This bug might be the result of a subtle difference in distributed version.

client.get_versions(check=True)

@Cedric-Magnan

Cedric-Magnan commented Mar 31, 2021

@mrocklin If that can be of any use, I have found an open dataset on s3 that can be used to reproduce the issue:

from dask.distributed import Client
import dask.dataframe as dd

client = Client()
df = dd.read_parquet("s3://sra-pub-sars-cov2-metadata-us-east-1/v2/tax_analysis/", storage_options={"anon": True})
df.compute()

I'm using pip with the following library versions:

zict==2.0.0
tblib==1.7.0
sortedcontainers==2.3.0
cloudpickle==1.6.0
click==7.1.2
dask==2021.3.1
dask-glm==0.2.0
dask-ml==1.8.0
tornado==6.1
toolz==0.11.1
psutil==5.8.0
msgpack==1.0.2
partd==1.1.0
fsspec==0.8.5
distributed==2021.3.1
numpy==1.20.1
pandas==1.2.3

I'm using distributed on a single machine (Red Hat Enterprise Linux Server 7.9) so this issue should probably not come from having different library versions on multiple machines.
Tell me if you still have issues reproducing this bug.

Edit : I am using Python 3.7.9 and installing in a virtualenv with pip.

@quasiben
Member

quasiben commented Mar 31, 2021

@Cedric-Magnan I tried your reproducer but I am seeing a different issue:

/Users/bzaitlen/miniconda3/envs/dask-dev/lib/python3.7/site-packages/fastparquet/dataframe.py:5: FutureWarning: pandas.core.index is deprecated and will be removed in a future version.  The public classes are available in the top-level namespace.
  from pandas.core.index import CategoricalIndex, RangeIndex, Index, MultiIndex
conda list s3fs
/Users/bzaitlen/miniconda3/envs/dask-dev/lib/python3.7/site-packages/fastparquet/dataframe.py:5: FutureWarning: pandas.core.index is deprecated and will be removed in a future version.  The public classes are available in the top-level namespace.
  from pandas.core.index import CategoricalIndex, RangeIndex, Index, MultiIndex
/Users/bzaitlen/miniconda3/envs/dask-dev/lib/python3.7/site-packages/fastparquet/dataframe.py:5: FutureWarning: pandas.core.index is deprecated and will be removed in a future version.  The public classes are available in the top-level namespace.
  from pandas.core.index import CategoricalIndex, RangeIndex, Index, MultiIndex
/Users/bzaitlen/miniconda3/envs/dask-dev/lib/python3.7/site-packages/fastparquet/dataframe.py:5: FutureWarning: pandas.core.index is deprecated and will be removed in a future version.  The public classes are available in the top-level namespace.
  from pandas.core.index import CategoricalIndex, RangeIndex, Index, MultiIndex
/Users/bzaitlen/miniconda3/envs/dask-dev/lib/python3.7/site-packages/fastparquet/dataframe.py:5: FutureWarning: pandas.core.index is deprecated and will be removed in a future version.  The public classes are available in the top-level namespace.
  from pandas.core.index import CategoricalIndex, RangeIndex, Index, MultiIndex
distributed.worker - WARNING - Compute Failed
Function:  read_parquet_part
args:      (<s3fs.core.S3FileSystem object at 0x7fd957f64610>, <bound method FastParquetEngine.read_partition of <class 'dask.dataframe.io.parquet.fastparquet.FastParquetEngine'>>, Empty DataFrame
Columns: [run, contig, tax_id, rank, name, total_count, self_count, ilevel, ileft, iright]
Index: [], [(('sra-pub-sars-cov2-metadata-us-east-1/v2/tax_analysis/20210331_153810_00090_jyprn_4ea21fad-9c72-4787-bac3-7e79084250ee', [0]), {})], ['run', 'contig', 'tax_id', 'rank', 'name', 'total_count', 'self_count', 'ilevel', 'ileft', 'iright'], None, {'categories': {}})
kwargs:    {}
Exception: TypeError("expected dtype object, got 'numpy.dtype[uint32]'")

Traceback (most recent call last):
  File "test-4645.py", line 8, in <module>
    df.compute()
  File "/Users/bzaitlen/Documents/GitHub/dask/dask/base.py", line 284, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/bzaitlen/Documents/GitHub/dask/dask/base.py", line 566, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/bzaitlen/Documents/GitHub/distributed/distributed/client.py", line 2666, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/Users/bzaitlen/Documents/GitHub/distributed/distributed/client.py", line 1981, in gather
    asynchronous=asynchronous,
  File "/Users/bzaitlen/Documents/GitHub/distributed/distributed/client.py", line 844, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/Users/bzaitlen/Documents/GitHub/distributed/distributed/utils.py", line 353, in sync
    raise exc.with_traceback(tb)
  File "/Users/bzaitlen/Documents/GitHub/distributed/distributed/utils.py", line 336, in f
    result[0] = yield future
  File "/Users/bzaitlen/miniconda3/envs/dask-dev/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/Users/bzaitlen/Documents/GitHub/distributed/distributed/client.py", line 1840, in _gather
    raise exception.with_traceback(traceback)
  File "/Users/bzaitlen/Documents/GitHub/dask/dask/dataframe/io/parquet/core.py", line 383, in read_parquet_part
    for (rg, kw) in part
  File "/Users/bzaitlen/Documents/GitHub/dask/dask/dataframe/io/parquet/core.py", line 383, in <listcomp>
    for (rg, kw) in part
  File "/Users/bzaitlen/Documents/GitHub/dask/dask/dataframe/io/parquet/fastparquet.py", line 793, in read_partition
    **kwargs.get("read", {}),
  File "/Users/bzaitlen/miniconda3/envs/dask-dev/lib/python3.7/site-packages/fastparquet/api.py", line 241, in read_row_group_file
    assign=assign, scheme=self.file_scheme)
  File "/Users/bzaitlen/miniconda3/envs/dask-dev/lib/python3.7/site-packages/fastparquet/core.py", line 305, in read_row_group_file
    scheme=scheme)
  File "/Users/bzaitlen/miniconda3/envs/dask-dev/lib/python3.7/site-packages/fastparquet/core.py", line 354, in read_row_group
    cats, selfmade, assign=assign)
  File "/Users/bzaitlen/miniconda3/envs/dask-dev/lib/python3.7/site-packages/fastparquet/core.py", line 331, in read_row_group_arrays
    catdef=out.get(name+'-catdef', None))
  File "/Users/bzaitlen/miniconda3/envs/dask-dev/lib/python3.7/site-packages/fastparquet/core.py", line 245, in read_col
    skip_nulls, selfmade=selfmade)
  File "/Users/bzaitlen/miniconda3/envs/dask-dev/lib/python3.7/site-packages/fastparquet/core.py", line 110, in read_data_page
    definition_levels, num_nulls = read_def(io_obj, daph, helper, metadata)
  File "/Users/bzaitlen/miniconda3/envs/dask-dev/lib/python3.7/site-packages/fastparquet/core.py", line 63, in read_def
    daph.num_values, bit_width)[:daph.num_values]
  File "/Users/bzaitlen/miniconda3/envs/dask-dev/lib/python3.7/site-packages/fastparquet/core.py", line 44, in read_data
    encoding.read_rle_bit_packed_hybrid(fobj, bit_width, o=o)
TypeError: expected dtype object, got 'numpy.dtype[uint32]'

EDIT: this is with the tip of main

@jakirkham
Member

NumPy made some significant changes to types in 1.20. It might be worth checking with NumPy 1.19

@Cedric-Magnan

Cedric-Magnan commented Mar 31, 2021

@Cedric-Magnan I tried your reproducer but I am seeing a different issue:

...

EDIT: this is with the tip of main

@quasiben I am using numpy 1.20.1, maybe you have a different version

@jakirkham
Member

IIUC Ben was referring to using Dask + Distributed coming from main (not NumPy). The NumPy comment was in relation to the new issue Ben found. Would suggest trying Dask + Distributed from main (instead of the last tag), Cedric

@alejandrofiel

alejandrofiel commented Mar 31, 2021

Hi! I'll add that this does not fail:

ddf = dd.read_csv(f's3://xxxxxx/*.csv',
                      parse_dates=['test123'],
                      assume_missing=True,
                      storage_options={'config_kwargs': {'max_pool_connections': 20}})
ddf.head()

I have the error on:
df = ddf.compute()

numpy 1.19.5
pandas 0.25.3

@jrbourbeau
Member

Thanks @alejandrofiel; however, since others can't access the CSV files you're using, this makes it difficult for us to debug. See https://blog.dask.org/2018/02/28/minimal-bug-reports for some information on crafting minimal bug reports

@Cedric-Magnan

Cedric-Magnan commented Mar 31, 2021

IIUC Ben was referring to using Dask + Distributed coming from main (not NumPy). The NumPy comment was in relation to the new issue Ben found. Would suggest trying Dask + Distributed from main (instead of the last tag), Cedric

Indeed, that's a good idea. I have a different issue when running this code with the current main branch on commit faf0a70.

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 105, in loads
    frames[0], object_hook=_decode_default, use_list=False, **msgpack_opts
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 97, in _decode_default
    sub_header, sub_frames, deserializers=deserializers
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 450, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 384, in deserialize
    return loads(header, frames)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 80, in pickle_loads
    return pickle.loads(x, buffers=new)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
_pickle.UnpicklingError: pickle data was truncated
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/cedric/venv/lib/python3.7/site-packages/dask/base.py", line 283, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/cedric/venv/lib/python3.7/site-packages/dask/base.py", line 565, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 2666, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 1981, in gather
    asynchronous=asynchronous,
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 844, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 353, in sync
    raise exc.with_traceback(tb)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 336, in f
    result[0] = yield future
  File "/home/cedric/venv/lib/python3.7/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 1869, in _gather
    response = await future
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 1920, in _gather_remote
    response = await retry_operation(self.scheduler.gather, keys=keys)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils_comm.py", line 390, in retry_operation
    operation=operation,
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils_comm.py", line 370, in retry
    return await coro()
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/core.py", line 862, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/core.py", line 645, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/comm/tcp.py", line 222, in read
    allow_offload=self.allow_offload,
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/comm/utils.py", line 77, in from_frames
    res = await offload(_from_frames)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 1440, in offload
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/usr/local/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 1440, in <lambda>
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/comm/utils.py", line 63, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 105, in loads
    frames[0], object_hook=_decode_default, use_list=False, **msgpack_opts
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 97, in _decode_default
    sub_header, sub_frames, deserializers=deserializers
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 450, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 384, in deserialize
    return loads(header, frames)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 80, in pickle_loads
    return pickle.loads(x, buffers=new)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
_pickle.UnpicklingError: pickle data was truncated

The same code I have written in my previous message works fine with dask and distributed 2021.2.0.

@jakirkham
Member

Thanks for the update. What does new contain in this part of the exception?

  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 80, in pickle_loads
    return pickle.loads(x, buffers=new)

@Cedric-Magnan

Thanks for the update. What does new contain in this part of the exception?

  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 80, in pickle_loads
    return pickle.loads(x, buffers=new)

I have printed new during execution and my console showed [] before raising the error. The pickle_loads method was not called before this.

@jakirkham
Member

Could you please give PR ( #4659 ) a try?

@Cedric-Magnan

Could you please give PR ( #4659 ) a try?

I have another issue when using distributed from your fork:

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 105, in loads
    frames[0], object_hook=_decode_default, use_list=False, **msgpack_opts
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 97, in _decode_default
    sub_header, sub_frames, deserializers=deserializers
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 455, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 389, in deserialize
    return loads(header, frames)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 85, in pickle_loads
    return pickle.loads(x, buffers=new)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 73, in loads
    return pickle.loads(x, buffers=buffers)
TypeError: 'buffers' is an invalid keyword argument for loads()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/cedric/venv/lib/python3.7/site-packages/dask/base.py", line 283, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/cedric/venv/lib/python3.7/site-packages/dask/base.py", line 565, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 2666, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 1981, in gather
    asynchronous=asynchronous,
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 844, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 353, in sync
    raise exc.with_traceback(tb)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 336, in f
    result[0] = yield future
  File "/home/cedric/venv/lib/python3.7/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 1869, in _gather
    response = await future
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 1920, in _gather_remote
    response = await retry_operation(self.scheduler.gather, keys=keys)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils_comm.py", line 390, in retry_operation
    operation=operation,
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils_comm.py", line 370, in retry
    return await coro()
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/core.py", line 862, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/core.py", line 645, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/comm/tcp.py", line 222, in read
    allow_offload=self.allow_offload,
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/comm/utils.py", line 77, in from_frames
    res = await offload(_from_frames)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 1440, in offload
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/usr/local/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 1440, in <lambda>
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/comm/utils.py", line 63, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 105, in loads
    frames[0], object_hook=_decode_default, use_list=False, **msgpack_opts
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 97, in _decode_default
    sub_header, sub_frames, deserializers=deserializers
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 455, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 389, in deserialize
    return loads(header, frames)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 85, in pickle_loads
    return pickle.loads(x, buffers=new)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 73, in loads
    return pickle.loads(x, buffers=buffers)
TypeError: 'buffers' is an invalid keyword argument for loads()

I have seen a similar issue on #3851

@jakirkham
Member

Are you on Python 3.7? If so, do you have pickle5 installed? Also is pickle5 available for all of the Workers, Client(s), and Scheduler?

@Cedric-Magnan

Cedric-Magnan commented Mar 31, 2021

Are you on Python 3.7? If so, do you have pickle5 installed? Also is pickle5 available for all of the Workers, Client(s), and Scheduler?

I have just installed pickle5, and I still get the following traceback:

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 105, in loads
    frames[0], object_hook=_decode_default, use_list=False, **msgpack_opts
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 97, in _decode_default
    sub_header, sub_frames, deserializers=deserializers
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 455, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 389, in deserialize
    return loads(header, frames)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 85, in pickle_loads
    return pickle.loads(x, buffers=new)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 73, in loads
    return pickle.loads(x, buffers=buffers)
_pickle.UnpicklingError: pickle data was truncated
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/cedric/venv/lib/python3.7/site-packages/dask/base.py", line 283, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/cedric/venv/lib/python3.7/site-packages/dask/base.py", line 565, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 2666, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 1981, in gather
    asynchronous=asynchronous,
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 844, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 353, in sync
    raise exc.with_traceback(tb)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 336, in f
    result[0] = yield future
  File "/home/cedric/venv/lib/python3.7/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 1869, in _gather
    response = await future
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 1920, in _gather_remote
    response = await retry_operation(self.scheduler.gather, keys=keys)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils_comm.py", line 390, in retry_operation
    operation=operation,
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils_comm.py", line 370, in retry
    return await coro()
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/core.py", line 862, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/core.py", line 645, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/comm/tcp.py", line 222, in read
    allow_offload=self.allow_offload,
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/comm/utils.py", line 77, in from_frames
    res = await offload(_from_frames)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 1440, in offload
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/usr/local/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 1440, in <lambda>
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/comm/utils.py", line 63, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 105, in loads
    frames[0], object_hook=_decode_default, use_list=False, **msgpack_opts
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 97, in _decode_default
    sub_header, sub_frames, deserializers=deserializers
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 455, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 389, in deserialize
    return loads(header, frames)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 85, in pickle_loads
    return pickle.loads(x, buffers=new)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 73, in loads
    return pickle.loads(x, buffers=buffers)
_pickle.UnpicklingError: pickle data was truncated

The value of new is now [<memory at 0x7fe656b0aef0>, <memory at 0x7fe654066530>, <memory at 0x7fe654066600>, <memory at 0x7fe6540666d0>, <memory at 0x7fe6540667a0>, <memory at 0x7fe654066870>]

Question: If pickle5 is required for Python 3.7 to work with dask distributed, shouldn't it be added to the install requirements of the distributed library? Or do you plan to drop support for Python 3.7 soon?

Also, I am using distributed on a single machine, using a dask.distributed.Client with a LocalCluster that is setup during the execution so there should not be any library differences between scheduler, client and workers.

@jakirkham
Member

jakirkham commented Mar 31, 2021

Well, pickle5 isn't required and we don't plan to drop Python 3.7 yet AFAIK. The problem that seems to have occurred in the previous case is that at least one Worker or Client had access to pickle5, so it was used to pickle the data. However, wherever that data was sent (be it a Worker, Client, or Scheduler) did not have it, and as a result it encountered data it was unable to parse. Hence the error message. So it isn't an issue of pickle5 being required. Instead it is an issue of having discrepancies between the different environments where Dask is being run

I should add that we previously added logic to check whether the receiver has access to pickle protocol 5 before sending data using that protocol (#4019). However, given the error seen, I'm guessing there is some point where this check doesn't work. Anyway, this is a totally different problem and I would recommend filing a new issue for it
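
To illustrate the mismatch, here is a hedged sketch (not distributed's code; it assumes Python 3.8+ with the stdlib pickle, or the pickle5 backport imported in its place on 3.7): protocol 5 splits one object into a small pickle stream plus out-of-band buffers, and a receiver whose pickle.loads has no buffers= argument cannot reassemble such a payload.

import pickle
import numpy as np

a = np.arange(1_000_000)

buffers = []
# Out-of-band pickling: with protocol 5, NumPy can hand the array's bytes to
# buffer_callback instead of embedding them in the pickle stream itself.
data = pickle.dumps(a, protocol=5, buffer_callback=buffers.append)

# A protocol-5-aware receiver passes the buffers back in to reassemble.
restored = pickle.loads(data, buffers=buffers)
assert (restored == a).all()

# A Python 3.7 receiver without pickle5 has no `buffers=` keyword at all,
# so it cannot rebuild the object from `data` plus the separate buffers.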

@jakirkham
Member

Interesting, thanks for rerunning. So that looks like a different issue where not all the data is making it across the wire for some reason. Not sure why that is. I would suggest filing that as a new issue with a minimal reproducer so we can track it separately

@jakirkham
Member

Would be good if a few more people on this thread test with main and report back. We've put a couple of fixes in there today

@gabicca
Author

gabicca commented Apr 1, 2021

I'm trying to catch up with all this (a lot happened overnight :) )

When I run the test again with the latest master, I get the same pickle error that @Cedric-Magnan gets. In my case, new contains a couple of memory addresses.

So to answer @mrocklin and @jakirkham a few things:

  • we don't load data from S3. We generate test_images from FITS files locally
  • we don't pickle ourselves. If any pickling is happening, that's through dask I suppose. So I don't understand why we'd need pickle5 if we only use Python 3.7 for running the process.

If I install pickle5 into my environment, then the error changes to

    def loads(x, *, buffers=()):
        try:
            if buffers:
>               return pickle.loads(x, buffers=buffers)
E               _pickle.UnpicklingError: pickle data was truncated

I'll come back to this later; there are some other tests I haven't looked at that may give me something more reproducible to share with you. My problem is that the input we use at the moment is very complex and non-standard, so I can't share it easily.

@jakirkham
Member

Could you please test with Python 3.8?

@mrocklin
Member

mrocklin commented Apr 1, 2021

With the following diff

diff --git a/distributed/protocol/serialize.py b/distributed/protocol/serialize.py
index 50d87953..5f2cbfef 100644
--- a/distributed/protocol/serialize.py
+++ b/distributed/protocol/serialize.py
@@ -58,18 +58,22 @@ def pickle_dumps(x, context=None):
         "serializer": "pickle",
         "writeable": tuple(not f.readonly for f in frames[1:]),
     }
+    print("dumps", len(frames), list(map(len, frames)))
+
     return header, frames
 
 
 def pickle_loads(header, frames):
     x, buffers = frames[0], frames[1:]
+    print("loads", len(frames), list(map(len, frames)))
 
     writeable = header.get("writeable")
     if not writeable:
         writeable = len(buffers) * (None,)

And when we run the example provided by @williamBlazing

from dask.distributed import Client
import dask.dataframe
import pandas

def load_file(file_path):
    return pandas.read_parquet(file_path)


def main():
    # running dask-scheduler locally with two dask-worker
    with Client() as dask_client:

        workers = list(dask_client.scheduler_info()["workers"])
        futures = []
        files = ['deserialize_0.parquet', 'deserialize_1.parquet']
        counter = 0
        for worker in workers:
            futures.append(
                dask_client.submit(
                    load_file, files[counter],  workers=[worker],    pure=False,
                )
            )
            counter = counter+1
            if counter > 1:
                break

        for future in futures:
            future.result()

if __name__ == "__main__":
    main()

We get the following output

dumps 1 [58705154]
loads 1 [58705154]
dumps 1 [129780145]
loads 2 [67108864, 62671281]

We see that the first transmission sends and receives the same-sized frames, but in the second transmission the frame is chopped in half. This commonly occurs when Dask tries to send a frame that is too large for an intermediate stage, like compression. Usually Dask comms do a good job of stitching these frames back together. Apparently this is no longer happening in the pickle5 case.
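
As a rough standalone illustration of that failure mode (this is not distributed's comm code, and the chunk size is arbitrary), cutting a pickled payload into fixed-size pieces and deserializing only one of them reproduces the same truncation error, while joining the pieces first works:

import pickle
import numpy as np

payload = pickle.dumps(np.arange(1_000_000))  # a few MB of pickled bytes

limit = 2**20  # pretend 1 MiB is the largest frame we can send at once
chunks = [payload[i:i + limit] for i in range(0, len(payload), limit)]

try:
    pickle.loads(chunks[0])  # only the first piece: incomplete data
except pickle.UnpicklingError as e:
    print(e)  # pickle data was truncated

restored = pickle.loads(b"".join(chunks))  # stitched back together, loads fine
assert (restored == np.arange(1_000_000)).all()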

@mrocklin
Member

mrocklin commented Apr 1, 2021

I'm going to use this as an opportunity to highlight the cost of adding in new technology. I'm excited about the advantages that pickle5 can bring us, but all new additions like this come at a cost: with something like 10-20% probability they blow up later on and other people have to fix them. When I'm ornery or hesitant about adding new whiz-bang features, this is why.

(I could also be wrong about the cause here, in which case I apologize in advance).

@jakirkham
Member

I actually suspect an environment inconsistency. At this point I am leaning towards it being a mix of Python versions (like a mix of 3.7 and 3.8). Hence the corruption. The reality is that deployment is hard and this is just one instance

@gabicca
Author

gabicca commented Apr 1, 2021

Could you please test with Python 3.8?

When I run the test in a Python 3.8 environment, I get this error:

    def loads(x, *, buffers=()):
        try:
            if buffers:
>               return pickle.loads(x, buffers=buffers)
E               _pickle.UnpicklingError: pickle data was truncated

@jakirkham
Member

Thanks, let's track this in issue #4662. It appears we've addressed the original issue and one variant (#4645 (comment))

@mrocklin
Member

mrocklin commented Apr 1, 2021

I actually suspect an environment inconsistency. At this point I am leaning towards it being a mix of Python versions (like a mix of 3.7 and 3.8).

I disagree. I'm reproducing this locally. I believe that the issue is splitting frames coming from pickle 5.

@mrocklin mrocklin reopened this Apr 1, 2021
@jakirkham
Member

jakirkham commented Apr 1, 2021

Which one? The problem at the top of the issue, or this more recent splitting issue? The latter is filed in issue #4662

@mrocklin
Member

mrocklin commented Apr 1, 2021

Ah, my apologies. Thank you.

@mrocklin mrocklin closed this as completed Apr 1, 2021
@jakirkham
Member

For clarity, I agree there is still an issue and do think we should fix it before the release. That's what I'm trying to look at currently. Just trying to disentangle things to make them easier to track 🙂

@mrocklin
Member

mrocklin commented Apr 2, 2021

Thank you everyone who participated in helping to track this down. I appreciate it.

rapids-bot bot pushed a commit to rapidsai/cudf that referenced this issue Apr 6, 2021
Needed to pick up some serialization bug fixes in the recent Distributed release ( dask/distributed#4645 ) ( dask/distributed#4662 )

Authors:
  - https://github.com/jakirkham

Approvers:
  - Keith Kraus (https://github.com/kkraus14)
  - Ray Douglass (https://github.com/raydouglass)

URL: #7858
shwina pushed a commit to shwina/cudf that referenced this issue Apr 7, 2021
URL: rapidsai#7858