Error Unexpected accumulator state List([NULL]) in COUNT(distinct ..) query #1623

Closed
alamb opened this issue Jan 20, 2022 · 11 comments
Labels
bug Something isn't working

Comments

@alamb
Contributor

alamb commented Jan 20, 2022

Describe the bug
A query returns this error:

ArrowError(ExternalError(Internal("Unexpected accumulator state List([NULL])")))

To Reproduce
Download and extract data.zip

cargo run --bin datafusion-cli
    Finished dev [unoptimized + debuginfo] target(s) in 0.93s
     Running `target/debug/datafusion-cli`
DataFusion CLI v5.1.0

❯ create external table stops stored as parquet location '2021-11.parquet';
0 rows in set. Query took 0.010 seconds.
❯ select count(*), count(distinct stop_name), trip_tid  from stops group by trip_tid limit 10;
ArrowError(ExternalError(Internal("Unexpected accumulator state List([NULL])")))

Expected behavior
A result

Additional context
@matthewmturner and @jhorstmann helped me discover this as part of the Rust syncup today 🤣

@alamb alamb added the bug Something isn't working label Jan 20, 2022
@HaoYang670
Contributor

HaoYang670 commented Feb 8, 2022

It seems like there is List(None, _) in the states:

        let col_values = states
            .iter()
            .map(|state| match state {
                ScalarValue::List(Some(values), _) => Ok(values),
                _ => Err(DataFusionError::Internal(format!(
                    "Unexpected accumulator state {:?}",
                    state
                ))),
            })
            .collect::<Result<Vec<_>>>()?;

https://github.com/apache/arrow-datafusion/blob/master/datafusion/src/physical_plan/expressions/distinct_expressions.rs#L148-L157
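A minimal sketch of why that match produces the error, assuming the state really is a list with no payload as suggested above. `State` and `unwrap_states` are hypothetical stand-ins written for this illustration; they are not DataFusion's `ScalarValue` or its actual code, and only the `Option`-wrapped payload is modelled.

```rust
// Hypothetical stand-in for the list variant of DataFusion's ScalarValue.
// Only the Option-wrapped payload is modelled; the data type field is omitted.
#[derive(Debug)]
enum State {
    List(Option<Vec<Option<String>>>),
}

// Mirrors the shape of the match quoted above: only `List(Some(_))` is accepted,
// so any state without a payload is turned into an error.
fn unwrap_states(states: &[State]) -> Result<Vec<&Vec<Option<String>>>, String> {
    states
        .iter()
        .map(|state| match state {
            State::List(Some(values)) => Ok(values),
            _ => Err(format!("Unexpected accumulator state {:?}", state)),
        })
        .collect()
}

fn main() {
    // A state that carries values passes through.
    let ok = [State::List(Some(vec![Some("A".to_string())]))];
    assert!(unwrap_states(&ok).is_ok());

    // A state with no payload falls into the catch-all arm and becomes an error.
    let bad = [State::List(None)];
    assert!(unwrap_states(&bad).is_err());
    println!("{:?}", unwrap_states(&bad));
}
```

If this reading is right, a single payload-less state among the inputs is enough to fail the whole merge, because `collect` into a `Result` stops at the first `Err`.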

@HaoYang670
Contributor

HaoYang670 commented Feb 8, 2022

These queries work:

select count(*), count(distinct stop_name), trip_tid from stops where stop_name is not null group by trip_tid limit 10;
+-----------------+---------------------------------+----------+
| COUNT(UInt8(1)) | COUNT(DISTINCT stops.stop_name) | trip_tid |
+-----------------+---------------------------------+----------+
| 1               | 1                               | 54787914 |
| 1               | 1                               | 54804331 |
| 1               | 1                               | 54756522 |
| 1               | 1                               | 54791196 |
| 1               | 1                               | 54775777 |
| 1               | 1                               | 54788343 |
| 1               | 1                               | 54788169 |
| 2               | 2                               | 54793827 |
| 1               | 1                               | 54776433 |
| 1               | 1                               | 54788382 |
+-----------------+---------------------------------+----------+

select count(distinct stop_name), trip_tid from stops group by trip_tid limit 10;
+---------------------------------+----------+
| COUNT(DISTINCT stops.stop_name) | trip_tid |
+---------------------------------+----------+
| 2                               | 54793827 |
| 0                               | 54807414 |
| 0                               | 54807426 |
| 0                               | 54807516 |
| 0                               | 54827775 |
| 0                               | 54827714 |
| 0                               | 54807481 |
| 0                               | 54827749 |
| 0                               | 54807473 |
| 0                               | 54919940 |
+---------------------------------+----------+

@alamb
Contributor Author

alamb commented Feb 8, 2022

Perhaps the accumulator just needs to be able to handle "null" (aka seeing no values)
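One way to read that suggestion, as a rough sketch only: treat a payload-less state as "this partition saw nothing for the group" and skip it during the merge, instead of returning an error. `State` and `merge_distinct` are hypothetical names made up for this example; this is not the DataFusion `DistinctCountAccumulator` and not necessarily how the issue was eventually resolved.

```rust
use std::collections::HashSet;

// Hypothetical stand-in for the list variant of DataFusion's ScalarValue.
#[derive(Debug)]
enum State {
    List(Option<Vec<Option<String>>>),
}

// Merge partial states into the set of distinct non-null values.
// A `List(None)` state contributes nothing instead of raising an error.
fn merge_distinct(states: &[State]) -> HashSet<String> {
    let mut seen = HashSet::new();
    for state in states {
        let State::List(maybe_values) = state;
        if let Some(values) = maybe_values {
            // Inner `None` models a SQL NULL, which COUNT(DISTINCT ..) ignores.
            for value in values.iter().flatten() {
                seen.insert(value.clone());
            }
        }
    }
    seen
}

fn main() {
    let states = [
        State::List(Some(vec![Some("A".to_string()), None, Some("B".to_string())])),
        State::List(None),
        State::List(Some(vec![Some("A".to_string())])),
    ];
    // Two distinct non-null values: "A" and "B".
    assert_eq!(merge_distinct(&states).len(), 2);

    // A group that only ever produced empty states counts as 0.
    assert_eq!(merge_distinct(&[State::List(None)]).len(), 0);
}
```

Under this assumption a group with only NULL stop names ends up with a count of 0, which matches the zeros shown in the working query output earlier in the thread.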

@HaoYang670
Contributor

I am curious about why

select count(distinct stop_name), trip_tid from stops group by trip_tid limit 10;

works,
but

select count(*), count(distinct stop_name), trip_tid  from stops group by trip_tid limit 10;

does not work.

@alamb
Contributor Author

alamb commented Feb 9, 2022

What do the plans look like if you do EXPLAIN select count(distinct stop_name), trip_tid from stops group by trip_tid limit 10;? Perhaps the optimizer is doing something non-trivial to the plan

@alamb
Contributor Author

alamb commented Feb 9, 2022

EXPLAIN VERBOSE select count(distinct stop_name), trip_tid from stops group by trip_tid limit 10; will tell you even more

@HaoYang670
Contributor

EXPLAIN VERBOSE select count(*), count(distinct stop_name) from stops group by trip_tid
| physical_plan                                         | ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1)), COUNT(DISTINCT stops.stop_name)@2 as COUNT(DISTINCT stops.stop_name)] |
|                                                       |   HashAggregateExec: mode=FinalPartitioned, gby=[trip_tid@0 as trip_tid], aggr=[COUNT(UInt8(1)), COUNT(DISTINCT stops.stop_name)] |
|                                                       |     CoalesceBatchesExec: target_batch_size=4096                                                                                   |
|                                                       |       RepartitionExec: partitioning=Hash([Column { name: "trip_tid", index: 0 }], 16)                                             |
|                                                       |         HashAggregateExec: mode=Partial, gby=[trip_tid@0 as trip_tid], aggr=[COUNT(UInt8(1)), COUNT(DISTINCT stops.stop_name)]    |
|                                                       |           RepartitionExec: partitioning=RoundRobinBatch(16)                                                                       |
|                                                       |             ParquetExec: limit=None, partitions=[/home/remziy/Downloads/data/2021-11.parquet]

EXPLAIN VERBOSE select count(distinct stop_name) from stops group by trip_tid
| physical_plan                                         | ProjectionExec: expr=[COUNT(DISTINCT stops.stop_name)@0 as COUNT(DISTINCT stops.stop_name)]                                          |
|                                                       |   ProjectionExec: expr=[COUNT(alias1)@1 as COUNT(DISTINCT stops.stop_name)]                                                          |
|                                                       |     HashAggregateExec: mode=FinalPartitioned, gby=[trip_tid@0 as trip_tid], aggr=[COUNT(alias1)]                                     |
|                                                       |       CoalesceBatchesExec: target_batch_size=4096                                                                                    |
|                                                       |         RepartitionExec: partitioning=Hash([Column { name: "trip_tid", index: 0 }], 16)                                              |
|                                                       |           HashAggregateExec: mode=Partial, gby=[trip_tid@0 as trip_tid], aggr=[COUNT(alias1)]                                        |
|                                                       |             HashAggregateExec: mode=FinalPartitioned, gby=[trip_tid@0 as trip_tid, alias1@1 as alias1], aggr=[]                      |
|                                                       |               CoalesceBatchesExec: target_batch_size=4096                                                                            |
|                                                       |                 RepartitionExec: partitioning=Hash([Column { name: "trip_tid", index: 0 }, Column { name: "alias1", index: 1 }], 16) |
|                                                       |                   HashAggregateExec: mode=Partial, gby=[trip_tid@0 as trip_tid, stop_name@1 as alias1], aggr=[]                      |
|                                                       |                     RepartitionExec: partitioning=RoundRobinBatch(16)                                                                |
|                                                       |                       ParquetExec: limit=None, partitions=[/home/remziy/Downloads/data/2021-11.parquet] 

This is a little surprising to me. Without count(*), DataFusion rewrites the count distinct by first doing a hash aggregate on stop_name (producing alias1) and then counting directly.

After debugging, I found that with count(*) we use DistinctCountAccumulator, and without count(*) we use CountAccumulator instead.
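The rewrite visible in the second plan can be sketched outside DataFusion. The following is only an illustration of the idea, using plain Rust collections and made-up rows; the function names are hypothetical and none of this is DataFusion code. It compares a direct per-group distinct set with the two-phase form: first group by (trip_tid, stop_name) with no aggregates, then run a plain COUNT per trip_tid.

```rust
use std::collections::{HashMap, HashSet};

// Rows are (trip_tid, stop_name); `None` models a SQL NULL stop_name.
type Row = (u64, Option<&'static str>);

// Direct form: keep one set of distinct non-null names per group.
fn count_distinct_direct(rows: &[Row]) -> HashMap<u64, usize> {
    let mut sets: HashMap<u64, HashSet<&str>> = HashMap::new();
    for &(tid, name) in rows {
        let set = sets.entry(tid).or_default();
        if let Some(n) = name {
            set.insert(n);
        }
    }
    sets.into_iter().map(|(tid, set)| (tid, set.len())).collect()
}

// Two-phase form: deduplicate (trip_tid, stop_name) pairs first,
// then count the non-null names per trip_tid with an ordinary COUNT.
fn count_distinct_two_phase(rows: &[Row]) -> HashMap<u64, usize> {
    let pairs: HashSet<Row> = rows.iter().copied().collect();
    let mut counts: HashMap<u64, usize> = HashMap::new();
    for (tid, name) in pairs {
        let count = counts.entry(tid).or_insert(0);
        if name.is_some() {
            *count += 1;
        }
    }
    counts
}

fn main() {
    // Made-up sample rows, not taken from the parquet file in this issue.
    let rows: Vec<Row> = vec![
        (1, Some("A")),
        (1, Some("B")),
        (1, Some("A")),
        (2, None),
        (2, None),
    ];
    let direct = count_distinct_direct(&rows);
    let two_phase = count_distinct_two_phase(&rows);
    assert_eq!(direct, two_phase);
    assert_eq!(two_phase[&1], 2);
    assert_eq!(two_phase[&2], 0);
}
```

In the two-phase form the final step is an ordinary count, which would be consistent with CountAccumulator being used on that path rather than DistinctCountAccumulator.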

@alamb
Contributor Author

alamb commented Feb 11, 2022

This is a little surprising to me. Without count(*), DataFusion rewrites the count distinct by first doing a hash aggregate on stop_name (producing alias1) and then counting directly.

That is a cool optimization (it is in https://github.com/apache/arrow-datafusion/blob/master/datafusion/src/optimizer/single_distinct_to_groupby.rs) contributed by @ic4y

@HaoYang670
Contributor

I also tested the SQL with group by trip_tid removed:

select count(*), count(distinct stop_name) 
from stops 
limit 3

and no error appears.

After debugging, I found that DistinctCountAccumulator.merge_batch is called from group_aggregate_batch when the SQL contains group by trip_tid, and from aggregate_batch when group by trip_tid is removed.
So I guess the bug is in group_aggregate_batch in hash_aggregate.rs, but I am not sure.

@findepi
Member

findepi commented Jul 3, 2024

this issue doesn't seem reproducible on current main. @alamb close?

@alamb
Contributor Author

alamb commented Jul 5, 2024

I agree -- thanks @findepi

I double-checked and indeed it is no longer reproducible

andrewlamb@Andrews-MacBook-Pro-2:~/Downloads$ datafusion-cli
DataFusion CLI v39.0.0
> create external table stops stored as parquet location '2021-11.parquet';
0 row(s) fetched.
Elapsed 0.027 seconds.

> select count(*), count(distinct stop_name), trip_tid  from stops group by trip_tid limit 10;
+----------+---------------------------------+----------+
| count(*) | count(DISTINCT stops.stop_name) | trip_tid |
+----------+---------------------------------+----------+
| 7        | 0                               | 54776439 |
| 7        | 0                               | 54776447 |
| 8        | 0                               | 54776412 |
| 6        | 0                               | 54776432 |
| 25       | 0                               | 54775709 |
| 17       | 0                               | 54775749 |
| 25       | 0                               | 54775832 |
| 12       | 0                               | 54775839 |
| 17       | 0                               | 54775833 |
| 14       | 0                               | 54775738 |
+----------+---------------------------------+----------+
10 row(s) fetched.
Elapsed 0.052 seconds.

@alamb alamb closed this as completed Jul 5, 2024