
InvalidArgumentError("Column 'COUNT(DISTINCT demo.name)[count distinct]' is declared as non-nullable but contains null values")' #4040

Closed
MichaelLeeHZ opened this issue Oct 31, 2022 · 8 comments
Labels
bug Something isn't working

Comments

@MichaelLeeHZ

MichaelLeeHZ commented Oct 31, 2022

Describe the bug
DataFusion panics when I run the query select app, count(distinct name) from demo group by app.
Here is the stacktrace:

InvalidArgumentError("Column 'COUNT(DISTINCT demo.name)[count distinct]' is declared as non-nullable but contains null values")' at "/Users/michael/.cargo/git/checkouts/arrow-datafusion-b9eb4f789f8bda1f/d84ea9c/datafusion/core/src/physical_plan/repartition.rs:178"
   0: backtrace::backtrace::libunwind::trace
             at /Users/michael/.cargo/registry/src/github.aaakk.us.kg-1ecc6299db9ec823/backtrace-0.3.66/src/backtrace/mod.rs:66:5
      backtrace::backtrace::trace_unsynchronized
             at /Users/michael/.cargo/registry/src/github.aaakk.us.kg-1ecc6299db9ec823/backtrace-0.3.66/src/backtrace/mod.rs:66:5
      backtrace::backtrace::trace
             at /Users/michael/.cargo/registry/src/github.aaakk.us.kg-1ecc6299db9ec823/backtrace-0.3.66/src/backtrace/mod.rs:53:14
      backtrace::capture::Backtrace::create
             at /Users/michael/.cargo/registry/src/github.aaakk.us.kg-1ecc6299db9ec823/backtrace-0.3.66/src/capture.rs:176:9
      backtrace::capture::Backtrace::new
             at /Users/michael/.cargo/registry/src/github.aaakk.us.kg-1ecc6299db9ec823/backtrace-0.3.66/src/capture.rs:140:22
   1: common_util::panic::set_panic_hook::{{closure}}
             at common_util/src/panic.rs:41:18
   2: std::panicking::rust_panic_with_hook
             at /rustc/d394408fb38c4de61f765a3ed5189d2731a1da91/library/std/src/panicking.rs:702:17
   3: std::panicking::begin_panic_handler::{{closure}}
             at /rustc/d394408fb38c4de61f765a3ed5189d2731a1da91/library/std/src/panicking.rs:588:13
   4: std::sys_common::backtrace::__rust_end_short_backtrace
             at /rustc/d394408fb38c4de61f765a3ed5189d2731a1da91/library/std/src/sys_common/backtrace.rs:138:18
   5: rust_begin_unwind
             at /rustc/d394408fb38c4de61f765a3ed5189d2731a1da91/library/std/src/panicking.rs:584:5
   6: core::panicking::panic_fmt
             at /rustc/d394408fb38c4de61f765a3ed5189d2731a1da91/library/core/src/panicking.rs:142:14
   7: core::result::unwrap_failed
             at /rustc/d394408fb38c4de61f765a3ed5189d2731a1da91/library/core/src/result.rs:1814:5
   8: core::result::Result<T,E>::unwrap
             at /rustc/d394408fb38c4de61f765a3ed5189d2731a1da91/library/core/src/result.rs:1107:23
      datafusion::physical_plan::repartition::BatchPartitioner::partition
             at /Users/michael/.cargo/git/checkouts/arrow-datafusion-b9eb4f789f8bda1f/d84ea9c/datafusion/core/src/physical_plan/repartition.rs:178:33
   9: datafusion::physical_plan::repartition::RepartitionExec::pull_from_input::{{closure}}
             at /Users/michael/.cargo/git/checkouts/arrow-datafusion-b9eb4f789f8bda1f/d84ea9c/datafusion/core/src/physical_plan/repartition.rs:452:13
      <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/d394408fb38c4de61f765a3ed5189d2731a1da91/library/core/src/future/mod.rs:91:19

To Reproduce

  1. Create a table like this:
CREATE TABLE `demo` (`app` string NULL,`name` string NULL, `value` double NOT NULL)
  2. Insert data:
INSERT INTO demo(app, value) VALUES('app', 100)
  3. Query like the following statement, with a GROUP BY and a COUNT(DISTINCT) operator:
select `t`, count(distinct name) from demo group by `t`

Expected behavior
Return a result instead of panicking.
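For context, SQL's COUNT(DISTINCT) ignores NULLs, so the expected result for the single inserted row (whose name is NULL) is a count of 0 for its group. A minimal sketch of that semantics (a simplified model, not DataFusion's implementation):

```rust
use std::collections::HashSet;

// Simplified model of SQL COUNT(DISTINCT col): NULL values (None) are
// ignored, so a group whose only values are NULL yields a count of 0.
fn count_distinct(values: &[Option<&str>]) -> usize {
    values
        .iter()
        .filter_map(|&v| v) // drop the NULLs
        .collect::<HashSet<_>>() // deduplicate
        .len()
}

fn main() {
    // The reproduction inserts one row with `name` = NULL.
    let only_nulls: Vec<Option<&str>> = vec![None];
    assert_eq!(count_distinct(&only_nulls), 0);

    // Duplicate non-NULL values are counted once.
    let mixed = vec![Some("a"), Some("a"), None, Some("b")];
    assert_eq!(count_distinct(&mixed), 2);
}
```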

Additional context
I found this bug while using CeresDB (apache/horaedb#302).
I also found that if partition_num is set to more than 1, the error is as above; if partition_num is set to 1, the error is as in #1623.

After digging into the code, I found the logical plan is:

 Projection: #demo.t, #COUNT(DISTINCT demo.name)
  Aggregate: groupBy=[[#demo.t]], aggr=[[COUNT(DISTINCT #demo.name)]]
    TableScan: demo projection=[t, name]

and the physical plan is as follows. I guess the error is caused by the second AggregateExec's schema field "COUNT(DISTINCT demo.name)[count distinct]", which is declared as non-nullable but ends up containing null values.

ProjectionExec {
    expr: [
        (
            Column {
                name: "t",
                index: 0,
            },
            "t",
        ),
        (
            Column {
                name: "COUNT(DISTINCT demo.name)",
                index: 1,
            },
            "COUNT(DISTINCT demo.name)",
        ),
    ],
    schema: Schema {
        fields: [
            Field {
                name: "t",
                data_type: Timestamp(
                    Millisecond,
                    None,
                ),
                nullable: false,
                dict_id: 0,
                dict_is_ordered: false,
                metadata: None,
            },
            Field {
                name: "COUNT(DISTINCT demo.name)",
                data_type: Int64,
                nullable: true,
                dict_id: 0,
                dict_is_ordered: false,
                metadata: None,
            },
        ],
        metadata: {},
    },
    input: AggregateExec {
        mode: FinalPartitioned,
        group_by: PhysicalGroupBy {
            expr: [
                (
                    Column {
                        name: "t",
                        index: 0,
                    },
                    "t",
                ),
            ],
            null_expr: [],
            groups: [
                [
                    false,
                ],
            ],
        },
        aggr_expr: [
            DistinctCount {
                name: "COUNT(DISTINCT demo.name)",
                data_type: Int64,
                state_data_types: [
                    Utf8,
                ],
                exprs: [
                    Column {
                        name: "name",
                        index: 1,
                    },
                ],
            },
        ],
        input: CoalesceBatchesExec {
            input: RepartitionExec {
                input: AggregateExec {
                    mode: Partial,
                    group_by: PhysicalGroupBy {
                        expr: [
                            (
                                Column {
                                    name: "t",
                                    index: 0,
                                },
                                "t",
                            ),
                        ],
                        null_expr: [],
                        groups: [
                            [
                                false,
                            ],
                        ],
                    },
                    aggr_expr: [
                        DistinctCount {
                            name: "COUNT(DISTINCT demo.name)",
                            data_type: Int64,
                            state_data_types: [
                                Utf8,
                            ],
                            exprs: [
                                Column {
                                    name: "name",
                                    index: 1,
                                },
                            ],
                        },
                    ],
                    input: ScanTable {
                        projected_schema: ProjectedSchema {
                            original_schema: Schema {
                                num_key_columns: 2,
                                timestamp_index: 0,
                                tsid_index: Some(
                                    1,
                                ),
                                enable_tsid_primary_key: true,
                                column_schemas: ColumnSchemas {
                                    columns: [
                                        ColumnSchema {
                                            id: 1,
                                            name: "t",
                                            data_type: Timestamp,
                                            is_nullable: false,
                                            is_tag: false,
                                            comment: "",
                                            escaped_name: "t",
                                            default_value: None,
                                        },
                                        ColumnSchema {
                                            id: 2,
                                            name: "tsid",
                                            data_type: UInt64,
                                            is_nullable: false,
                                            is_tag: false,
                                            comment: "",
                                            escaped_name: "tsid",
                                            default_value: None,
                                        },
                                        ColumnSchema {
                                            id: 3,
                                            name: "name",
                                            data_type: String,
                                            is_nullable: true,
                                            is_tag: true,
                                            comment: "",
                                            escaped_name: "name",
                                            default_value: None,
                                        },
                                        ColumnSchema {
                                            id: 4,
                                            name: "value",
                                            data_type: Double,
                                            is_nullable: false,
                                            is_tag: false,
                                            comment: "",
                                            escaped_name: "value",
                                            default_value: None,
                                        },
                                    ],
                                },
                                version: 1,
                            },
                            projection: Some(
                                [
                                    0,
                                    2,
                                ],
                            ),
                        },
                        table: "demo",
                        read_order: None,
                        read_parallelism: 8,
                        predicate: Predicate {
                            exprs: [],
                            time_range: TimeRange {
                                inclusive_start: Timestamp(
                                    -9223372036854775808,
                                ),
                                exclusive_end: Timestamp(
                                    9223372036854775807,
                                ),
                            },
                        },
                    },
                    schema: Schema {
                        fields: [
                            Field {
                                name: "t",
                                data_type: Timestamp(
                                    Millisecond,
                                    None,
                                ),
                                nullable: false,
                                dict_id: 0,
                                dict_is_ordered: false,
                                metadata: None,
                            },
                            Field {
                                name: "COUNT(DISTINCT demo.name)[count distinct]",
                                data_type: List(
                                    Field {
                                        name: "item",
                                        data_type: Utf8,
                                        nullable: true,
                                        dict_id: 0,
                                        dict_is_ordered: false,
                                        metadata: None,
                                    },
                                ),
                                nullable: false,
                                dict_id: 0,
                                dict_is_ordered: false,
                                metadata: None,
                            },
                        ],
                        metadata: {},
                    },
                    input_schema: Schema {
                        fields: [
                            Field {
                                name: "t",
                                data_type: Timestamp(
                                    Millisecond,
                                    None,
                                ),
                                nullable: false,
                                dict_id: 0,
                                dict_is_ordered: false,
                                metadata: Some(
                                    {
                                        "field::comment": "",
                                        "field::id": "1",
                                        "field::is_tag": "false",
                                    },
                                ),
                            },
                            Field {
                                name: "name",
                                data_type: Utf8,
                                nullable: true,
                                dict_id: 0,
                                dict_is_ordered: false,
                                metadata: Some(
                                    {
                                        "field::comment": "",
                                        "field::id": "3",
                                        "field::is_tag": "true",
                                    },
                                ),
                            },
                        ],
                        metadata: {
                            "schema:num_key_columns": "2",
                            "schema::enable_tsid_primary_key": "true",
                            "schema::timestamp_index": "0",
                            "schema::version": "1",
                        },
                    },
                    metrics: ExecutionPlanMetricsSet {
                        inner: Mutex {
                            data: MetricsSet {
                                metrics: [],
                            },
                        },
                    },
                },
                partitioning: Hash(
                    [
                        Column {
                            name: "t",
                            index: 0,
                        },
                    ],
                    8,
                ),
                state: Mutex {
                    data: RepartitionExecState {
                        channels: {},
                        abort_helper: AbortOnDropMany(
                            [],
                        ),
                    },
                },
                metrics: ExecutionPlanMetricsSet {
                    inner: Mutex {
                        data: MetricsSet {
                            metrics: [],
                        },
                    },
                },
            },
            target_batch_size: 4096,
            metrics: ExecutionPlanMetricsSet {
                inner: Mutex {
                    data: MetricsSet {
                        metrics: [],
                    },
                },
            },
        },
        schema: Schema {
            fields: [
                Field {
                    name: "t",
                    data_type: Timestamp(
                        Millisecond,
                        None,
                    ),
                    nullable: false,
                    dict_id: 0,
                    dict_is_ordered: false,
                    metadata: None,
                },
                Field {
                    name: "COUNT(DISTINCT demo.name)",
                    data_type: Int64,
                    nullable: true,
                    dict_id: 0,
                    dict_is_ordered: false,
                    metadata: None,
                },
            ],
            metadata: {},
        },
        input_schema: Schema {
            fields: [
                Field {
                    name: "t",
                    data_type: Timestamp(
                        Millisecond,
                        None,
                    ),
                    nullable: false,
                    dict_id: 0,
                    dict_is_ordered: false,
                    metadata: Some(
                        {
                            "field::comment": "",
                            "field::id": "1",
                            "field::is_tag": "false",
                        },
                    ),
                },
                Field {
                    name: "name",
                    data_type: Utf8,
                    nullable: true,
                    dict_id: 0,
                    dict_is_ordered: false,
                    metadata: Some(
                        {
                            "field::comment": "",
                            "field::id": "3",
                            "field::is_tag": "true",
                        },
                    ),
                },
            ],
            metadata: {
                "schema:num_key_columns": "2",
                "schema::enable_tsid_primary_key": "true",
                "schema::timestamp_index": "0",
                "schema::version": "1",
            },
        },
        metrics: ExecutionPlanMetricsSet {
            inner: Mutex {
                data: MetricsSet {
                    metrics: [],
                },
            },
        },
    },
    metrics: ExecutionPlanMetricsSet {
        inner: Mutex {
            data: MetricsSet {
                metrics: [],
            },
        },
    },
}
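The mismatch above can be illustrated with a minimal, hypothetical sketch of the nullability check that Arrow performs when a batch is built for repartitioning (the `Field` struct and `validate_column` function here are simplified stand-ins, not the real arrow-rs API):

```rust
// Hypothetical, simplified version of the validation performed when
// constructing an Arrow RecordBatch: a field declared non-nullable must
// not contain any null values, otherwise construction fails.
struct Field {
    name: String,
    nullable: bool,
}

fn validate_column(field: &Field, column: &[Option<&str>]) -> Result<(), String> {
    let null_count = column.iter().filter(|v| v.is_none()).count();
    if !field.nullable && null_count > 0 {
        return Err(format!(
            "Column '{}' is declared as non-nullable but contains null values",
            field.name
        ));
    }
    Ok(())
}

fn main() {
    // The partial aggregate's state field is declared non-nullable...
    let field = Field {
        name: "COUNT(DISTINCT demo.name)[count distinct]".to_string(),
        nullable: false,
    };
    // ...but a group whose `name` values are all NULL produces a null state,
    // so validation fails with exactly the error seen in the panic message.
    let column: Vec<Option<&str>> = vec![None];
    assert!(validate_column(&field, &column).is_err());
}
```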
@jiacai2050
Contributor

jiacai2050 commented Nov 1, 2022

@MichaelLeeHZ Thanks for the detailed report. However, the steps above cannot be executed directly in DataFusion; implementing the SQL interface is CeresDB's job.

I found a simple way to reproduce this based on https://github.com/apache/arrow-datafusion/blob/525ac4567ad8d86ad085d8439d890b1f9e9e6bb9/datafusion-examples/examples/memtable.rs#L39

Changes are below:

2 files changed, 6 insertions(+), 8 deletions(-)
datafusion-examples/examples/memtable.rs | 12 +++++-------
datafusion/optimizer/src/optimizer.rs    |  2 +-

modified   datafusion-examples/examples/memtable.rs
@@ -36,14 +36,12 @@ async fn main() -> Result<()> {
     // Register the in-memory table containing the data
     ctx.register_table("users", Arc::new(mem_table))?;
 
-    let dataframe = ctx.sql("SELECT * FROM users;").await?;
+    let dataframe = ctx
+        .sql("SELECT id,count(distinct bank_account) From users group by id;")
+        .await?;
 
     timeout(Duration::from_secs(10), async move {
-        let result = dataframe.collect().await.unwrap();
-        let record_batch = result.get(0).unwrap();
-
-        assert_eq!(1, record_batch.column(0).len());
-        dbg!(record_batch.columns());
+        dataframe.show().await.unwrap();
     })
     .await
     .unwrap();
@@ -57,7 +55,7 @@ fn create_memtable() -> Result<MemTable> {
 
 fn create_record_batch() -> Result<RecordBatch> {
     let id_array = UInt8Array::from(vec![1]);
-    let account_array = UInt64Array::from(vec![9000]);
+    let account_array = UInt64Array::from(vec![None]);
 
     Ok(RecordBatch::try_new(
         get_schema(),
modified   datafusion/optimizer/src/optimizer.rs
@@ -173,7 +173,7 @@ impl Optimizer {
         rules.push(Arc::new(ReduceOuterJoin::new()));
         rules.push(Arc::new(FilterPushDown::new()));
         rules.push(Arc::new(LimitPushDown::new()));
-        rules.push(Arc::new(SingleDistinctToGroupBy::new()));
+        // rules.push(Arc::new(SingleDistinctToGroupBy::new()));
 
         // The previous optimizations added expressions and projections,
         // that might benefit from the following rules

Then run it via cargo run --example memtable, and we get the following error:

thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: InvalidArgumentError("Column 'COUNT(DISTINCT users.bank_account)[count distinct]' is declared as non-nullable but contains null values")', datafusion/core/src/physical_plan/repartition.rs:178:79
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: ArrowError(ExternalError(Execution("Join Error: task 17 panicked")))', datafusion-examples/examples/memtable.rs:44:32

@jonmmease
Contributor

I believe this will be fixed with apache/arrow-rs#3473. See #4828

@jiacai2050
Contributor

@jonmmease Thanks, I will check with the latest DataFusion to see whether this problem remains.

@jonmmease
Contributor

I don't think the fix will be available in DataFusion until arrow 31 is released and DataFusion is updated to use this version.

@jiacai2050
Contributor

Thanks for the reminder; I use DataFusion via a GitHub commit.
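For reference, depending on DataFusion at a pinned GitHub commit can be sketched in Cargo.toml like this (a sketch only, not the exact manifest used here; the rev matches the HEAD commit mentioned in this thread):

```toml
# Sketch: pin DataFusion to a specific commit of the upstream repository.
[dependencies]
datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "4bea81b" }
```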

It seems HEAD (4bea81b) is broken:

$ cargo run --example memtable

error[E0432]: unresolved import `num::complex::ComplexFloat`
  --> /Users/jiacai/.cargo/registry/src/github.aaakk.us.kg-1ecc6299db9ec823/arrow-array-29.0.0/src/arithmetic.rs:21:5
   |
21 | use num::complex::ComplexFloat;
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^ no `ComplexFloat` in `complex`

error[E0599]: no method named `powi` found for struct `f16` in the current scope
   --> /Users/jiacai/.cargo/registry/src/github.aaakk.us.kg-1ecc6299db9ec823/arrow-array-29.0.0/src/arithmetic.rs:351:25
    |
351 |                 Ok(self.powi(exp as i32))
    |                         ^^^^ method not found in `f16`
...
380 | native_type_float_op!(f16, f16::ONE, f16::ZERO);
    | ----------------------------------------------- in this macro invocation
    |
    = help: items from traits can only be used if the trait is in scope
    = note: this error originates in the macro `native_type_float_op` (in Nightly builds, run with -Z macro-backtrace for more info)

Any ideas?

@jiacai2050
Contributor

jiacai2050 commented Jan 28, 2023

The compile error above is the same as apache/arrow-rs#3066; deleting Cargo.lock fixes it.

And I verified this issue has been fixed in the latest master (7673fcc). @MichaelLeeHZ you can close this issue now.

@findepi
Member

findepi commented Jul 3, 2024

3. Query like the following statement, with a GROUP BY and a COUNT(DISTINCT) operator:

select `t`, count(distinct name) from demo group by `t`

I guess `app` was meant?

The bug doesn't look reproducible on current main.
@alamb please close

@alamb
Contributor

alamb commented Jul 5, 2024

Thanks for checking @findepi and @jiacai2050

@alamb alamb closed this as completed Jul 5, 2024