
Training with ray DaskEngine to_parquet fails with TypeError: cannot pickle 'pickle5.PickleBuffer' object #1710

Closed
dantreiman opened this issue Jan 25, 2022 · 7 comments


@dantreiman
Contributor

Reproducible by training any model, e.g. examples/titanic/simple_model_training.py.

If I uninstall ray or otherwise force Ludwig to use PandasEngine, training works.

With ray and dask installed, Ludwig uses DaskEngine and fails with the error:
TypeError: cannot pickle 'pickle5.PickleBuffer' object

Stack trace:

  File "/Users/daniel/Desktop/github/dantreiman-ludwig/ludwig/api.py", line 424, in train
    preprocessed_data = self.preprocess(
  File "/Users/daniel/Desktop/github/dantreiman-ludwig/ludwig/api.py", line 1268, in preprocess
    preprocessed_data = preprocess_for_training(
  File "/Users/daniel/Desktop/github/dantreiman-ludwig/ludwig/data/preprocessing.py", line 1433, in preprocess_for_training
    processed = cache.put(*processed)
  File "/Users/daniel/Desktop/github/dantreiman-ludwig/ludwig/data/cache/manager.py", line 46, in put
    training_set = self.dataset_manager.save(
  File "/Users/daniel/Desktop/github/dantreiman-ludwig/ludwig/data/dataset/ray.py", line 100, in save
    self.backend.df_engine.to_parquet(dataset, cache_path)
  File "/Users/daniel/Desktop/github/dantreiman-ludwig/ludwig/data/dataframe/dask.py", line 84, in to_parquet
    df.to_parquet(
  File "/Users/daniel/mambaforge/envs/ludwig39-dev/lib/python3.9/site-packages/dask/dataframe/core.py", line 4127, in to_parquet
    return to_parquet(self, path, *args, **kwargs)
  File "/Users/daniel/mambaforge/envs/ludwig39-dev/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py", line 671, in to_parquet
    out = out.compute(**compute_kwargs)
  File "/Users/daniel/mambaforge/envs/ludwig39-dev/lib/python3.9/site-packages/dask/base.py", line 283, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/daniel/mambaforge/envs/ludwig39-dev/lib/python3.9/site-packages/dask/base.py", line 565, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/daniel/mambaforge/envs/ludwig39-dev/lib/python3.9/site-packages/ray/util/dask/scheduler.py", line 127, in ray_dask_get
    result = ray_get_unpack(object_refs, progress_bar_actor=pb_actor)
  File "/Users/daniel/mambaforge/envs/ludwig39-dev/lib/python3.9/site-packages/ray/util/dask/scheduler.py", line 420, in ray_get_unpack
    computed_result = get_result(object_refs)
  File "/Users/daniel/mambaforge/envs/ludwig39-dev/lib/python3.9/site-packages/ray/util/dask/scheduler.py", line 408, in get_result
    return ray.get(object_refs)
  File "/Users/daniel/mambaforge/envs/ludwig39-dev/lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/Users/daniel/mambaforge/envs/ludwig39-dev/lib/python3.9/site-packages/ray/worker.py", line 1713, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(TypeError): ray::dask:to-parquet-7442830550491c434ff968e79a5f5f70 (pid=40584, ip=127.0.0.1)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::dask:('to-parquet-7442830550491c434ff968e79a5f5f70', 0) (pid=40584, ip=127.0.0.1)
  File "/Users/daniel/mambaforge/envs/ludwig39-dev/lib/python3.9/site-packages/ray/serialization.py", line 361, in serialize
    return self._serialize_to_msgpack(value)
  File "/Users/daniel/mambaforge/envs/ludwig39-dev/lib/python3.9/site-packages/ray/serialization.py", line 341, in _serialize_to_msgpack
    self._serialize_to_pickle5(metadata, python_objects)
  File "/Users/daniel/mambaforge/envs/ludwig39-dev/lib/python3.9/site-packages/ray/serialization.py", line 301, in _serialize_to_pickle5
    raise e
  File "/Users/daniel/mambaforge/envs/ludwig39-dev/lib/python3.9/site-packages/ray/serialization.py", line 297, in _serialize_to_pickle5
    inband = pickle.dumps(
  File "/Users/daniel/mambaforge/envs/ludwig39-dev/lib/python3.9/site-packages/ray/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/Users/daniel/mambaforge/envs/ludwig39-dev/lib/python3.9/site-packages/ray/cloudpickle/cloudpickle_fast.py", line 620, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'pickle5.PickleBuffer' object
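For context (an editorial sketch, not part of the original report): PickleBuffer objects are only picklable with pickle protocol 5, so any serializer that falls back to an older protocol, or that does not recognize the pickle5 backport's PickleBuffer class as distinct from the stdlib's, will raise exactly this kind of TypeError. The stdlib behavior can be seen directly:

```python
import pickle

buf = pickle.PickleBuffer(b"example payload")

# Protocol 5 serializes the buffer's contents in-band when no
# buffer_callback is supplied, so a round trip recovers the bytes.
data = pickle.dumps(buf, protocol=5)
assert pickle.loads(data) == b"example payload"

# Older protocols refuse PickleBuffer outright.
try:
    pickle.dumps(buf, protocol=4)
except pickle.PicklingError as exc:
    print("protocol 4:", exc)
```

In this issue the failing class is pickle5.PickleBuffer (the backport), which Ray's bundled cloudpickle does not special-case the way it does the stdlib class.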
@hungcs
Contributor

hungcs commented Jan 25, 2022

Would you happen to know what engine dask is using to read parquet (fastparquet vs pyarrow)?

I ran into errors with pyarrow 5.0.0 and ludwig, but that was resolved after upgrading to pyarrow 6.0.1

@dantreiman
Contributor Author

I have pyarrow==6.0.1 installed. No fastparquet. Just to be sure, I downgraded pyarrow to 5.0.0 and upgraded again; no change, still getting the same bug.

@hungcs
Copy link
Contributor

hungcs commented Jan 25, 2022

@dantreiman What version of dask are you using? requirements_dask.txt has dask[dataframe]<2021.3.1. That constraint was because of ray, which has since fixed the issue, so we should be good to remove it anyway. On the pred side we've been using 2021.10.0.

There seem to be pickle5 errors that were resolved in 2021.4.0 (although the error messages are different), so it might be worth upgrading:
rapidsai/cudf#7858

@dantreiman
Contributor Author

dantreiman commented Jan 25, 2022

My environment has dask==2022.01.0.

I tried downgrading to dask 2021.4.0 and dask 2021.10.0; no change. My ray version is 1.9.2.

I think your original idea about pyarrow is on the right track. I changed dask.py:84 to:

            df.to_parquet(
                path,
                engine="fastparquet",  # was "pyarrow"
                write_index=False,
                schema="infer",
            )

and this saves successfully. Perhaps I have an incompatible pair of dask and pyarrow versions installed.
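As a quick environment check (an editorial, stdlib-only sketch), you can confirm which of dask's parquet engines are importable before pinning one explicitly:

```python
import importlib.util

def available_parquet_engines():
    """Return the subset of dask's supported parquet engines
    ("pyarrow", "fastparquet") that are importable here."""
    return [
        name
        for name in ("pyarrow", "fastparquet")
        if importlib.util.find_spec(name) is not None
    ]

print(available_parquet_engines())
```

If fastparquet is not listed, the engine="fastparquet" workaround above would fail with an ImportError rather than avoiding the pickle error.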

@connor-mccorm
Contributor

I'm experiencing this same issue when I run my tests. It is failing specifically at the from_dask() function in the ray dataset constructor. I suspect the issue may not be parquet-related, since I'm not writing to parquet at all and still getting this error.

@dantreiman
Contributor Author

Apparently uninstalling pickle5 fixes this: pip uninstall pickle5
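This makes sense: on Python 3.8+ the stdlib pickle already implements protocol 5, so the pickle5 backport is redundant, and its PickleBuffer class is a different type than pickle.PickleBuffer, which trips up Ray's serializer. A hedged sketch for detecting this situation (the function name is illustrative, not a real API):

```python
import importlib.util
import sys

def pickle5_is_redundant():
    """True if the pickle5 backport is installed on a Python
    version (3.8+) whose stdlib already supports protocol 5."""
    return (
        sys.version_info >= (3, 8)
        and importlib.util.find_spec("pickle5") is not None
    )

if pickle5_is_redundant():
    print("pickle5 backport found; try `pip uninstall pickle5`")
```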

@dantreiman
Contributor Author

Caused by: ray-project/ray#22562

Fixed by: #1763
