
Ballista assumes all aggregate expressions are not DISTINCT #5

Closed
andygrove opened this issue May 10, 2022 · 6 comments · Fixed by #384
Labels
bug Something isn't working good first issue Good for newcomers

Comments

@andygrove
Member

Describe the bug
We have a hard-coded distinct = false parameter in ballista/rust/core/src/serde/physical_plan/mod.rs, so every deserialized aggregate expression is treated as non-DISTINCT:

Ok(create_aggregate_expr(
    &aggr_function.into(),
    false, // <-- hard-coded "distinct"
    input_phy_expr.as_slice(),
    &physical_schema,
    name.to_string(),
)?)
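To show why the hard-coded flag matters, here is a minimal, self-contained sketch (not Ballista or DataFusion code). The function `count` is a hypothetical stand-in for a physical COUNT aggregate that takes a `distinct` flag, the way `create_aggregate_expr` does above; passing `false` where DISTINCT was requested silently changes the answer.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a physical COUNT aggregate (not the DataFusion API).
/// When `distinct` is true, each value is counted only once.
fn count(values: &[&str], distinct: bool) -> usize {
    if distinct {
        // COUNT(DISTINCT expr): deduplicate before counting.
        values.iter().collect::<HashSet<_>>().len()
    } else {
        // Plain COUNT(expr): every row counts.
        values.len()
    }
}
```

With the flag hard-coded to `false`, a COUNT(DISTINCT c1) over `["a", "b", "a", "c", "b"]` would return 5 instead of 3.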

To Reproduce
Run a query containing COUNT(DISTINCT expr) in Ballista.

Expected behavior
We need to add the distinct flag to the protobuf message for aggregate expressions and implement the corresponding serde code.
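A minimal sketch of the round trip this fix implies, assuming a simplified message shape. `AggregateExprNode`, `AggregateSpec`, `to_proto`, and `from_proto` are hypothetical names; the real message is defined in Ballista's protobuf schema and decoded into DataFusion types. The point is only that the flag is written on serialization and read back on deserialization instead of being replaced by a literal `false`.

```rust
/// Hypothetical, simplified protobuf message for an aggregate expression.
/// The fix is the `distinct` field; field names here are illustrative only.
#[derive(Debug, Clone, PartialEq)]
struct AggregateExprNode {
    aggr_function: String,
    expr: Vec<String>,
    distinct: bool,
}

/// Hypothetical physical-side description of an aggregate.
#[derive(Debug, Clone, PartialEq)]
struct AggregateSpec {
    function: String,
    args: Vec<String>,
    distinct: bool,
}

/// Serialize: copy the flag into the message instead of dropping it.
fn to_proto(spec: &AggregateSpec) -> AggregateExprNode {
    AggregateExprNode {
        aggr_function: spec.function.clone(),
        expr: spec.args.clone(),
        distinct: spec.distinct,
    }
}

/// Deserialize: read the flag from the message instead of hard-coding `false`.
fn from_proto(node: &AggregateExprNode) -> AggregateSpec {
    AggregateSpec {
        function: node.aggr_function.clone(),
        args: node.expr.clone(),
        distinct: node.distinct,
    }
}
```

One convenient property of adding this as a proto3 `bool` field: an absent field decodes as `false`, so plans serialized before the change would still deserialize as non-DISTINCT aggregates.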

Additional context
None

@andygrove andygrove added bug Something isn't working good first issue Good for newcomers ballista labels May 10, 2022
@comphead
Contributor

Hi @andygrove, I've run this in local Ballista:

SELECT count(distinct c1) as cnt_distinct FROM aggregate_test_100

and the result is as expected:

+--------------+
| cnt_distinct |
+--------------+
| 5            |
+--------------+

@comphead
Contributor

comphead commented May 12, 2022

I checked the backtrace:

   2: datafusion_physical_expr::aggregate::build_in::create_aggregate_expr
             at ./datafusion/physical-expr/src/aggregate/build_in.rs:75:13
   3: datafusion::physical_plan::planner::create_aggregate_expr_with_name
             at ./datafusion/core/src/physical_plan/planner.rs:1347:13
   4: datafusion::physical_plan::planner::create_aggregate_expr
             at ./datafusion/core/src/physical_plan/planner.rs:1390:5
   5: datafusion::physical_plan::planner::DefaultPhysicalPlanner::create_initial_plan::{{closure}}::{{closure}}
             at ./datafusion/core/src/physical_plan/planner.rs:525:29

It sounds odd, but I didn't see any Ballista modules in it.

@andygrove andygrove transferred this issue from apache/datafusion May 19, 2022
Ted-Jiang referenced this issue in Ted-Jiang/arrow-ballista Jul 27, 2022
@andygrove
Member Author

Related: apache/datafusion#3250

@andygrove
Member Author

> Hi @andygrove Ive run in local ballista

The issue is specific to distributed mode because the hard-coded value is in the serde code.

@comphead
Contributor

> The issue is specific to distributed mode because it is the serde that has the hard-coded value

Is there any documentation on how to run Ballista tests in distributed mode? Perhaps it's part of CI now?

iajoiner referenced this issue in iajoiner/arrow-ballista Sep 20, 2022
@r4ntix
Contributor

r4ntix commented Oct 18, 2022

@andygrove @comphead
I analyzed the problem and found that SELECT count(distinct c1) as cnt_distinct FROM aggregate_test_100 also returns the expected result in distributed mode.

This is because DataFusion's optimizer rewrites a single DISTINCT aggregate into a GROUP BY:

Example SQL: select count(distinct c_name) from customer_1;

// Logical plan before optimization:
Projection: COUNT(DISTINCT customer_1.c_name)
   Aggregate: groupBy=[[]], aggr=[[COUNT(DISTINCT customer_1.c_name)]]
     TableScan: customer_1 projection=[c_name]

// Logical plan after optimization:
Projection: COUNT(DISTINCT customer_1.c_name)
   Projection: COUNT(alias1) AS COUNT(DISTINCT customer_1.c_name)
     Aggregate: groupBy=[[]], aggr=[[COUNT(alias1)]]
       Aggregate: groupBy=[[customer_1.c_name AS alias1]], aggr=[[]]
         TableScan: customer_1 projection=[c_name]
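The rewrite above is valid because grouping by a column yields one row per distinct value, so a plain COUNT over those rows equals COUNT(DISTINCT ...). A minimal sketch of that equivalence on in-memory data, with `None` standing in for NULL; the function names are hypothetical, and grouping is done here by sort-and-dedup purely for illustration, not as DataFusion's hash aggregation.

```rust
use std::collections::HashSet;

/// Direct evaluation of COUNT(DISTINCT c_name); NULLs (None) are ignored.
fn count_distinct(c_name: &[Option<&str>]) -> usize {
    c_name.iter().flatten().collect::<HashSet<_>>().len()
}

/// Rewritten form, mirroring the optimized plan:
///   inner Aggregate: groupBy=[c_name AS alias1], aggr=[]  -> one row per group
///   outer Aggregate: groupBy=[], aggr=[COUNT(alias1)]     -> plain count
fn count_via_group_by(c_name: &[Option<&str>]) -> usize {
    // Inner aggregate: group by sorting and collapsing adjacent duplicates.
    // The NULL group survives this step as a single `None` row.
    let mut alias1: Vec<Option<&str>> = c_name.to_vec();
    alias1.sort_unstable();
    alias1.dedup();
    // Outer aggregate: non-DISTINCT COUNT(alias1), which skips the NULL group.
    alias1.iter().filter(|v| v.is_some()).count()
}
```

For `[Some("alice"), Some("bob"), Some("alice"), None, Some("carol"), None]` both functions return 3.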

The actual problem is that Ballista does not support DistinctCount in queries the rewrite does not cover, i.e. queries that are not single-distinct aggregates.
For example, select count(distinct c_name), max(c_name) from customer_1 fails with:

[2022-10-18T06:32:14Z ERROR ballista_core::execution_plans::distributed_query] Job 3N8dtpp failed: Error planning job 3N8dtpp: NotImplemented("Aggregate function not supported: DistinctCount { name: \"COUNT(DISTINCT customer_1.c_name)\", data_type: Int64, state_data_types: [Utf8], exprs: [Column { name: \"c_name\", index: 0 }] }")
DataFusionError(ArrowError(ExternalError(Execution("Job 3N8dtpp failed: Error planning job 3N8dtpp: NotImplemented(\"Aggregate function not supported: DistinctCount { name: \\\"COUNT(DISTINCT customer_1.c_name)\\\", data_type: Int64, state_data_types: [Utf8], exprs: [Column { name: \\\"c_name\\\", index: 0 }] }\")"))))
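Because the second query mixes COUNT(DISTINCT ...) with a non-DISTINCT MAX, the rewrite does not apply, the physical plan keeps a DistinctCount aggregate, and Ballista's serde rejects it. The sketch below models the applicability test under an assumption consistent with this error: the rewrite fires only when every aggregate is DISTINCT over the same arguments. `AggExpr` and `is_single_distinct` are hypothetical names, and DataFusion's actual rule has its own, version-dependent conditions.

```rust
/// Hypothetical, simplified logical aggregate expression.
#[derive(Debug)]
struct AggExpr {
    func: &'static str,
    args: Vec<&'static str>,
    distinct: bool,
}

/// Assumed rule: at least one aggregate, all DISTINCT, all over the same arguments.
/// Queries failing this check keep their DISTINCT aggregates as-is.
fn is_single_distinct(aggs: &[AggExpr]) -> bool {
    match aggs.first() {
        None => false,
        Some(first) => aggs.iter().all(|a| a.distinct && a.args == first.args),
    }
}
```

Under this model, `COUNT(DISTINCT c_name)` alone qualifies, while `COUNT(DISTINCT c_name), MAX(c_name)` does not, which is why only the second query reaches the serde code with a DistinctCount.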

I will submit a PR for this :)
