Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correlated subqueries #683

Conversation

sarahyurick
Copy link
Collaborator

@sarahyurick sarahyurick commented Aug 11, 2022

Previously, we would get a ValueError: Not all divisions are known, can't align partitions. Please use set_index to set the index. for something like:

from dask_sql import Context
import dask.dataframe as dd
import pandas as pd

c = Context()

names = ["Miracle", "Sunshine", "Pretty woman", "Handsome man", "Barbie", "Cool painting", "Black square #1000", "Mountains"]
prices = [300, 700, 2800, 2300, 250, 5000, 50, 1300]
ids = [11, 12, 13, 14, 15, 16, 17, 18]
artist_id = [1, 1, 2, 2, 3, 3, 3, 4]
paintings = dd.from_pandas(pd.DataFrame({"id": ids, "name": names, "artist_id": artist_id, "listed_price": prices}), npartitions=1)
c.create_table("paintings", paintings)

sql1 = """
SELECT name, listed_price
FROM paintings
WHERE listed_price > (
    SELECT AVG(listed_price)
    FROM paintings
)
"""
c.sql(sql1).compute()

Not sure if this is the way we should go about this (not generalizable enough?), but here is an initial quick fix for that example. The general idea is that since we are comparing listed_price to a 1x1 table containing AVG(listed_price), the latter has to be converted to a single value by calling compute() and with casting.

@sarahyurick
Copy link
Collaborator Author

sarahyurick commented Aug 11, 2022

Fixes example in #320

df = pd.DataFrame({'id': [0, 1, 2], 'name': ['a', 'b', 'c'], 'val': [0, 1, 2]})

c.create_table('test', df)
c.sql("""
select name, val, id from test a
where val >
  (select avg(val) from test)
""").compute()

@codecov-commenter
Copy link

codecov-commenter commented Aug 11, 2022

Codecov Report

❗ No coverage uploaded for pull request base (datafusion-sql-planner@c8259b9). Click here to learn what that means.
The diff coverage is n/a.

@@                    Coverage Diff                    @@
##             datafusion-sql-planner     #683   +/-   ##
=========================================================
  Coverage                          ?   66.95%           
=========================================================
  Files                             ?       73           
  Lines                             ?     3640           
  Branches                          ?      753           
=========================================================
  Hits                              ?     2437           
  Misses                            ?     1057           
  Partials                          ?      146           

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

Copy link
Collaborator

@charlesbluca charlesbluca left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for opening @sarahyurick! While running through the issue repro you shared, I notice we also get some DataFusion warnings / errors:

Skipping optimizer rule decorrelate_scalar_subquery due to unexpected error: scalar subqueries must have a filter to be correlated at /home/nfs/charlesb/.cargo/git/checko
uts/arrow-datafusion-71ae82d9dec9a01c/6c32098/datafusion/optimizer/src/decorrelate_scalar_subquery.rs:177                                                                 
caused by                                                                                                                                                                 
Error during planning: Could not coerce into Filter! at /home/nfs/charlesb/.cargo/git/checkouts/arrow-datafusion-71ae82d9dec9a01c/6c32098/datafusion/expr/src/logical_plan
/plan.rs:1127

cc @andygrove in case you have some thoughts on this

except ValueError:
return reduce(
partial(self.operation, **kwargs),
(operands[0], float(operands[1][operands[1].columns[0]].loc[0].compute()))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd imagine we'd want to generalize the typecast here to handle other potential aggregating functions, but ATM can't think of an immediate way to do this.

@andygrove
Copy link
Contributor

cc @andygrove in case you have some thoughts on this

I filed an issue against DataFusion to add support for this type of query: apache/datafusion#3266

@sarahyurick
Copy link
Collaborator Author

Looks like this was resolved on the DataFusion side with apache/datafusion#3287 !

@sarahyurick sarahyurick closed this Sep 8, 2022
@sarahyurick sarahyurick deleted the correlated_subqueries branch September 21, 2022 23:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants