
[BUG] Basic LIMIT statements require getitem calls on each partition if table is persisted (but not otherwise) #833

Closed
beckernick opened this issue Oct 3, 2022 · 1 comment · Fixed by #837
Labels
bug Something isn't working python Affects Python API

Comments

beckernick commented Oct 3, 2022
Basic LIMIT statements require getitem calls on each partition if the table is persisted, but not if it is unpersisted. There's no difference in the RAL between the two scenarios, so I'm sharing screenshots of the task graph.

from dask_sql import Context
from dask.distributed import Client, LocalCluster
import pandas as pd
import numpy as np

cluster = LocalCluster()
client = Client(cluster)

c = Context()

n = 10000000
k = 10
df = pd.DataFrame({f"c{x}": np.arange(n) for x in range(k)})
df.to_csv("temp.csv", index=False)
c.create_table(
    'df',
    'temp.csv',
    blocksize="5 MB",
)

query = """
SELECT *
FROM df
LIMIT 5
"""
print(c.explain(query))
c.sql(query).compute()
Limit: skip=0, fetch=5
  Projection: #df.c0, #df.c1, #df.c2, #df.c3, #df.c4, #df.c5, #df.c6, #df.c7, #df.c8, #df.c9
    TableScan: df projection=[c0, c1, c2, c3, c4, c5, c6, c7, c8, c9], fetch=5

[Screenshot: limit-task-stream-no-persist — task stream without persist]

When persisted, the task stream (after the read_csv ops) looks quite different.

c.create_table(
    'df_persist',
    'temp.csv',
    blocksize="5 MB",
    persist=True
)
query = """
SELECT *
FROM df_persist
LIMIT 5
"""
print(c.explain(query))
c.sql(query).compute()
Limit: skip=0, fetch=5
  Projection: #df_persist.c0, #df_persist.c1, #df_persist.c2, #df_persist.c3, #df_persist.c4, #df_persist.c5, #df_persist.c6, #df_persist.c7, #df_persist.c8, #df_persist.c9
    TableScan: df_persist projection=[c0, c1, c2, c3, c4, c5, c6, c7, c8, c9], fetch=5

[Screenshot: limit-task-stream-persisted — task stream with persist=True]

@beckernick beckernick added bug Something isn't working needs triage Awaiting triage by a dask-sql maintainer labels Oct 3, 2022
@ayushdg ayushdg added python Affects Python API and removed needs triage Awaiting triage by a dask-sql maintainer labels Oct 4, 2022
ayushdg (Collaborator) commented Oct 4, 2022

Thanks for raising the issue.

For some context: dask-sql has a special codepath that computes only the first partition's length to check whether it has enough rows to cover the LIMIT clause and return a result. This codepath is limited to simpler task graphs (blockwise and IO operations in dask, or enabled explicitly via the sql.limit.check-first-partition config added in #696), since the check can be expensive and duplicate compute for complex graphs that include operations such as joins.

For all other cases we default to computing df.head across all partitions (npartitions=-1).

For the persisted case we can add a check that routes it to the same fast codepath mentioned above. The cost of this check is negligible for persisted dataframes, and from the task graph it does seem like there is an overhead to computing head across multiple partitions even when the data is persisted in memory.
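In the meantime, the opt-in config flag mentioned above can be set through dask's config system. A minimal sketch, assuming dask-sql is installed (the key name is the one referenced in the comment, added in #696):

```python
import dask

# Opt in to the first-partition length check for LIMIT on complex graphs.
dask.config.set({"sql.limit.check-first-partition": True})
```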
