fix subquery alias #1067

Merged: 1 commit merged into apache:master on Oct 8, 2021

Conversation

xudong963
Member

Which issue does this PR close?

Closes #1049

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

@github-actions github-actions bot added the datafusion (Changes in the datafusion crate) and sql (SQL Planner) labels Oct 3, 2021
@xudong963
Member Author

> CREATE EXTERNAL TABLE customer STORED AS CSV LOCATION '/Users/bytedance/arrow-datafusion/datafusion/tests/customer.csv';
0 rows in set. Query took 0.014 seconds.
> explain select * from customer as a join (select * from customer as b) on a.column_1=b.column_1;
Plan("subquery in FROM must have an alias")
> explain select * from customer as a join (select * from customer) as b on a.column_1=b.column_1;
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                     |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: #a.column_1, #a.column_2, #b.column_1, #b.column_2                                                                                                                           |
|               |   Join: #a.column_1 = #b.column_1                                                                                                                                                        |
|               |     TableScan: a projection=Some([0, 1])                                                                                                                                                 |
|               |     Projection: #customer.column_1, #customer.column_2                                                                                                                                   |
|               |       TableScan: customer projection=Some([0, 1])                                                                                                                                        |
| physical_plan | ProjectionExec: expr=[column_1@0 as column_1, column_2@1 as column_2, column_1@2 as column_1, column_2@3 as column_2]                                                                    |
|               |   CoalesceBatchesExec: target_batch_size=4096                                                                                                                                            |
|               |     HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "column_1", index: 0 }, Column { name: "column_1", index: 0 })]                                                 |
|               |       CoalesceBatchesExec: target_batch_size=4096                                                                                                                                        |
|               |         RepartitionExec: partitioning=Hash([Column { name: "column_1", index: 0 }], 12)                                                                                                  |
|               |           RepartitionExec: partitioning=RoundRobinBatch(12)                                                                                                                              |
|               |             CsvExec: source=Path(/Users/bytedance/arrow-datafusion/datafusion/tests/customer.csv: [/Users/bytedance/arrow-datafusion/datafusion/tests/customer.csv]), has_header=false   |
|               |       CoalesceBatchesExec: target_batch_size=4096                                                                                                                                        |
|               |         RepartitionExec: partitioning=Hash([Column { name: "column_1", index: 0 }], 12)                                                                                                  |
|               |           ProjectionExec: expr=[column_1@0 as column_1, column_2@1 as column_2]                                                                                                          |
|               |             RepartitionExec: partitioning=RoundRobinBatch(12)                                                                                                                            |
|               |               CsvExec: source=Path(/Users/bytedance/arrow-datafusion/datafusion/tests/customer.csv: [/Users/bytedance/arrow-datafusion/datafusion/tests/customer.csv]), has_header=false |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 rows in set. Query took 0.010 seconds.

@xudong963
Member Author

xudong963 commented Oct 3, 2021

Though the PR fixes the bug mentioned in the issue, there are still some failing cases. I am confused and need help.
When I ran cargo test, I found four tests that couldn't pass, for example tests::ballista_round_trip::q7.
The following is the SQL executed in tests::ballista_round_trip::q7:

select
    supp_nation,
    cust_nation,
    l_year,
    sum(volume) as revenue
from
    (
        select
            n1.n_name as supp_nation,
            n2.n_name as cust_nation,
            extract(year from l_shipdate) as l_year,
            l_extendedprice * (1 - l_discount) as volume
        from
            supplier,
            lineitem,
            orders,
            customer,
            nation n1,
            nation n2
        where
                s_suppkey = l_suppkey
          and o_orderkey = l_orderkey
          and c_custkey = o_custkey
          and s_nationkey = n1.n_nationkey
          and c_nationkey = n2.n_nationkey
          and (
                (n1.n_name = 'FRANCE' and n2.n_name = 'GERMANY')
                or (n1.n_name = 'GERMANY' and n2.n_name = 'FRANCE')
            )
          and l_shipdate between date '1995-01-01' and date '1996-12-31'
    ) as shipping
group by
    supp_nation,
    cust_nation,
    l_year
order by
    supp_nation,
    cust_nation,
    l_year;

Then I added some logging to track down the potential problems.
First, I printed the plan at https://github.com/apache/arrow-datafusion/blob/master/benchmarks/src/bin/tpch.rs#L1092:

Sort: #shipping.supp_nation ASC NULLS FIRST, #shipping.cust_nation ASC NULLS FIRST, #shipping.l_year ASC NULLS FIRST
  Projection: #shipping.supp_nation, #shipping.cust_nation, #shipping.l_year, #shipping.volume
    Projection: #n1.n_name AS supp_nation, #n2.n_name AS cust_nation, datepart(Utf8("YEAR"), #lineitem.l_shipdate) AS l_year, #lineitem.l_extendedprice * Int64(1) - #lineitem.l_discount AS volume
      Filter: #n1.n_name = Utf8("FRANCE") AND #n2.n_name = Utf8("GERMANY") OR #n1.n_name = Utf8("GERMANY") AND #n2.n_name = Utf8("FRANCE") AND #lineitem.l_shipdate BETWEEN CAST(Utf8("1995-01-01") AS Date32) AND CAST(Utf8("1996-12-31") AS Date32)
        Join: #customer.c_nationkey = #n2.n_nationkey
          Join: #supplier.s_nationkey = #n1.n_nationkey
            Join: #orders.o_custkey = #customer.c_custkey
              Join: #lineitem.l_orderkey = #orders.o_orderkey
                Join: #supplier.s_suppkey = #lineitem.l_suppkey
                  TableScan: supplier projection=None
                  TableScan: lineitem projection=None
                TableScan: orders projection=None
              TableScan: customer projection=None
            TableScan: n1 projection=None
          TableScan: n2 projection=None

Is this plan OK?

Then I tried to find the problem in https://github.com/apache/arrow-datafusion/blob/master/benchmarks/src/bin/tpch.rs#L1094 because the test panicked there (converting the logical plan protobuf back into a logical plan).
Finally, I found that the panic happens while building the projection logical plan: https://github.com/apache/arrow-datafusion/blob/master/datafusion/src/logical_plan/builder.rs#L240

The projected_expr at https://github.com/apache/arrow-datafusion/blob/master/datafusion/src/logical_plan/builder.rs#L255 is the following:

[#shipping.supp_nation, #shipping.cust_nation, #shipping.l_year, #shipping.volume]

But the plan and the plan schema are the following:

 Projection: #n1.n_name AS supp_nation, #n2.n_name AS cust_nation, datepart(Utf8("YEAR"), #lineitem.l_shipdate) AS l_year, #lineitem.l_extendedprice * Int64(1) - #lineitem.l_discount AS volume
      Filter: #n1.n_name = Utf8("FRANCE") AND #n2.n_name = Utf8("GERMANY") OR #n1.n_name = Utf8("GERMANY") AND #n2.n_name = Utf8("FRANCE") AND #lineitem.l_shipdate BETWEEN CAST(Utf8("1995-01-01") AS Date32) AND CAST(Utf8("1996-12-31") AS Date32)
        Join: #customer.c_nationkey = #n2.n_nationkey
          Join: #supplier.s_nationkey = #n1.n_nationkey
            Join: #orders.o_custkey = #customer.c_custkey
              Join: #lineitem.l_orderkey = #orders.o_orderkey
                Join: #supplier.s_suppkey = #lineitem.l_suppkey
                  TableScan: supplier projection=None
                  TableScan: lineitem projection=None
                TableScan: orders projection=None
              TableScan: customer projection=None
            TableScan: n1 projection=None
          TableScan: n2 projection=None
DFSchema { fields: [DFField { qualifier: Some("n1"), field: Field { name: "supp_nation", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None } }, DFField { qualifier: Some("n2"), field: Field { name: "cust_nation", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None } }, DFField { qualifier: None, field: Field { name: "l_year", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None } }, DFField { qualifier: None, field: Field { name: "volume", data_type: Float64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None } }] }

So there is a conflict at https://github.com/apache/arrow-datafusion/blob/master/datafusion/src/logical_plan/dfschema.rs#L150

I have two questions:

  1. https://github.com/apache/arrow-datafusion/blob/master/benchmarks/src/bin/tpch.rs#L1092 runs fine and produces a plan; is that plan correct?
  2. Which part makes the Logical Plan Proto to Logical Plan conversion fail? Right now I don't know how to fix it.

PTAL and give me some help @alamb @houqp @Dandandan. Thanks very much!

@houqp
Member

houqp commented Oct 3, 2021

https://github.com/apache/arrow-datafusion/blob/master/benchmarks/src/bin/tpch.rs#L1092 runs fine and produces a plan; is that plan correct?

Yes, I think the logical plan you got there is correct.

Which part makes the Logical Plan Proto to Logical Plan conversion fail? Right now I don't know how to fix it.

I believe this is because the alias patching is done at the SQL planner layer, not the plan builder layer. We only have this alias info in the SQL planner right now (parsed from the SQL query). When the projection is patched with the alias in the SQL planner, that alias info is lost in the logical plan. In other words, it's not possible to reconstruct the same alias-patched projection plan node using only the information provided by the logical plan tree.

If you look at the children of the patched projection, there is no mention of the subquery alias b at all.

|               |     TableScan: a projection=Some([0, 1])                                                                                                                                                 |
|               |     Projection: #customer.column_1, #customer.column_2                                                                                                                                   |
|               |       TableScan: customer projection=Some([0, 1])     

In order to make the full plan serializable without access to the raw SQL query, we need to add the subquery alias to the logical plan tree as well. During protobuf plan ser/de in ballista, we don't pass along the SQL query, but only the planned logical plan from the SQL planner.

I can see two ways to accomplish this:

The first approach is to add an optional alias field to our projection plan node, similar to what we have with union:

https://github.com/apache/arrow-datafusion/blob/2f04d67156ec91afa628a3ed47003b8f992450bf/datafusion/src/logical_plan/plan.rs#L160

Then we can perform the schema qualifier patch in the plan builder's project method similar to what we do with union alias:

https://github.com/apache/arrow-datafusion/blob/2f04d67156ec91afa628a3ed47003b8f992450bf/datafusion/src/logical_plan/builder.rs#L584

The second approach is to introduce a new type of Alias plan node that we can use to wrap any plan node and perform this qualifier patching logic.

I think adding an alias field to the projection plan node would be simpler. @alamb @Dandandan @jorgecarleitao @andygrove WDYT?
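
For illustration, a rough sketch of what that first approach could look like (the field names mirror the existing Projection variant and the alias mirrors the one on Union; this is a sketch, not the merged code):

// Sketch only: LogicalPlan::Projection extended with an optional alias,
// analogous to the existing alias on the Union variant.
Projection {
    /// The list of expressions to project
    expr: Vec<Expr>,
    /// The incoming logical plan
    input: Arc<LogicalPlan>,
    /// The output schema, qualified with `alias` when one is present
    schema: DFSchemaRef,
    /// Optional relation alias, e.g. the `b` in `(SELECT ...) AS b`
    alias: Option<String>,
},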

@houqp houqp requested review from alamb, Dandandan and andygrove October 3, 2021 20:19
@houqp houqp added the bug (Something isn't working) label Oct 3, 2021
Contributor

@alamb alamb left a comment

Thanks @xudong963 -- this is a great contribution

I think this PR is on the right track. After reading the code and comments, I agree with @houqp's analysis, and I think his suggestion of adding an alias field to LogicalPlan::Projection will be the cleanest approach

First approach is to add an optional alias field to our projection plan node similar to what we have with union:

You would also have to change the logic that computes the output DFSchema to account for this alias, but then I bet everything else "would just work"
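
For illustration, one possible shape for that schema computation, reusing helpers that already appear in this PR (exprlist_to_fields, DFField::from_qualified); a sketch, not the merged code:

// Sketch only: qualify every output field of the projection with the
// subquery alias (when present) before building the output DFSchema.
let fields = exprlist_to_fields(&projected_expr, input_schema)?;
let fields = match &alias {
    // e.g. turn `customer.column_1` into `b.column_1` for `(SELECT ...) AS b`
    Some(alias) => fields
        .iter()
        .map(|f| DFField::from_qualified(alias, f.field().clone()))
        .collect(),
    None => fields,
};
let schema = DFSchemaRef::new(DFSchema::new(fields)?);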

self.query_to_plan_with_alias(
} => {
// if alias is None, return Err
if alias.is_none() {
Contributor

FWIW this is consistent with Postgres:

alamb=# select * from public.simple as a join (select * from public.simple) on a.c3;
ERROR:  subquery in FROM must have an alias
LINE 1: select * from public.simple as a join (select * from public....
                                              ^
HINT:  For example, FROM (SELECT ...) [AS] foo.

👍

// if alias is None, return Err
if alias.is_none() {
return Err(DataFusionError::Plan(
"subquery in FROM must have an alias".parse().unwrap(),
Contributor

Suggested change:
- "subquery in FROM must have an alias".parse().unwrap(),
+ "subquery in FROM must have an alias".to_string(),

I think you can just create a String here

@@ -298,6 +298,19 @@ impl LogicalPlan {
| LogicalPlan::Filter { input, .. } => input.all_schemas(),
}
}
/// schema to projection logical plan
Contributor

This code seems to try to give the output schema a relation name (table alias) that is different from the input schema's relation.

I think what is desired is a new LogicalPlan::Projection that changes the schema names, rather than trying to rewrite them in place.

For example, to change the table alias from a to b

If the input was like

  LogicalPlan::Projection(schema = {a.c1, a.c2}, expr: [a.c1, a.c2])

As @houqp says we should add a single new LogicalPlan node like:

  LogicalPlan::Projection(schema = {b.c1, b.c2}, expr: [a.c1, a.c2])
    LogicalPlan::Projection(schema = {a.c1, a.c2}, expr: [a.c1, a.c2])

(in other words, don't try to rewrite the existing LogicalPlan::Projection, but put a new one on top that changes the schema)

If you use @houqp's suggestion to add an optional alias to LogicalPlan::Projection, then that top LogicalPlan can be created.

Member Author

yes, good idea! o( ̄▽ ̄)d

datafusion/src/sql/planner.rs (outdated review comment, resolved)
@xudong963
Member Author

An update on the latest developments: @houqp @alamb

  1. Using the suggestion from @houqp, the four ballista tests now pass! 😁
  2. But because of the strict restriction that a subquery in FROM must have an alias, some other tests fail now.

One question I want to ask: should we keep this strict restriction on subqueries to stay consistent with Postgres? If so, I will try to fix the failing tests.

@houqp
Member

houqp commented Oct 6, 2021

Yes, I think it's fine to fix the tests instead. DataFusion claims to be Postgres compatible, so we want to be as close to Postgres as possible.

@xudong963
Member Author

PTAL😄, thanks! @houqp @alamb

Contributor

@alamb alamb left a comment

I think this looks great @xudong963 -- thank you very much!

@@ -2090,7 +2090,7 @@ mod tests {
let results = plan_and_collect(
&mut ctx,
"SELECT * FROM t as t1 \
-    JOIN (SELECT * FROM t as t2) \
+    JOIN (SELECT * FROM t) as t2 \
Contributor

👍

validate_unique_names("Projections", projected_expr.iter(), input_schema)?;

let schema = DFSchema::new(exprlist_to_fields(&projected_expr, input_schema)?)?;
Ok(Self::from(project_with_alias(
Contributor

I wonder if you could write this like self.project_with_alias(expr, None) which might be slightly cleaner

Some(ref alias) => input_schema.replace_qualifier(alias.as_str()),
None => input_schema,
};
Ok(LogicalPlan::Projection {
Contributor

👍
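
Putting the hunks above together, a rough sketch of how the builder could look, with project delegating to a project_with_alias that re-qualifies the output schema (signatures and field names are assumed for illustration; the merged code may differ):

// Sketch only: `project` becomes a thin wrapper over `project_with_alias`.
pub fn project(&self, expr: impl IntoIterator<Item = Expr>) -> Result<Self> {
    self.project_with_alias(expr, None)
}

pub fn project_with_alias(
    &self,
    expr: impl IntoIterator<Item = Expr>,
    alias: Option<String>,
) -> Result<Self> {
    let projected_expr: Vec<Expr> = expr.into_iter().collect();
    let input_schema = self.plan.schema();
    validate_unique_names("Projections", projected_expr.iter(), input_schema)?;

    // Build the projection's own schema, then swap in the alias as the
    // qualifier of every field when an alias is present.
    let schema = DFSchema::new(exprlist_to_fields(&projected_expr, input_schema)?)?;
    let schema = match &alias {
        Some(alias) => schema.replace_qualifier(alias.as_str()),
        None => schema,
    };

    Ok(Self::from(LogicalPlan::Projection {
        expr: projected_expr,
        input: Arc::new(self.plan.clone()),
        schema: DFSchemaRef::new(schema),
        alias,
    }))
}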

@alamb
Contributor

alamb commented Oct 7, 2021

I plan to merge this tomorrow morning, Eastern Time, if no one else has done so (I want to let @houqp have a chance to review if he would like to)

@xudong963 xudong963 force-pushed the fix_join_alias branch 2 times, most recently from b0770be to 9113f8f Compare October 8, 2021 01:50
Comment on lines 549 to 556
let mut df_fields_with_alias = Vec::new();
for df_field in schema.fields().iter() {
let df_field_with_alias = DFField::from_qualified(
&alias.as_ref().unwrap().name.value,
df_field.field().clone(),
);
df_fields_with_alias.push(df_field_with_alias);
}
Member

It looks like df_fields_with_alias is not being used anymore? The schema patching should already be covered by project_with_alias, right?

Member Author

Good catch!

Comment on lines 595 to 592
.map(|(field, ident)| {
col_with_table_name(
field.name(),
&*(alias.clone().name.value),
)
.alias(ident.value.as_str())
}),
Member

After taking a closer look at the code, I now agree with @alamb that we don't need to do a second round of patching here. The fields come from plan.schema().fields(), and the schema for the plan referenced here has already been patched in the previous relation match block.

Member Author

After thinking about it again, I agree, because we already wrapped the projection plan with an alias in the previous TableFactor::Derived handling.

Member

@houqp houqp left a comment

@xudong963 could you also update LogicalPlan's pub fn display method to print the projection alias? This should help make the logical plan more readable :)

@xudong963
Member Author

@xudong963 could you also update LogicalPlan's pub fn display method to print the projection alias? This should help make the logical plan more readable :)

No problem
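
For context, that display change might look roughly like this in the Projection arm of the plan formatter (a sketch assuming the new alias field; the exact merged code may differ):

// Sketch only: include the optional alias when formatting a Projection node.
LogicalPlan::Projection { ref expr, ref alias, .. } => {
    write!(f, "Projection:")?;
    for (i, expr_item) in expr.iter().enumerate() {
        if i > 0 {
            write!(f, ",")?;
        }
        write!(f, " {:?}", expr_item)?;
    }
    if let Some(a) = alias {
        // e.g. "Projection: #customer.column_1, #customer.column_2, alias=b"
        write!(f, ", alias={}", a)?;
    }
    Ok(())
}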

Member

@houqp houqp left a comment

LGTM, great work @xudong963 ! I will let @alamb do the final merge when he wakes up :)

@xudong963
Member Author

xudong963 commented Oct 8, 2021

CI seems unstable.

=================================== FAILURES ===================================
_____________________________ test_math_functions ______________________________

df = <builtins.DataFrame object at 0x7f37c5055630>

    def test_math_functions(df):
        values = np.array([0.1, -0.7, 0.55])
        col_v = f.col("value")
        df = df.select(
            f.abs(col_v),
            f.sin(col_v),
            f.cos(col_v),
            f.tan(col_v),
            f.asin(col_v),
            f.acos(col_v),
            f.exp(col_v),
            f.ln(col_v + f.lit(1)),
            f.log2(col_v + f.lit(1)),
            f.log10(col_v + f.lit(1)),
            f.random(),
        )
>       result = df.collect()
E       Exception: DataFusion error: Plan("No field named '<unqualified>.0QsBN24Phk.value'. Valid fields are '0QsBN24Phk.value'.")

@houqp
Member

houqp commented Oct 8, 2021

Yes, we can ignore that CI test for now.

@francis-du
Contributor

👍

Contributor

@alamb alamb left a comment

Looks great. Thank you again @xudong963 !

),
)?;
(
project_with_alias(
Contributor

much nicer

@alamb alamb merged commit d331fa2 into apache:master Oct 8, 2021
@xudong963 xudong963 deleted the fix_join_alias branch October 12, 2021 14:13
Labels
bug (Something isn't working), datafusion (Changes in the datafusion crate), sql (SQL Planner)
Development

Successfully merging this pull request may close these issues.

[SQL Syntax] - Unrecognized subquery alias