dd.read_parquet + compute raises _pickle.UnpicklingError: pickle data was truncated for multiple parquet files
#4662
Comments
Just to be clear, I don't think
Also linking this example that Matt shared (#4645 (comment)).
Also this was tested with Python 3.8 and found to still be an issue (#4645 (comment)), so it is not unique to Python 3.7 + `pickle5`.
I'm blocked for the next few hours, but the next thing I recommend doing, if anyone has the time, is to construct a test. I tried this with the following:

```python
# assumes the usual distributed test-suite imports: np, pytest, inc, gen_cluster
@gen_cluster(client=True)
async def test_large_pandas_dataframe(c, s, a, b):
    pd = pytest.importorskip("pandas")
    w = c.submit(np.arange, 20000000)
    x = c.submit(pd.DataFrame, {"x": w}, workers=a.address)
    y = c.submit(inc, x, workers=b.address)
    await y
```

But I learned that the frames that were coming out of Pandas had length 1, and so maybe weren't being cut up. I think that this is maybe because they're blocks in a pandas block manager. For some reason the blocks coming out of the example dataset that @williamBlazing posted in dask/dask#7490 have a much longer length. Maybe this is a dtype difference?

The next thing I would try is to construct a Pandas dataframe that we can send through my test above that reproduces this error. My guess is that this will require a different dtype or categorical or something. Once that's done I'm hoping that we can put breakpoints strategically in the serialization pipeline to find out where things are being cut, and why they're not being stitched back together.
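A sketch of what such a follow-up test might look like (the object-dtype string column is just a guess at something whose blocks serialize into more/larger frames than plain numeric data, not a confirmed trigger):

```python
import numpy as np
import pytest

from distributed.utils_test import gen_cluster


@gen_cluster(client=True)
async def test_large_object_dtype_dataframe(c, s, a, b):
    pd = pytest.importorskip("pandas")
    # Build the columns on worker a so only futures travel in the task graph
    w = c.submit(np.arange, 2_000_000, workers=a.address)
    v = c.submit(lambda n: ["some-moderately-long-string"] * n, 2_000_000, workers=a.address)
    x = c.submit(pd.DataFrame, {"x": w, "y": v}, workers=a.address)
    # Running len() on worker b forces a worker-to-worker transfer of the
    # DataFrame, which exercises the frame split/merge path under suspicion
    y = c.submit(len, x, workers=b.address)
    await y
```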
Yeah, I've been trying NumPy arrays and am still trying to wrap my head around where the issue is coming from. ATM am thinking pickle protocol 5 is a red herring (especially since this issue shows up in 2021.3.1 and not 2021.3.0, suggesting it is related to other serialization changes). Instead am thinking some of our logic around splitting and joining frames may be having issues, though I have yet to narrow that down.
Yeah, I think that the thing to do here is to start with the data that William provided, which we know fails, and then try to simplify and reduce that down. There are pretty solid tests and special code paths around numpy arrays. I'm fairly confident about that path.
Ok, have found a simpler reproducer:

```python
import numpy as np
from dask.distributed import Client, wait


class Data:
    def __init__(self, n):
        self.data = np.empty(n, dtype="u1")


c = Client(n_workers=1)
f = c.submit(Data, 200_000_000)
wait(f)
d = f.result()
```
Testing a roundtrip through `dumps`/`loads` also reproduces this:

```python
import numpy as np

from distributed.protocol import Serialize, dumps, loads


class Data:
    def __init__(self, n):
        self.data = np.empty(n, dtype="u1")


d = Data(200_000_000)
loads(dumps(Serialize(d)))
```
I'm tracking this down to the code that handles frames in msgpack now.
Yeah, ok, so the `_encode_default` and `_decode_default` functions in `dumps` and `loads` in `protocol/core.py` create a new convention where they split and then compress frames, and record the number of sub-frames under a new `"num-sub-frames"` key. Unfortunately, in the case where they don't immediately deserialize, but instead create a

cc @madsbk
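For illustration, here is a rough sketch of the convention being described; this is not the actual `protocol/core.py` code, just the shape of it. The sender splits each frame into shard-sized sub-frames and records the counts under `"num-sub-frames"`; the receiver has to regroup them before deserializing. If the sub-frames are never joined back, `pickle.loads` only sees the first piece, which is consistent with the "pickle data was truncated" error.

```python
def encode(frames, shard_size=64 * 2**20):
    # Split every frame into shard-sized sub-frames and remember how many
    # pieces each original frame was cut into.
    sub_frames = []
    for frame in frames:
        parts = [frame[i:i + shard_size] for i in range(0, len(frame), shard_size)] or [b""]
        sub_frames.append(parts)
    header = {"num-sub-frames": [len(parts) for parts in sub_frames]}
    flat = [part for parts in sub_frames for part in parts]
    return header, flat


def decode(header, flat):
    # Regroup the flat list of sub-frames back into the original frames.
    frames, i = [], 0
    for n in header["num-sub-frames"]:
        frames.append(b"".join(flat[i:i + n]))
        i += n
    return frames


original = [bytes(200_000_000)]          # one ~200 MB frame, as in the reproducer above
header, flat = encode(original)
assert len(flat) > 1                     # the frame really was split into sub-frames
assert decode(header, flat) == original  # joining restores the full payload
```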
Resolved, I think, in #4666. Tests are running now.
Yeah, was also digging into the MsgPack serialization code, but hadn't gotten quite that far. That makes sense. Thanks for digging deeper 🙂 FWIW Mads is out this week. I believe he gets back next week, but am not sure.
Ok, this should now be fixed. Please retest and let us know how it goes. Also it should be possible to test without `pickle5`.

cc @alejandrofiel @Cedric-Magnan @gabicca @williamBlazing
I've just run a few tests that were failing before and now they pass with the latest code on main, on Python 3.7! Thank you everyone for all the effort you put into fixing this!! :)
I've tested with the code I had provided to read parquet files with distributed from the current main branch and it seems to work as well! Congrats and thanks for the effort! (It also works without `pickle5`.)
Great, thank you all for testing! 😄
Needed to pick up some serialization bug fixes in the recent Distributed release (dask/distributed#4645, dask/distributed#4662).

Authors:
- https://github.com/jakirkham

Approvers:
- Keith Kraus (https://github.com/kkraus14)
- Ray Douglass (https://github.com/raydouglass)

URL: #7858
What happened:
When trying to compute multiple parquet files into a single pandas dataframe with a distributed client on a single VM, I get the following traceback:
Traceback
What you expected to happen:
I would expect no issue since the parquet files have the same schema (they don't have any parquet metadata though).
The same process worked fine on distributed 2021.2.0 and 2021.3.0.
Minimal Complete Verifiable Example:
Install a new virtualenv with Python 3.7.9 and install the following libraries with pip:
requirements.txt:
Then run the following Python script:
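(The original script is not captured here; the following is only a hypothetical sketch of the kind of workflow the issue title describes, with made-up file paths and settings.)

```python
# Hypothetical reproduction sketch -- paths, cluster settings, and data are
# assumptions, not the reporter's original script.
import dask.dataframe as dd
from dask.distributed import Client

if __name__ == "__main__":
    client = Client()  # LocalCluster on a single VM, as described above

    # Multiple parquet files that share a schema but have no _metadata file
    ddf = dd.read_parquet("data/part.*.parquet")

    # compute() pulls everything back into one pandas DataFrame; this is the
    # step reported to raise _pickle.UnpicklingError on 2021.3.1
    df = ddf.compute()
    print(len(df))
```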
Anything else we need to know?:
Here is the result from the following command:
Result
This issue was discovered trying to debug #4645.
@jakirkham has raised something interesting. He said that, using Python 3.7, distributed needs `pickle5` on all the workers, the scheduler, and the client(s). Here, `pickle5` is installed when the LocalCluster is being created, but I am not sure it is installed on all workers, the scheduler, and the client(s), since `pickle5` is neither in the install requirements for Python 3.7, nor in the packages from the workers when I check using `client.get_versions` (or at least, it does not appear in the packages available).

Environment:
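For reference, one way to check whether `pickle5` is importable on the client, the scheduler, and every worker (this assumes a distributed version where `get_versions` accepts a `packages=` argument; `pickle5` is not in the default set of packages it reports):

```python
from dask.distributed import Client

client = Client()  # the LocalCluster described above

# Ask for pickle5 explicitly so it shows up in the version report
versions = client.get_versions(packages=["pickle5"])

print("client   :", versions["client"]["packages"].get("pickle5"))
print("scheduler:", versions["scheduler"]["packages"].get("pickle5"))
for addr, info in versions["workers"].items():
    print("worker", addr, ":", info["packages"].get("pickle5"))
```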