
[BUG] Getting error when jointly encoding single-hot and multi-hot categorical columns #1639

Closed
rnyak opened this issue Aug 1, 2022 · 4 comments
Labels: bug (Something isn't working), P1

Comments

@rnyak
Contributor

rnyak commented Aug 1, 2022

Describe the bug
I would like to jointly encode single-hot and multi-hot categorical columns, but I am getting the following error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [46], in <cell line: 16>()
     13 train_dataset = nvt.Dataset(train)
     15 workflow = nvt.Workflow(cat_features)
---> 16 workflow.fit_transform(train_dataset)

File /usr/local/lib/python3.8/dist-packages/nvtabular/workflow/workflow.py:286, in Workflow.fit_transform(self, dataset)
    266 def fit_transform(self, dataset: Dataset) -> Dataset:
    267     """Convenience method to both fit the workflow and transform the dataset in a single
    268     call. Equivalent to calling ``workflow.fit(dataset)`` followed by
    269     ``workflow.transform(dataset)``
   (...)
    284     transform
    285     """
--> 286     self.fit(dataset)
    287     return self.transform(dataset)

File /usr/local/lib/python3.8/dist-packages/nvtabular/workflow/workflow.py:247, in Workflow.fit(self, dataset)
    245     results = [r.result() for r in dask_client.compute(stats)]
    246 else:
--> 247     results = dask.compute(stats, scheduler="synchronous")[0]
    249 for computed_stats, op in zip(results, ops):
    250     op.fit_finalize(computed_stats)

File /usr/local/lib/python3.8/dist-packages/dask/base.py:575, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    572     keys.append(x.__dask_keys__())
    573     postcomputes.append(x.__dask_postcompute__())
--> 575 results = schedule(dsk, keys, **kwargs)
    576 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /usr/local/lib/python3.8/dist-packages/dask/local.py:554, in get_sync(dsk, keys, **kwargs)
    549 """A naive synchronous version of get_async
    550 
    551 Can be useful for debugging.
    552 """
    553 kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 554 return get_async(
    555     synchronous_executor.submit,
    556     synchronous_executor._max_workers,
    557     dsk,
    558     keys,
    559     **kwargs,
    560 )

File /usr/local/lib/python3.8/dist-packages/dask/local.py:497, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    495 while state["waiting"] or state["ready"] or state["running"]:
    496     fire_tasks(chunksize)
--> 497     for key, res_info, failed in queue_get(queue).result():
    498         if failed:
    499             exc, tb = loads(res_info)

File /usr/lib/python3.8/concurrent/futures/_base.py:437, in Future.result(self, timeout)
    435     raise CancelledError()
    436 elif self._state == FINISHED:
--> 437     return self.__get_result()
    439 self._condition.wait(timeout)
    441 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File /usr/lib/python3.8/concurrent/futures/_base.py:389, in Future.__get_result(self)
    387 if self._exception:
    388     try:
--> 389         raise self._exception
    390     finally:
    391         # Break a reference cycle with the exception in self._exception
    392         self = None

File /usr/local/lib/python3.8/dist-packages/dask/local.py:539, in SynchronousExecutor.submit(self, fn, *args, **kwargs)
    537 fut = Future()
    538 try:
--> 539     fut.set_result(fn(*args, **kwargs))
    540 except BaseException as e:
    541     fut.set_exception(e)

File /usr/local/lib/python3.8/dist-packages/dask/local.py:235, in batch_execute_tasks(it)
    231 def batch_execute_tasks(it):
    232     """
    233     Batch computing of multiple tasks with `execute_task`
    234     """
--> 235     return [execute_task(*a) for a in it]

File /usr/local/lib/python3.8/dist-packages/dask/local.py:235, in <listcomp>(.0)
    231 def batch_execute_tasks(it):
    232     """
    233     Batch computing of multiple tasks with `execute_task`
    234     """
--> 235     return [execute_task(*a) for a in it]

File /usr/local/lib/python3.8/dist-packages/dask/local.py:226, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    224     failed = False
    225 except BaseException as e:
--> 226     result = pack_exception(e, dumps)
    227     failed = True
    228 return key, result, failed

File /usr/local/lib/python3.8/dist-packages/dask/local.py:221, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    219 try:
    220     task, data = loads(task_info)
--> 221     result = _execute_task(task, data)
    222     id = get_id()
    223     result = dumps((result, id))

File /usr/local/lib/python3.8/dist-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py:101, in annotate.__call__.<locals>.inner(*args, **kwargs)
     98 @wraps(func)
     99 def inner(*args, **kwargs):
    100     libnvtx_push_range(self.attributes, self.domain.handle)
--> 101     result = func(*args, **kwargs)
    102     libnvtx_pop_range(self.domain.handle)
    103     return result

File /usr/local/lib/python3.8/dist-packages/nvtabular/ops/categorify.py:761, in _top_level_groupby(df, options)
    759     df_gb = type(df)()
    760     ignore_index = True
--> 761     df_gb[cat_col_selector_str] = _concat(
    762         [df[col] for col in cat_col_selector.names], ignore_index
    763     )
    764     cat_col_selector = ColumnSelector([cat_col_selector_str])
    765 else:
    766     # Compile aggregation dictionary and add "squared-sum"
    767     # column(s) (necessary when `agg_cols` is non-empty)

File /usr/local/lib/python3.8/dist-packages/dask/dataframe/core.py:119, in _concat(args, ignore_index)
    111 # We filter out empty partitions here because pandas frequently has
    112 # inconsistent dtypes in results between empty and non-empty frames.
    113 # Ideally this would be handled locally for each operation, but in practice
    114 # this seems easier. TODO: don't do this.
    115 args2 = [i for i in args if len(i)]
    116 return (
    117     args[0]
    118     if not args2
--> 119     else methods.concat(args2, uniform=True, ignore_index=ignore_index)
    120 )

File /usr/local/lib/python3.8/dist-packages/dask/dataframe/dispatch.py:60, in concat(dfs, axis, join, uniform, filter_warning, ignore_index, **kwargs)
     58 else:
     59     func = concat_dispatch.dispatch(type(dfs[0]))
---> 60     return func(
     61         dfs,
     62         axis=axis,
     63         join=join,
     64         uniform=uniform,
     65         filter_warning=filter_warning,
     66         ignore_index=ignore_index,
     67         **kwargs,
     68     )

File /usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py:101, in annotate.__call__.<locals>.inner(*args, **kwargs)
     98 @wraps(func)
     99 def inner(*args, **kwargs):
    100     libnvtx_push_range(self.attributes, self.domain.handle)
--> 101     result = func(*args, **kwargs)
    102     libnvtx_pop_range(self.domain.handle)
    103     return result

File /usr/local/lib/python3.8/dist-packages/dask_cudf/backends.py:272, in concat_cudf(dfs, axis, join, uniform, filter_warning, sort, ignore_index, **kwargs)
    267 if ignore_order:
    268     raise NotImplementedError(
    269         "ignore_order parameter is not yet supported in dask-cudf"
    270     )
--> 272 return cudf.concat(dfs, axis=axis, ignore_index=ignore_index)

File /usr/local/lib/python3.8/dist-packages/cudf/core/reshape.py:407, in concat(objs, axis, join, ignore_index, sort)
    405         return objs[0]
    406     else:
--> 407         return cudf.Series._concat(
    408             objs, axis=axis, index=None if ignore_index else True
    409         )
    410 elif typ is cudf.MultiIndex:
    411     return cudf.MultiIndex._concat(objs)

File /usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py:101, in annotate.__call__.<locals>.inner(*args, **kwargs)
     98 @wraps(func)
     99 def inner(*args, **kwargs):
    100     libnvtx_push_range(self.attributes, self.domain.handle)
--> 101     result = func(*args, **kwargs)
    102     libnvtx_pop_range(self.domain.handle)
    103     return result

File /usr/local/lib/python3.8/dist-packages/cudf/core/series.py:1292, in Series._concat(cls, objs, axis, index)
   1286             raise TypeError(
   1287                 "cudf does not support mixed types, please type-cast "
   1288                 "both series to same dtypes."
   1289             )
   1291     if dtype_mismatch:
-> 1292         common_dtype = find_common_type([obj.dtype for obj in objs])
   1293         objs = [obj.astype(common_dtype) for obj in objs]
   1295 col = concat_columns([o._column for o in objs])

File /usr/local/lib/python3.8/dist-packages/cudf/utils/dtypes.py:574, in find_common_type(dtypes)
    571     dtypes = dtypes - dt_dtypes
    572     dtypes.add(np.result_type(*dt_dtypes))
--> 574 td_dtypes = set(
    575     filter(lambda t: pd.api.types.is_timedelta64_dtype(t), dtypes)
    576 )
    577 if len(td_dtypes) > 0:
    578     dtypes = dtypes - td_dtypes

File /usr/local/lib/python3.8/dist-packages/cudf/utils/dtypes.py:575, in find_common_type.<locals>.<lambda>(t)
    571     dtypes = dtypes - dt_dtypes
    572     dtypes.add(np.result_type(*dt_dtypes))
    574 td_dtypes = set(
--> 575     filter(lambda t: pd.api.types.is_timedelta64_dtype(t), dtypes)
    576 )
    577 if len(td_dtypes) > 0:
    578     dtypes = dtypes - td_dtypes

File /usr/local/lib/python3.8/dist-packages/pandas/core/dtypes/common.py:419, in is_timedelta64_dtype(arr_or_dtype)
    415 if isinstance(arr_or_dtype, np.dtype):
    416     # GH#33400 fastpath for dtype object
    417     return arr_or_dtype.kind == "m"
--> 419 return _is_dtype_type(arr_or_dtype, classes(np.timedelta64))

File /usr/local/lib/python3.8/dist-packages/pandas/core/dtypes/common.py:1619, in _is_dtype_type(arr_or_dtype, condition)
   1615         return condition(type(None))
   1617     return False
-> 1619 return condition(tipo)

File /usr/local/lib/python3.8/dist-packages/pandas/core/dtypes/common.py:146, in classes.<locals>.<lambda>(tipo)
    144 def classes(*klasses) -> Callable:
    145     """evaluate if the tipo is a subclass of the klasses"""
--> 146     return lambda tipo: issubclass(tipo, klasses)

TypeError: issubclass() arg 1 must be a class


Steps/Code to reproduce bug

You can run the code below to reproduce the error:

import cudf
import nvtabular as nvt
train = cudf.DataFrame(
    {
        "C1": [1, 3, 3, 4, 3, 1] *2,
        "C2": [10, 11, 12, 10, 11, 12] *2,
        "C3": [[1, 3], [1, 5], [4, 2, 1], [1, 2, 3], [1], [3,4]] *2,
    }
)

cat_features = [["C1", "C3"], "C2"] >> nvt.ops.Categorify()

train_dataset = nvt.Dataset(train)

workflow = nvt.Workflow(cat_features)
workflow.fit_transform(train_dataset)

Expected behavior
The workflow should fit and transform without raising an error, jointly encoding C1 and C3 into a shared category space while encoding C2 on its own.

Environment details (please complete the following information):

  • Environment location: Docker
  • Method of NVTabular install: Docker (merlin-tensorflow:22.06 container with the latest branches pulled from all libraries)
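
For comparison, here is an editorial sketch (not part of the original report) that encodes the columns independently rather than jointly; this should avoid the concatenation of the scalar C1 column with the list-dtype C3 column that fails in the traceback above:

import cudf
import nvtabular as nvt

train = cudf.DataFrame(
    {
        "C1": [1, 3, 3, 4, 3, 1] * 2,
        "C2": [10, 11, 12, 10, 11, 12] * 2,
        "C3": [[1, 3], [1, 5], [4, 2, 1], [1, 2, 3], [1], [3, 4]] * 2,
    }
)

# Passing a flat column list (instead of the nested [["C1", "C3"], "C2"])
# gives each column its own vocabulary, so Categorify never needs to
# concatenate the scalar C1 column with the list-dtype C3 column.
cat_features = ["C1", "C2", "C3"] >> nvt.ops.Categorify()

workflow = nvt.Workflow(cat_features)
workflow.fit_transform(nvt.Dataset(train)).compute()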

@rnyak rnyak added bug Something isn't working P1 S3 severity level labels Aug 1, 2022
@karlhigley
Contributor

karlhigley commented Aug 2, 2022

@rnyak rnyak removed the S3 severity level label Oct 27, 2022
@rjzamora
Collaborator

I am currently getting the following behavior:

In [1]: import cudf
   ...: import nvtabular as nvt
   ...: train = cudf.DataFrame(
   ...:     {
   ...:         "C1": [1, 3, 3, 4, 3, 1] *2,
   ...:         "C2": [10, 11, 12, 10, 11, 12] *2,
   ...:         "C3": [[1, 3], [1, 5], [4, 2, 1], [1, 2, 3], [1], [3,4]] *2,
   ...:     }
   ...: )
   ...: 
   ...: cat_features = [["C1", "C3"], "C2"] >> nvt.ops.Categorify()
   ...: 
   ...: train_dataset = nvt.Dataset(train)
   ...: 
   ...: workflow = nvt.Workflow(cat_features)
   ...: workflow.fit_transform(train_dataset).compute()
Out[1]: 
    C2  C1         C3
0    1   1     [1, 2]
1    2   2     [1, 5]
2    3   2  [3, 4, 1]
3    1   3  [1, 4, 2]
4    2   2        [1]
5    3   1     [2, 3]
6    1   1     [1, 2]
7    2   2     [1, 5]
8    3   2  [3, 4, 1]
9    1   3  [1, 4, 2]
10   2   2        [1]
11   3   1     [2, 3]

I will close the issue because this result looks correct to me. However, feel free to re-open if there is still a bug that I am missing (cc @rnyak @karlhigley).

@oliverholworthy
Member

For future reference, this was fixed by #1685 and is available in the 22.10 Merlin release (NVTabular 1.6.0).

@oliverholworthy
Member

I've opened an issue in cudf (rapidsai/cudf#12083) in case the cudf.concat behavior is a bug in cudf. Even if concatenating a list-dtype series with a scalar-dtype series is intended to be unsupported, cudf could raise a clearer error message in that scenario to help people using cudf identify their error more quickly.
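
To illustrate what rapidsai/cudf#12083 covers, here is a minimal editorial sketch (my own, not taken from the report) of concatenating a scalar-dtype series with a list-dtype series in cudf; on the cudf version used here, this appears to be where the issubclass() TypeError originates:

import cudf

# A scalar (int) series and a list-dtype series, mirroring the C1/C3 columns
# that Categorify concatenates when they are encoded jointly.
s_scalar = cudf.Series([1, 3, 3, 4])
s_list = cudf.Series([[1, 3], [1, 5], [4, 2, 1], [1, 2, 3]])

# On the affected cudf version, finding a common dtype for these two series
# fails with "TypeError: issubclass() arg 1 must be a class"; the cudf issue
# asks for either defined behavior or a clearer error message.
cudf.concat([s_scalar, s_list], ignore_index=True)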
