[BUG] Category mismatch error when merging DFs on string keys #4565

Closed
taureandyernv opened this issue Mar 18, 2020 · 1 comment
Labels
bug Something isn't working

taureandyernv commented Mar 18, 2020

Describe the bug
When merging two dataframes in dask_cudf with RMM enabled, after casting the key column from str to category, the merge fails if one of the dataframes is large and the other is small. The error is: TypeError: Left and right categories must be the same.

This happens on the 0.13 nightlies from 3/17. The same merge works in cudf, and in dask_cudf without RMM. Reading the strings directly as category also works (though that approach has its own problems, which will be brought up in a subsequent issue).
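
For readers unfamiliar with the error: the message suggests that the merge rejects categorical keys whose category sets differ between the left and right frames. Below is a minimal sketch of that situation, using pandas as a CPU stand-in for cudf (an assumption; cudf's internal check may differ) and the key values from the repro further down. The variable names are illustrative only.

```python
import pandas as pd

# Key columns shaped like the repro: the left side has five distinct keys,
# the right side only three of them.
left_key = pd.Series(["a", "b", "c", "d", "e"], name="key")
right_key = pd.Series(["a", "b", "d"], name="key")

# Casting each side independently infers categories from that side's values only,
# so the two categorical dtypes end up different.
left_cat = left_key.astype("category")
right_cat = right_key.astype("category")
categories_match = left_cat.dtype == right_cat.dtype

# Casting both sides to one shared dtype gives them identical categories.
shared = pd.CategoricalDtype(categories=["a", "b", "c", "d", "e"])
left_shared = left_key.astype(shared)
right_shared = right_key.astype(shared)
shared_match = left_shared.dtype == right_shared.dtype

print(categories_match, shared_match)
```

Whether a shared dtype avoids the error in dask_cudf on these nightlies has not been verified here; the sketch only shows how the two dtypes come to differ.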

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-1-4e4d89298da2> in <module>
     44 ddf_b = dask_cudf.read_csv('db.csv', npartitions=2)
     45 ddf_b['key'] = ddf_b['key'].astype("category")
---> 46 ddf_merged = ddf_a.merge(ddf_b, on=['key'], how='left').compute()
     47 #ddf_merged = ddf_merged.reset_index()
     48 print(ddf_merged.sort_values('key'))

~/miniconda3/envs/rapids013-317/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    164         dask.base.compute
    165         """
--> 166         (result,) = compute(self, traverse=False, **kwargs)
    167         return result
    168 

~/miniconda3/envs/rapids013-317/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    435     keys = [x.__dask_keys__() for x in collections]
    436     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 437     results = schedule(dsk, keys, **kwargs)
    438     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    439 

~/miniconda3/envs/rapids013-317/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2593                     should_rejoin = False
   2594             try:
-> 2595                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2596             finally:
   2597                 for f in futures.values():

~/miniconda3/envs/rapids013-317/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1891                 direct=direct,
   1892                 local_worker=local_worker,
-> 1893                 asynchronous=asynchronous,
   1894             )
   1895 

~/miniconda3/envs/rapids013-317/lib/python3.6/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    778         else:
    779             return sync(
--> 780                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    781             )
    782 

~/miniconda3/envs/rapids013-317/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    346     if error[0]:
    347         typ, exc, tb = error[0]
--> 348         raise exc.with_traceback(tb)
    349     else:
    350         return result[0]

~/miniconda3/envs/rapids013-317/lib/python3.6/site-packages/distributed/utils.py in f()
    330             if callback_timeout is not None:
    331                 future = asyncio.wait_for(future, callback_timeout)
--> 332             result[0] = yield future
    333         except Exception as exc:
    334             error[0] = sys.exc_info()

~/miniconda3/envs/rapids013-317/lib/python3.6/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/miniconda3/envs/rapids013-317/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1750                             exc = CancelledError(key)
   1751                         else:
-> 1752                             raise exception.with_traceback(traceback)
   1753                         raise exc
   1754                     if errors == "skip":

~/miniconda3/envs/rapids013-317/lib/python3.6/site-packages/dask/dataframe/multi.py in merge_chunk()
    216 def merge_chunk(lhs, *args, **kwargs):
    217     empty_index_dtype = kwargs.pop("empty_index_dtype", None)
--> 218     out = lhs.merge(*args, **kwargs)
    219     # Workaround pandas bug where if the output result of a merge operation is
    220     # an empty dataframe, the output index is `int64` in all cases, regardless

~/miniconda3/envs/rapids013-317/lib/python3.6/site-packages/cudf/core/dataframe.py in merge()
   2343             rsuffix,
   2344             how,
-> 2345             method,
   2346         )
   2347 

~/miniconda3/envs/rapids013-317/lib/python3.6/site-packages/cudf/core/frame.py in _merge()
    994         # potentially do an implicit typecast
    995         (lhs, rhs, to_categorical) = self._typecast_before_merge(
--> 996             lhs, rhs, left_on, right_on, left_index, right_index, how
    997         )
    998 

~/miniconda3/envs/rapids013-317/lib/python3.6/site-packages/cudf/core/frame.py in _typecast_before_merge()
   1169                 continue
   1170 
-> 1171             to_dtype = casting_rules(lhs, rhs, dtype_l, dtype_r, how)
   1172 
   1173             if to_dtype is not None:

~/miniconda3/envs/rapids013-317/lib/python3.6/site-packages/cudf/core/frame.py in casting_rules()
   1096                 dtype_r
   1097             ):
-> 1098                 raise TypeError("Left and right categories must be the same.")
   1099             elif how == "left":
   1100 

TypeError: Left and right categories must be the same.

Steps/Code to reproduce bug

### Generate data and successfully complete reading and merge in cudf
import cudf
df_a = cudf.DataFrame()
n = 1000000
k = ["a", "b", "c", "d", "e"]
df_a['key'] = k * n
a = [float(i + 10) for i in range(5)]
df_a['vals_a'] = a * n
df_a.to_csv('da.csv')
print(df_a.count())

df_b = cudf.DataFrame()
df_b['key'] = ["a", "b", "d"]
df_b['vals_b'] = [float(i+10) for i in range(3)]
df_b.to_csv('db.csv')
print(df_b.count())

print(df_a['key'].head(20))

df_merged = df_a.merge(df_b, on=['key'])
print(df_merged.sort_values('key'))


from collections import OrderedDict
import dask_cudf
from dask_cuda import LocalCUDACluster
from dask.delayed import delayed
from dask.distributed import Client, wait

cluster = LocalCUDACluster(n_workers=2, threads_per_worker=1)  # Please change your n_workers amount to the number of GPUs you have
print(cluster)
client = Client(cluster)
client

def initialize_rmm_pool():
    import rmm

    rmm.reinitialize(pool_allocator=True)

def initialize_rmm_no_pool():
    import rmm

    rmm.reinitialize(pool_allocator=False)
    
client.run(initialize_rmm_pool)

### Read data with dask_cudf
# NOTE: `cols` and `dtypes` are defined but never passed to read_csv,
# so 'key' is read with the inferred (string) dtype.
cols = [
        'key', 'vals_a'
]

dtypes = OrderedDict([
        ("key", "str"), # read in as string
        ("vals_a", "float64")
])
ddf_a = dask_cudf.read_csv('da.csv', npartitions=2)
ddf_a['key'] = ddf_a['key'].astype("category") # changed to category

cols = [
        'key', 'vals_b'
]

dtypes = OrderedDict([
        ("key", "str"), # read in as string
        ("vals_b", "float64")
])
ddf_b = dask_cudf.read_csv('db.csv', npartitions=2)
ddf_b['key'] = ddf_b['key'].astype("category") # changed to category
ddf_merged = ddf_a.merge(ddf_b, on=['key'], how='left').compute() # !!! It will break here
#ddf_merged = ddf_merged.reset_index()
print(ddf_merged.sort_values('key'))
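
One possible workaround, not confirmed anywhere in this thread, is to merge on the plain string key and cast to category only afterwards. The sketch below uses pandas on a small frame as a stand-in (an assumption; the same pattern has not been tested here on dask_cudf with RMM), with a shortened version of the repro data.

```python
import pandas as pd

# Small stand-ins for da.csv / db.csv from the repro.
left = pd.DataFrame({
    "key": ["a", "b", "c", "d", "e"] * 2,
    "vals_a": [10.0, 11.0, 12.0, 13.0, 14.0] * 2,
})
right = pd.DataFrame({
    "key": ["a", "b", "d"],
    "vals_b": [10.0, 11.0, 12.0],
})

# Merge while 'key' is still a string column, so no category comparison happens.
merged = left.merge(right, on="key", how="left")

# Cast to category after the merge, when all key values are in one frame.
merged["key"] = merged["key"].astype("category")

print(merged.sort_values("key"))
```

The trade-off is that the merge itself runs on strings rather than category codes, which may cost memory on large frames.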

Expected behavior

import cudf
df_a = cudf.DataFrame()
n = 1000000
k = ["a", "b", "c", "d", "e"]
df_a['key'] = k * n
a = [float(i + 10) for i in range(5)]
df_a['vals_a'] = a * n
df_a.to_csv('da.csv')
print(df_a.count())

df_b = cudf.DataFrame()
df_b['key'] = ["a", "b", "d"]
df_b['vals_b'] = [float(i+10) for i in range(3)]
df_b.to_csv('db.csv')
print(df_b.count())

print(df_a['key'].head(20))

df_merged = df_a.merge(df_b, on=['key'])
print(df_merged.sort_values('key'))


from collections import OrderedDict
import dask_cudf
from dask_cuda import LocalCUDACluster
from dask.delayed import delayed
from dask.distributed import Client, wait

cluster = LocalCUDACluster(n_workers=2, threads_per_worker=1)  # Please change your n_workers amount to the number of GPUs you have
print(cluster)
client = Client(cluster)
client

def initialize_rmm_pool():
    import rmm

    rmm.reinitialize(pool_allocator=True)

def initialize_rmm_no_pool():
    import rmm

    rmm.reinitialize(pool_allocator=False)
    
client.run(initialize_rmm_pool)

### Read data with dask_cudf
# NOTE: `cols` and `dtypes` are defined but never passed to read_csv below.
cols = [
        'key', 'vals_a'
]

dtypes = OrderedDict([
        ("key", "category"),
        ("vals_a", "float64")
])
ddf_a = dask_cudf.read_csv('da.csv', npartitions=2)

cols = [
        'key', 'vals_b'
]

dtypes = OrderedDict([
        ("key", "category"),
        ("vals_b", "float64")
])
ddf_b = dask_cudf.read_csv('db.csv', npartitions=2)

ddf_merged = ddf_a.merge(ddf_b, on=['key'], how='left').compute()
#ddf_merged = ddf_merged.reset_index()
print(ddf_merged.sort_values('key'))

Output is similar to cudf...but still needs work

LocalCUDACluster('tcp://127.0.0.1:36725', workers=2, threads=2, memory=135.11 GB)
         Unnamed: 0_x key  vals_a Unnamed: 0_y vals_b
2                8580   a    10.0            0   10.0
5                8585   a    10.0            0   10.0
8                8590   a    10.0            0   10.0
11               8595   a    10.0            0   10.0
14               8600   a    10.0            0   10.0
...               ...  ..     ...          ...    ...
4999990       4999879   e    14.0         null   null
4999992       4999884   e    14.0         null   null
4999994       4999889   e    14.0         null   null
4999996       4999894   e    14.0         null   null
4999998       4999899   e    14.0         null   null

[5000000 rows x 5 columns]

Environment overview (please complete the following information)

  • Environment location: [Bare-metal]
  • Method of cuDF install: [conda]

Environment details
Please run and paste the output of the cudf/print_env.sh script here, to gather any other relevant environment details

Additional context
Found while updating the mortgage notebook and trying to find out why dask_cudf wipes out my dataframes on a merge (it hashes the strings that were cast as categories, surprise!). I printed the output of

    print(acq_gdf.describe())
    print(names.describe())
    print(acq_gdf['seller_name'].unique())
    print(names['seller_name'].unique())

and the smaller one has a Length field in its summary while the larger one does not. The error is the same.

Name: seller_name, dtype: category
Categories (22, object): [AMTRUST BANK, BANK OF AMERICA, N.A., BISHOPS GATE RESIDENTIAL MORTGAGE TRUST, CITIMORTGAGE, INC., ..., SUNTRUST MORTGAGE INC., USAA FEDERAL SAVINGS BANK, WASHTENAW MORTGAGE COMPANY, WELLS FARGO BANK, N.A.]

Name: seller_name, Length: 79, dtype: category
Categories (79, object): [ACADEMY MORTGAGE CORPORATION, ALLY BANK, AMERIHOME MORTGAGE COMPANY|LLC, AMERISAVE MORTGAGE CORPORATION, ..., WELLS FARGO BANK| NA, WELLS FARGO BANK|N.A., WELLS FARGO BANK|NA, WELLS FARGO CREDIT RISK TRANSFER SECURITIES TR...]
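
To see exactly which categories differ between two such columns, something like the following could help. It is a sketch in pandas (assumed here as a stand-in; `category_diff` is a hypothetical helper, not a cudf or dask_cudf function), and the sample values are a small illustrative subset of the names printed above.

```python
import pandas as pd

def category_diff(left: pd.Series, right: pd.Series):
    """Return (only_in_left, only_in_right) for two categorical Series."""
    left_cats = set(left.cat.categories)
    right_cats = set(right.cat.categories)
    return sorted(left_cats - right_cats), sorted(right_cats - left_cats)

# Illustrative subset of the seller names shown in the two printouts.
acq_names = pd.Series(["WELLS FARGO BANK, N.A.", "AMTRUST BANK"], dtype="category")
map_names = pd.Series(["WELLS FARGO BANK|NA", "ALLY BANK"], dtype="category")

only_left, only_right = category_diff(acq_names, map_names)
print(only_left)
print(only_right)
```

Any non-empty result means the two key columns do not share the same category set.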

@pentschev
@randerzander
@rnyak

taureandyernv added the "Needs Triage" and "bug" labels on Mar 18, 2020
harrism changed the title from "[BUG] Merging two dataframes on a string based column, with Dask_cudf and RMM, one big one small, gives a category mismatch error" to "[BUG] Category mismatch error when merging DFs on string keys" on Mar 18, 2020
kkraus14 (Collaborator) commented

Duplicate of #3496.

bdice removed the "Needs Triage" label on Mar 4, 2024