
[FEA] Support merge b/w categorical dask cudf dataframes #8200

Closed
VibhuJawa opened this issue May 10, 2021 · 0 comments · Fixed by #8332
Labels: dask (Dask issue), feature request (New feature or request)


@VibhuJawa
Member

Is your feature request related to a problem? Please describe.

dask_cudf should support merging DataFrames on categorical columns.

Currently, such a merge fails at compute time with `NotImplementedError: cudf.Categorical is not yet implemented`.

Describe the solution you'd like

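import cudf
import dask_cudf
from dask.dataframe.categorical import categorize

# Left frame: 'cat_col' becomes a categorical column after categorize()
df_1 = cudf.DataFrame({'id_1': [0, 1, 2, 3], 'cat_col': ['a', 'b', 'f', 'f']})
ddf_1 = dask_cudf.from_cudf(df_1, npartitions=2)
ddf_1 = categorize(ddf_1, columns=['cat_col'])

# Right frame: shares only the category 'f' with the left frame
df_2 = cudf.DataFrame({'id_2': [111, 112, 113], 'cat_col': ['g', 'h', 'f']})
ddf_2 = dask_cudf.from_cudf(df_2, npartitions=2)
ddf_2 = categorize(ddf_2, columns=['cat_col'])

# Merge on the shared categorical column; raises NotImplementedError at compute time
print(ddf_1.merge(ddf_2).compute())
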
NotImplementedError                       Traceback (most recent call last)
<ipython-input-1-f5bdf3b5ebd7> in <module>
     13 
     14 
---> 15 print(ddf_1.merge(ddf_2).compute())

/nvme/0/vjawa/conda/envs/rapids-0.20/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    282         dask.base.compute
    283         """
--> 284         (result,) = compute(self, traverse=False, **kwargs)
    285         return result
    286 

/nvme/0/vjawa/conda/envs/rapids-0.20/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    564         postcomputes.append(x.__dask_postcompute__())
    565 
--> 566     results = schedule(dsk, keys, **kwargs)
    567     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    568 
/nvme/0/vjawa/conda/envs/rapids-0.20/lib/python3.8/site-packages/dask/local.py in get_sync(dsk, keys, **kwargs)
    558     """
    559     kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 560     return get_async(
    561         synchronous_executor.submit,
    562         synchronous_executor._max_workers,

/nvme/0/vjawa/conda/envs/rapids-0.20/lib/python3.8/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    501             while state["waiting"] or state["ready"] or state["running"]:
    502                 fire_tasks(chunksize)
--> 503                 for key, res_info, failed in queue_get(queue).result():
    504                     if failed:
    505                         exc, tb = loads(res_info)

/nvme/0/vjawa/conda/envs/rapids-0.20/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    430                 raise CancelledError()
    431             elif self._state == FINISHED:
--> 432                 return self.__get_result()
    433 
    434             self._condition.wait(timeout)

/nvme/0/vjawa/conda/envs/rapids-0.20/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    386     def __get_result(self):
    387         if self._exception:
--> 388             raise self._exception
    389         else:
    390             return self._result

/nvme/0/vjawa/conda/envs/rapids-0.20/lib/python3.8/site-packages/dask/local.py in submit(self, fn, *args, **kwargs)
    543         fut = Future()
    544         try:
--> 545             fut.set_result(fn(*args, **kwargs))
    546         except BaseException as e:
    547             fut.set_exception(e)

/nvme/0/vjawa/conda/envs/rapids-0.20/lib/python3.8/site-packages/dask/local.py in batch_execute_tasks(it)
    235     Batch computing of multiple tasks with `execute_task`
    236     """
--> 237     return [execute_task(*a) for a in it]
    238 
    239 
/nvme/0/vjawa/conda/envs/rapids-0.20/lib/python3.8/site-packages/dask/local.py in <listcomp>(.0)
    235     Batch computing of multiple tasks with `execute_task`
    236     """
--> 237     return [execute_task(*a) for a in it]
    238 
    239 

/nvme/0/vjawa/conda/envs/rapids-0.20/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    226         failed = False
    227     except BaseException as e:
--> 228         result = pack_exception(e, dumps)
    229         failed = True
    230     return key, result, failed

/nvme/0/vjawa/conda/envs/rapids-0.20/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    221     try:
    222         task, data = loads(task_info)
--> 223         result = _execute_task(task, data)
    224         id = get_id()
    225         result = dumps((result, id))

/nvme/0/vjawa/conda/envs/rapids-0.20/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

/nvme/0/vjawa/conda/envs/rapids-0.20/lib/python3.8/site-packages/dask/utils.py in apply(func, args, kwargs)
     32 def apply(func, args, kwargs=None):
     33     if kwargs:
---> 34         return func(*args, **kwargs)
     35     else:
     36         return func(*args)

/nvme/0/vjawa/conda/envs/rapids-0.20/lib/python3.8/site-packages/dask/dataframe/multi.py in merge_chunk(lhs, *args, **kwargs)
    263             if left is not None and right is not None:
    264                 dtype = union_categoricals(
--> 265                     [left.astype("category").values, right.astype("category").values]
    266                 ).dtype
    267 

/nvme/0/vjawa/conda/envs/rapids-0.20/lib/python3.8/site-packages/cudf/core/frame.py in values(self)
   3378         <class 'cupy.core.core.ndarray'>
   3379         """
-> 3380         return self._column.values
   3381 

   3382     @property
/nvme/0/vjawa/conda/envs/rapids-0.20/lib/python3.8/site-packages/cudf/core/column/categorical.py in values(self)
   1128         Return a CuPy representation of the CategoricalColumn.
   1129         """
-> 1130         raise NotImplementedError("cudf.Categorical is not yet implemented")
   1131 
   1132     def clip(self, lo: ScalarLike, hi: ScalarLike) -> "column.ColumnBase":

NotImplementedError: cudf.Categorical is not yet implemented
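The failing frame is dask's `merge_chunk`, which (per the traceback) builds a common categorical dtype for the left and right key columns by calling `union_categoricals` on their `.values`; cudf's `CategoricalColumn.values` raises at exactly that point. The pure-Python sketch below illustrates what that union step has to achieve, independent of any GPU library. Assumptions: categoricals are modeled as hypothetical `(categories, codes)` pairs, and categories are unioned in first-seen order (mirroring pandas' default of not sorting). The category orders are written by hand to match the reproducer data; the order `categorize` actually produces may differ.

```python
from typing import List, Sequence


def union_categories(left_cats: Sequence[str], right_cats: Sequence[str]) -> List[str]:
    """Union two category lists, keeping first-seen order (left, then new right)."""
    seen = set()
    union: List[str] = []
    for cat in list(left_cats) + list(right_cats):
        if cat not in seen:
            seen.add(cat)
            union.append(cat)
    return union


def recode(codes: Sequence[int], old_cats: Sequence[str], new_cats: Sequence[str]) -> List[int]:
    """Map codes defined against old_cats onto their positions in new_cats."""
    position = {cat: i for i, cat in enumerate(new_cats)}
    return [position[old_cats[c]] for c in codes]


# Hypothetical (categories, codes) pairs mirroring the reproducer's cat_col values.
left_cats, left_codes = ["a", "b", "f"], [0, 1, 2, 2]    # 'a', 'b', 'f', 'f'
right_cats, right_codes = ["f", "g", "h"], [1, 2, 0]     # 'g', 'h', 'f'

merged_cats = union_categories(left_cats, right_cats)
left_new = recode(left_codes, left_cats, merged_cats)
right_new = recode(right_codes, right_cats, merged_cats)

print(merged_cats)  # ['a', 'b', 'f', 'g', 'h']
print(left_new)     # [0, 1, 2, 2]
print(right_new)    # [3, 4, 2]
```

After recoding, 'f' maps to code 2 on both sides, so equal values compare equal across partitions and a join on the shared column can match them.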

CC: @galipremsagar, @randerzander

@VibhuJawa added the feature request, Needs Triage, and dask labels on May 10, 2021, and removed Needs Triage the same day.
@galipremsagar galipremsagar self-assigned this May 19, 2021
rapids-bot bot pushed a commit that referenced this issue May 27, 2021
Fixes: #8200
This PR adds support for merging categorical data by implementing `union_categoricals_dispatch` in `dask-cudf`. It depends on the upstream dask change dask/dask#7699.

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Keith Kraus (https://github.com/kkraus14)
  - Vibhu Jawa (https://github.com/VibhuJawa)
  - Ashwin Srinath (https://github.com/shwina)

URL: #8332
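The fix routes the union step through `union_categoricals_dispatch`, so each backend can supply its own implementation rather than dask always calling the `union_categoricals` on `.values` seen in the traceback. dask keeps hooks like this in its own dispatch objects; the sketch below uses `functools.singledispatch` only as a stdlib stand-in to show the pattern. `HostCategorical`, `DeviceCategorical`, and `union_categoricals_sketch` are hypothetical names, not dask or cudf APIs.

```python
from functools import singledispatch
from typing import List


class HostCategorical:
    """Hypothetical stand-in for a CPU (pandas-like) categorical."""

    def __init__(self, categories: List[str]):
        self.categories = list(categories)


class DeviceCategorical:
    """Hypothetical stand-in for a GPU (cudf-like) categorical."""

    def __init__(self, categories: List[str]):
        self.categories = list(categories)


@singledispatch
def union_categoricals_sketch(first, *others):
    # Fallback when no backend has registered an implementation for this type.
    raise NotImplementedError(f"no union implementation for {type(first).__name__}")


def _union(objs) -> List[str]:
    # Union categories in first-seen order across all inputs.
    seen, out = set(), []
    for obj in objs:
        for cat in obj.categories:
            if cat not in seen:
                seen.add(cat)
                out.append(cat)
    return out


@union_categoricals_sketch.register
def _(first: HostCategorical, *others):
    return HostCategorical(_union([first, *others]))


@union_categoricals_sketch.register
def _(first: DeviceCategorical, *others):
    # A real GPU backend would build the result on the device; here we only
    # return the same container type, so the caller never touches `.values`.
    return DeviceCategorical(_union([first, *others]))


result = union_categoricals_sketch(
    DeviceCategorical(["a", "b", "f"]), DeviceCategorical(["f", "g", "h"])
)
print(type(result).__name__, result.categories)  # DeviceCategorical ['a', 'b', 'f', 'g', 'h']
```

Dispatching on the type of the first argument keeps the merge code backend-agnostic: it asks for a union and receives an object of the same kind it passed in.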