
[QST] Returning from multi-thread. TypeError: a bytes-like object is required, not 'dict' #15246

Closed
blue-cat-whale opened this issue Mar 7, 2024 · 5 comments · Fixed by #15940
Labels: doc (Documentation), question (Further information is requested)
blue-cat-whale commented Mar 7, 2024

When running my code with cudf.pandas, I got TypeError: a bytes-like object is required, not 'dict' when collecting results from the worker processes.

  1. Running the code without the -m cudf.pandas option works fine.
  2. It also works if each worker returns merely a scalar.
  3. The program crashes if a worker returns a DataFrame.

This is the error message:

concurrent.futures.process._RemoteTraceback:
'''
Traceback (most recent call last):
  File "/usr/lib64/python3.9/concurrent/futures/process.py", line 387, in wait_result_broken_or_wakeup
    result_item = result_reader.recv()
  File "/usr/lib64/python3.9/multiprocessing/connection.py", line 255, in recv
    return _ForkingPickler.loads(buf.getbuffer())
  File "/usr/local/lib64/python3.9/site-packages/cudf/pandas/fast_slow_proxy.py", line 742, in __setstate__
    unpickled_wrapped_obj = pickle.loads(state)
TypeError: a bytes-like object is required, not 'dict'
'''

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/lib64/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib64/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/usr/local/lib64/python3.9/site-packages/cudf/pandas/__main__.py", line 91, in <module>
    main()
  File "/usr/local/lib64/python3.9/site-packages/cudf/pandas/__main__.py", line 87, in main
    runpy.run_path(args.args[0], run_name="__main__")
  File "/usr/lib64/python3.9/runpy.py", line 288, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "/usr/lib64/python3.9/runpy.py", line 97, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "/usr/lib64/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "clean_header.py", line 48, in <module>
    main()
  File "clean_header.py", line 45, in main
    my_func()
  File "clean_header.py", line 39, in my_func
    for obj in r:
  File "/usr/lib64/python3.9/concurrent/futures/process.py", line 562, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/usr/lib64/python3.9/concurrent/futures/_base.py", line 609, in result_iterator
    yield fs.pop().result()
  File "/usr/lib64/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/usr/lib64/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

Here is my code.

from datetime import date
import numpy as np
import pandas as pd
from random import randint

from functools import partial
from concurrent.futures import ProcessPoolExecutor as Pool
from multiprocessing import set_start_method


def data_generation(nRows: int):
    # Unimportant, for reproducing purposes: generates the dataframe my_df,
    # which has 5 columns; the data are sorted ascending by WorkingDay and Minute.
    my_df = pd.DataFrame(data={
        'WorkingDay': ['2019-01-02', '2018-01-02', '2019-05-02', '2020-01-02', '2021-01-02'],
        'name': ['albert', 'alex', 'alice', 'ben', 'bob'],
        'Minute': ['09:00:00', '09:20:00', '08:00:00', '07:00:00', '09:30:00'],
        'aaa': np.random.rand(5),
        'bbb': np.random.rand(5)})
    my_df = pd.concat([my_df for i in range(int(nRows/5))], axis=0)
    my_df['WorkingDay'] = my_df['WorkingDay'].map(lambda x: (date(randint(2010,2020), randint(1,4), randint(1,5))).strftime('%Y-%m-%d'))
    my_df['Minute'] = np.random.permutation(my_df['Minute'].values)
    my_df = my_df.sort_values(by=['WorkingDay', 'Minute'], inplace=False).reset_index(drop=True,inplace=False)
    return my_df

def my_func_single(branchIndex: int):
    my_df = data_generation(20 - 5 * branchIndex)
    # Returning the DataFrame from the worker is what triggers the error;
    # returning a scalar instead works fine:
    #return my_df.shape[0]
    return my_df


def my_func():
    set_start_method('spawn')
    my_func_partial = partial(my_func_single)
    with Pool(max_workers=2) as pool:
        r = pool.map(my_func_partial, range(4))
    for obj in r:
        #print('df has length: {}.'.format(obj))
        print('df has length: {}.'.format(obj.shape[0]))

def main():
    print('-------------------- program starts -----------------------')
    my_func()

if __name__ == '__main__':
    main()

Relevant dependencies:

cuda-python==12.4.0
cudf-cu12==24.4.0a516
cugraph-cu12==24.4.0a69
cuml-cu12==24.4.0a37
dask==2024.1.1
dask-cuda==24.4.0a11
dask-cudf-cu12==24.4.0a516
pylibcugraph-cu12==24.4.0a69
pylibraft-cu12==24.4.0a70
blue-cat-whale added the question (Further information is requested) label Mar 7, 2024
blue-cat-whale commented Mar 7, 2024

I tried another parallel mechanism (joblib) and a similar error appears.

The new code:

from joblib import Parallel, delayed

def my_func():
  num_cores = 2
  inputs = range(4)
  results = Parallel(n_jobs=num_cores)(delayed(my_func_single)(i) for i in inputs)
  for obj in results:
    print('df has length: {}.'.format(obj.shape[0]))

def main():
  print('-------------------- program starts -----------------------')
  my_func()  

if __name__ == '__main__':
  main()

The error message:

joblib.externals.loky.process_executor._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/local/lib64/python3.9/site-packages/joblib/externals/loky/process_executor.py", line 661, in wait_result_broken_or_wakeup
    result_item = result_reader.recv()
  File "/usr/lib64/python3.9/multiprocessing/connection.py", line 255, in recv
    return _ForkingPickler.loads(buf.getbuffer())
  File "/usr/local/lib64/python3.9/site-packages/cudf/pandas/fast_slow_proxy.py", line 742, in __setstate__
    unpickled_wrapped_obj = pickle.loads(state)
TypeError: a bytes-like object is required, not 'dict'
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/lib64/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib64/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/usr/local/lib64/python3.9/site-packages/cudf/pandas/__main__.py", line 91, in <module>
    main()
  File "/usr/local/lib64/python3.9/site-packages/cudf/pandas/__main__.py", line 87, in main
    runpy.run_path(args.args[0], run_name="__main__")
  File "/usr/lib64/python3.9/runpy.py", line 288, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "/usr/lib64/python3.9/runpy.py", line 97, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "/usr/lib64/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "clean_header.py", line 49, in <module>
    main()
  File "clean_header.py", line 45, in main
    my_func()
  File "clean_header.py", line 38, in my_func
    results = Parallel(n_jobs=num_cores)(delayed(my_func_single)(i) for i in inputs)
  File "/usr/local/lib64/python3.9/site-packages/joblib/parallel.py", line 1952, in __call__
    return output if self.return_generator else list(output)
  File "/usr/local/lib64/python3.9/site-packages/joblib/parallel.py", line 1595, in _get_outputs
    yield from self._retrieve()
  File "/usr/local/lib64/python3.9/site-packages/joblib/parallel.py", line 1699, in _retrieve
    self._raise_error_fast()
  File "/usr/local/lib64/python3.9/site-packages/joblib/parallel.py", line 1734, in _raise_error_fast
    error_job.get_result(self.timeout)
  File "/usr/local/lib64/python3.9/site-packages/joblib/parallel.py", line 736, in get_result
    return self._return_or_raise()
  File "/usr/local/lib64/python3.9/site-packages/joblib/parallel.py", line 754, in _return_or_raise
    raise self._result
joblib.externals.loky.process_executor.BrokenProcessPool: A result has failed to un-serialize. Please ensure that the objects returned by the function are always picklable.

blue-cat-whale commented

I worked around the problem by returning my_df.values, list(my_df.index) from the worker instead of the DataFrame itself.
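That workaround can be sketched as follows. This is a minimal illustration in plain pandas; the worker/rebuild helper names are hypothetical, and note that .values on a mixed-dtype frame yields an object array, so exact dtypes are not necessarily preserved:

```python
import numpy as np
import pandas as pd

def worker(i):
    # Hypothetical worker: instead of returning the (possibly proxied)
    # DataFrame, return plain, always-picklable pieces.
    df = pd.DataFrame({'aaa': np.arange(3.0) + i, 'bbb': np.arange(3.0) * i})
    return df.values, list(df.index), list(df.columns)

def rebuild(values, index, columns):
    # Reassemble the frame in the parent process.
    return pd.DataFrame(values, index=index, columns=columns)

if __name__ == '__main__':
    values, index, columns = worker(1)
    df = rebuild(values, index, columns)
    print(df.shape)  # (3, 2)
```

In the script above, the loop over the pool's results would then call rebuild on each returned tuple before using the frame.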

vyasr commented Mar 7, 2024

I'm glad you were able to get the issue resolved in your case! That said, it does look like you're highlighting a real issue with using cudf.pandas objects in multiprocessing, so I'm going to reopen this issue for now. Here's an MWE for future investigation indicating that it's also sensitive to how the process is created. Since fork works while spawn does not, we're probably relying on some implicit state being preserved that is lost when a new process is spawned.

# Works correctly for `import cudf as pd`
import pandas as pd

from concurrent.futures import ProcessPoolExecutor as Pool
from multiprocessing import set_start_method


def f(i: int):
    return pd.DataFrame({'a': [i]})


def main():
    for method in ['fork', 'spawn', 'forkserver']:
        set_start_method(method, force=True)
        with Pool(max_workers=2) as pool:
            r = pool.map(f, range(4))
        try:
            list(r)
        except Exception as e:
            print(f'{type(e).__name__}: {method}')
        else:
            print(f'Succeeded: {method}')

if __name__ == '__main__':
    main()

@vyasr vyasr reopened this Mar 7, 2024
wence- commented Mar 11, 2024

This problem occurs because, when using spawn or forkserver, the new Python process started by multiprocessing does not have the custom cudf.pandas metapath finder installed. Hence, import pandas as pd in the worker fetches the real (unwrapped) pandas module rather than the wrapped (cudf.pandas) module.

Consider:

import sys
from concurrent.futures import ProcessPoolExecutor as Pool
from multiprocessing import set_start_method

def f():
    print(sys.meta_path)

def main():
    for method in ['fork', 'spawn', 'forkserver']:
        print(method)
        set_start_method(method, force=True)
        with Pool(max_workers=1) as pool:
            result = pool.submit(f).result()

if __name__ == "__main__":
    main()

When run with python -m cudf.pandas bug.py:

fork
[ModuleAccelerator(fast=cudf, slow=pandas), <_distutils_hack.DistutilsMetaFinder object at 0x76f18b8991e0>, <_rmm_editable.ScikitBuildRedirectingFinder object at 0x76f18ba67fa0>, <_cudf_kafka_editable.ScikitBuildRedirectingFinder object at 0x76f18ba64700>, <_cudf_editable.ScikitBuildRedirectingFinder object at 0x76f18bb2b3d0>, <class '_frozen_importlib.BuiltinImporter'>, <class '_frozen_importlib.FrozenImporter'>, <class '_frozen_importlib_external.PathFinder'>, <six._SixMetaPathImporter object at 0x76f04651b4c0>]
 ^^^^^^^^^^^^^^^^^^^ Good!
spawn
[<_distutils_hack.DistutilsMetaFinder object at 0x78af5ec412d0>, <_rmm_editable.ScikitBuildRedirectingFinder object at 0x78af5ec405b0>, <_cudf_kafka_editable.ScikitBuildRedirectingFinder object at 0x78af5ee0c7f0>, <_cudf_editable.ScikitBuildRedirectingFinder object at 0x78af5eed74c0>, <class '_frozen_importlib.BuiltinImporter'>, <class '_frozen_importlib.FrozenImporter'>, <class '_frozen_importlib_external.PathFinder'>]
 ^ BAD!
forkserver
[<_distutils_hack.DistutilsMetaFinder object at 0x7c5cd58e92a0>, <_rmm_editable.ScikitBuildRedirectingFinder object at 0x7c5cd58e8580>, <_cudf_kafka_editable.ScikitBuildRedirectingFinder object at 0x7c5cd58a47c0>, <_cudf_editable.ScikitBuildRedirectingFinder object at 0x7c5cd596f490>, <class '_frozen_importlib.BuiltinImporter'>, <class '_frozen_importlib.FrozenImporter'>, <class '_frozen_importlib_external.PathFinder'>]

One can work around this by using the functional interface to cudf.pandas and installing the accelerator manually at the start of the file. Note that this must be done before pandas is imported:

import cudf.pandas
cudf.pandas.install()

import pandas as pd
from concurrent.futures import ProcessPoolExecutor as Pool
from multiprocessing import set_start_method


def f(i: int):
    return pd.DataFrame({'a': [i]})


def main():
    for method in ['fork', 'spawn', 'forkserver']:
        set_start_method(method, force=True)
        with Pool(max_workers=2) as pool:
            r = pool.map(f, range(4))
        try:
            list(r)
        except Exception as e:
            print(f'{type(e).__name__}: {method}')
        else:
            print(f'Succeeded: {method}')

if __name__ == '__main__':
    main()

This will work for all three start methods.

wence- commented Mar 11, 2024

We should probably add this as a known limitation in the FAQ.

wence- added the doc (Documentation) label Mar 11, 2024
wence- added a commit to wence-/cudf that referenced this issue Jun 6, 2024
We need to arrange that cudf.pandas.install() is run on the workers,
this requires that we programmatically install the metapath loader in
our script. Unfortunately, passing an initializer function to the pool
startup is not sufficient if any part of the script transitively loads
pandas at the top level.

- Closes rapidsai#15246
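For illustration, the "initializer function" mentioned here is the initializer parameter of ProcessPoolExecutor. The sketch below uses a hypothetical mark_installed stand-in for cudf.pandas.install() so it runs without cudf. As the commit message notes, this hook is not sufficient on its own: under spawn the worker re-imports the script's top level before the initializer runs, so a top-level import pandas would already be bound to the real pandas by then.

```python
from concurrent.futures import ProcessPoolExecutor

def mark_installed():
    # Stand-in for cudf.pandas.install(): records that the initializer
    # ran in this worker process (illustration only).
    global ACCELERATOR_INSTALLED
    ACCELERATOR_INSTALLED = True

def probe(_):
    # Runs in the worker after the initializer. With a real top-level
    # `import pandas`, that import would have happened *before* the
    # initializer under the spawn start method.
    return globals().get('ACCELERATOR_INSTALLED', False)

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=1, initializer=mark_installed) as pool:
        print(list(pool.map(probe, [0])))  # [True]
```

This is why the fix installs the metapath loader programmatically in the script itself rather than relying on the pool's initializer hook.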
rapids-bot bot pushed a commit that referenced this issue Jun 6, 2024 (same message as above)

- Closes #15246

Authors:
  - Lawrence Mitchell (https://github.com/wence-)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #15940