
[BUG] Overhead in sorting single partition frames with dask_cudf #3873

Closed
VibhuJawa opened this issue Jan 22, 2020 · 3 comments · Fixed by #4000
Labels: bug (Something isn't working), dask (Dask issue)

Comments

@VibhuJawa (Member) commented Jan 22, 2020

Describe the bug
There appears to be significant overhead when sorting a single-partition frame through dask_cudf's sort_values, compared with applying the same sort via map_partitions or directly on a cudf DataFrame:

Pure Dask: 4.509349584579468
Map Partitions: 1.7103350162
Pure cudf: 1.4036340713500977

Steps/Code to reproduce bug

Create Helper

import time

import cudf
import dask.array as da
import dask.dataframe as dd
from dask.distributed import wait


def create_random_data(n_rows=1_000, n_keys_index_1=50_000,
                       n_keys_index_2=20_000, n_keys_index_3=20_000):
    # Build a pandas-backed dask dataframe with one float column and
    # three integer key columns, then convert each partition to cudf.
    df = dd.concat([
        da.random.random(n_rows).to_dask_dataframe(columns='x'),
        da.random.randint(0, n_keys_index_1, size=n_rows).to_dask_dataframe(columns='index_1'),
        da.random.randint(0, n_keys_index_2, size=n_rows).to_dask_dataframe(columns='index_2'),
        da.random.randint(0, n_keys_index_3, size=n_rows).to_dask_dataframe(columns='index_3'),
    ], axis=1).persist()
    gdf = df.map_partitions(cudf.from_pandas)
    return gdf.persist()

Native Dask

df = create_random_data(80_000_000)
df = df.repartition(npartitions=1).persist()
_ = wait(df)

st = time.time()
df = df.sort_values(by=['index_1', 'index_2', 'index_3']).persist()
_ = wait(df)
et = time.time()
print(f"time taken = {et-st} , len of df = {len(df):,}")
del df
time taken = 4.4830780029296875 , len of df = 80,000,000

Map Partitions

df = create_random_data(80_000_000)
df = df.repartition(npartitions=1).persist()
_ = wait(df)

st = time.time()
df = df.map_partitions(lambda part: part.sort_values(by=['index_1', 'index_2', 'index_3'])).persist()
_ = wait(df)
et = time.time()
print(f"time taken = {et-st} , len of df = {len(df):,}")
del df
time taken = 1.7191593647003174 , len of df = 80,000,000

Cudf

df = create_random_data(80_000_000)
df = df.compute()  # materialize to a single concrete cudf.DataFrame

st = time.time()
df = df.sort_values(by=['index_1', 'index_2', 'index_3'])
et = time.time()
print(f"time taken = {et-st} , len of df = {len(df):,}")
del df
time taken = 1.4099068641662598 , len of df = 80,000,000

Expected behavior
I would expect dask_cudf's sort_values on a single-partition frame to perform comparably to the map_partitions and pure cudf versions.

Environment details

# packages in environment at /raid/vjawa/conda/conda_installation/envs/cudf_22_jan:
cudf                      0.12.0b200122         py37_1748    rapidsai-nightly
dask-cudf                 0.12.0b200122         py37_1748    rapidsai-nightly
libcudf                   0.12.0b200122     cuda10.0_1748    rapidsai-nightly

Gist Link: https://gist.github.com/VibhuJawa/236b073b8e1b1243ad33a099503cdc36

@VibhuJawa added labels bug (Something isn't working) and Needs Triage (Need team to review and classify) on Jan 22, 2020
@kkraus14 added label dask (Dask issue) and removed Needs Triage on Jan 22, 2020
@randerzander changed the title from "[BUG] Overhead in sorting single parition frames with dask_cudf" to "[BUG] Overhead in sorting single partition frames with dask_cudf" on Jan 28, 2020
@randerzander (Contributor)

This may be a duplicate of #2272 depending on how that issue is resolved.

@rjzamora (Member)

Thanks for raising this @VibhuJawa - I can definitely add a simple fix here to avoid unnecessary work in the case that there is only a single partition. You are correct that there is no reason for the dask_cudf.DataFrame.sort_values API to underperform map_partitions in the "trivial" case.

Other context/info: I have spent some time today working on an experimental version of sort_values that follows a procedure similar to set_index in upstream dask. The challenge with completely replacing the current implementation is that the set_index-like version can only use a single column to perform the repartitioning (multiple columns can be used for intra-partition sorting, but only the first column can be used for assigning partitions). This is a problem if the data in the first column is highly skewed, but the algorithm seems significantly more performant in other cases.
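For illustration, a minimal sketch of what the single-partition shortcut could look like. The wrapper name and the `full_shuffle_sort` fallback below are hypothetical placeholders, not the actual dask_cudf internals:

def sort_values_sketch(ddf, by):
    # Hypothetical sketch of a single-partition fast path, assuming a
    # dask_cudf-style collection `ddf` and a list of column names `by`.
    if ddf.npartitions == 1:
        # With one partition there is nothing to shuffle: sorting the
        # lone partition locally is equivalent to a global sort and
        # skips the repartitioning machinery entirely.
        return ddf.map_partitions(
            lambda part: part.sort_values(by=by),
            meta=ddf._meta,
        )
    # Multiple partitions: fall through to the full shuffle-based sort.
    return full_shuffle_sort(ddf, by)  # hypothetical general path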

@VibhuJawa (Member, Author)

> Thanks for raising this @VibhuJawa - I can definitely add a simple fix here to avoid unnecessary work in the case that there is only a single partition. You are correct that there is no reason for the dask_cudf.DataFrame.sort_values API to underperform map_partitions in the "trivial" case.
>
> Other context/info: I have spent some time today working on an experimental version of sort_values that follows a procedure similar to set_index in upstream dask. The challenge with completely replacing the current implementation is that the set_index-like version can only use a single column to perform the repartitioning (multiple columns can be used for intra-partition sorting, but only the first column can be used for assigning partitions). This is a problem if the data in the first column is highly skewed, but the algorithm seems significantly more performant in other cases.

I think for set_index we only need one column for our cases, so any improvement there would be amazing, as it will unblock multiple workflows.

For multi-column sorts, I don't think we run these on very big frames, so we can play with repartitioning to see if that helps performance there (see the sketch below).
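As a rough, hypothetical sketch of the repartition-then-sort idea, assuming dask's set_index is used to range-partition on the leading key (the function name is illustrative, not an existing API):

def repartition_then_sort(ddf, by):
    # set_index shuffles rows so each partition holds one contiguous
    # range of by[0] (dask computes approximate quantile divisions
    # internally); reset_index turns the key back into a column.
    ddf = ddf.set_index(by[0]).reset_index()
    # Each partition now covers a disjoint range of by[0], so a local
    # sort on all keys yields a frame globally ordered on by[0].
    # Caveat from the discussion above: heavy skew in by[0] produces
    # unbalanced partitions.
    return ddf.map_partitions(lambda part: part.sort_values(by=by))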

I will start a thread on our internal Slack, because it might be easier to discuss our workflows there.
