Fix compute_record_batch_statistics wrong with projection #8489

Merged on Dec 16, 2023 (40 commits)
Commits (all by Asura7969):
7afeb8b  Minor: Improve the document format of JoinHashMap (Nov 8, 2023)
6332bec  Merge remote-tracking branch 'origin/main' (Nov 10, 2023)
cc5e0c7  Merge remote-tracking branch 'origin/main' (Nov 10, 2023)
a114310  Merge remote-tracking branch 'origin/main' (Nov 11, 2023)
928c811  Merge remote-tracking branch 'origin/main' (Nov 11, 2023)
839093e  Merge remote-tracking branch 'origin/main' (Nov 12, 2023)
a836cde  Merge remote-tracking branch 'origin/main' (Nov 13, 2023)
5648dc7  Merge branch 'apache:main' into main (Nov 13, 2023)
a670409  Merge branch 'apache:main' into main (Nov 14, 2023)
22894a3  Merge branch 'apache:main' into main (Nov 14, 2023)
73a59d2  Merge branch 'apache:main' into main (Nov 15, 2023)
46409c2  Merge branch 'apache:main' into main (Nov 16, 2023)
8a86a4c  Merge branch 'apache:main' into main (Nov 17, 2023)
cf5c584  Merge branch 'apache:main' into main (Nov 17, 2023)
62ae9b9  Merge branch 'apache:main' into main (Nov 19, 2023)
da02fa2  Merge branch 'apache:main' into main (Nov 20, 2023)
d98eb2e  Merge branch 'apache:main' into main (Nov 21, 2023)
79e7216  Merge branch 'apache:main' into main (Nov 21, 2023)
ba51abd  Merge branch 'apache:main' into main (Nov 23, 2023)
2468f52  Merge branch 'apache:main' into main (Nov 23, 2023)
180c303  Merge branch 'apache:main' into main (Nov 24, 2023)
68980ba  Merge branch 'apache:main' into main (Nov 27, 2023)
9411940  Merge branch 'apache:main' into main (Nov 27, 2023)
ba28346  Merge branch 'apache:main' into main (Nov 28, 2023)
df0942f  Merge branch 'apache:main' into main (Nov 29, 2023)
edccb66  Merge branch 'apache:main' into main (Nov 29, 2023)
fb74b99  Merge branch 'apache:main' into main (Nov 30, 2023)
767b004  Merge branch 'apache:main' into main (Dec 1, 2023)
2e0eef5  Merge branch 'apache:main' into main (Dec 2, 2023)
749e0c8  Merge branch 'apache:main' into main (Dec 3, 2023)
5d43a94  Merge branch 'apache:main' into main (Dec 5, 2023)
71047f3  Merge branch 'apache:main' into main (Dec 6, 2023)
4b6921b  Merge branch 'apache:main' into main (Dec 7, 2023)
deefdd0  fix `compute_record_batch_statistics` wrong with `projection` (Dec 7, 2023)
c00027e  Merge branch 'apache:main' into main (Dec 7, 2023)
d46a9f9  Merge branch 'apache:main' into main (Dec 8, 2023)
41a520f  Merge branch 'apache:main' into main (Dec 11, 2023)
632b460  Merge branch 'main' into fix_total_byte_size (Dec 11, 2023)
d19294f  fix test (Dec 11, 2023)
928cbb1  fix test (Dec 11, 2023)
38 changes: 26 additions & 12 deletions datafusion/physical-plan/src/common.rs
@@ -30,6 +30,7 @@ use crate::{ColumnStatistics, ExecutionPlan, Statistics};
use arrow::datatypes::Schema;
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
use arrow::record_batch::RecordBatch;
use arrow_array::Array;
use datafusion_common::stats::Precision;
use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_execution::memory_pool::MemoryReservation;
@@ -139,17 +140,22 @@ pub fn compute_record_batch_statistics(
) -> Statistics {
let nb_rows = batches.iter().flatten().map(RecordBatch::num_rows).sum();

let total_byte_size = batches
.iter()
.flatten()
.map(|b| b.get_array_memory_size())
.sum();

let projection = match projection {
Some(p) => p,
None => (0..schema.fields().len()).collect(),
};

let total_byte_size = batches
.iter()
.flatten()
.map(|b| {
projection
.iter()
.map(|index| b.column(*index).get_array_memory_size())
.sum::<usize>()
})
.sum();
Member (on lines +148 to +157): 👍

let mut column_statistics = vec![ColumnStatistics::new_unknown(); projection.len()];

for partition in batches.iter() {
@@ -388,6 +394,7 @@ mod tests {
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
use arrow_array::UInt64Array;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{col, Column};

@@ -685,20 +692,30 @@
let schema = Arc::new(Schema::new(vec![
Field::new("f32", DataType::Float32, false),
Field::new("f64", DataType::Float64, false),
Field::new("u64", DataType::UInt64, false),
]));
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(Float32Array::from(vec![1., 2., 3.])),
Arc::new(Float64Array::from(vec![9., 8., 7.])),
Arc::new(UInt64Array::from(vec![4, 5, 6])),
],
)?;

// just select f32,f64
let select_projection = Some(vec![0, 1]);
let byte_size = batch
.project(&select_projection.clone().unwrap())
.unwrap()
.get_array_memory_size();

let actual =
compute_record_batch_statistics(&[vec![batch]], &schema, Some(vec![0, 1]));
compute_record_batch_statistics(&[vec![batch]], &schema, select_projection);

let mut expected = Statistics {
let expected = Statistics {
num_rows: Precision::Exact(3),
total_byte_size: Precision::Exact(464), // this might change a bit if the way we compute the size changes
total_byte_size: Precision::Exact(byte_size),
Contributor (author): I'm not sure if this is appropriate; if you have any good suggestions, please leave a message.

Contributor: I think this is ok and a nice way to make the code less brittle to future changes in arrow's layout.

Member: I'm curious as to why the previous code was Precision::Exact(464).

Contributor: I think it happens to be the (current) size of the record batch in the test:

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Float32Array::from(vec![1., 2., 3.])),
                Arc::new(Float64Array::from(vec![9., 8., 7.])),
                Arc::new(UInt64Array::from(vec![4, 5, 6])),
            ],
        )?;

column_statistics: vec![
ColumnStatistics {
distinct_count: Precision::Absent,
@@ -715,9 +732,6 @@
],
};

// Prevent test flakiness due to undefined / changing implementation details
expected.total_byte_size = actual.total_byte_size.clone();

assert_eq!(actual, expected);
Ok(())
}
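A minimal, self-contained sketch of the projection-aware size computation used above, assuming only the arrow crate; the `projected_byte_size` helper name and the sample schema are illustrative, not code from this PR:

```rust
// Hypothetical sketch (not from this PR): byte size of only the projected
// columns of a RecordBatch, mirroring the new logic in
// compute_record_batch_statistics. Requires the `arrow` crate.
use std::sync::Arc;

use arrow::array::{Array, Float32Array, Float64Array, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

/// Sum the in-memory size of just the projected columns (illustrative helper).
fn projected_byte_size(batch: &RecordBatch, projection: &[usize]) -> usize {
    projection
        .iter()
        .map(|i| batch.column(*i).get_array_memory_size())
        .sum()
}

fn main() -> Result<(), arrow::error::ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("f32", DataType::Float32, false),
        Field::new("f64", DataType::Float64, false),
        Field::new("u64", DataType::UInt64, false),
    ]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(Float32Array::from(vec![1., 2., 3.])),
            Arc::new(Float64Array::from(vec![9., 8., 7.])),
            Arc::new(UInt64Array::from(vec![4, 5, 6])),
        ],
    )?;

    // Only f32 and f64 are selected; u64 should not count towards the size.
    let projected = projected_byte_size(&batch, &[0, 1]);
    let whole = batch.get_array_memory_size();
    assert!(projected < whole);

    // The test in this PR computes its expected value the same way, via
    // RecordBatch::project, instead of hard-coding a size such as 464.
    assert_eq!(projected, batch.project(&[0, 1])?.get_array_memory_size());
    Ok(())
}
```

Computing the expected size via RecordBatch::project at test time, rather than hard-coding 464, keeps the test valid if arrow's buffer layout (and therefore array memory sizes) changes.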
21 changes: 11 additions & 10 deletions datafusion/sqllogictest/test_files/groupby.slt
@@ -2021,14 +2021,15 @@ SortPreservingMergeExec: [col0@0 ASC NULLS LAST]
----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallySorted([0])
--------------SortExec: expr=[col0@3 ASC NULLS LAST]
----------------CoalesceBatchesExec: target_batch_size=8192
------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
--------------------CoalesceBatchesExec: target_batch_size=8192
----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
------------------------MemoryExec: partitions=1, partition_sizes=[3]
--------------------CoalesceBatchesExec: target_batch_size=8192
----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
------------------------MemoryExec: partitions=1, partition_sizes=[3]
----------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1]
Contributor: Looks to me like the change is due to the fact that the join inputs were reordered, and this projection puts the columns back in the expected order. Same thing with the projection below.

------------------CoalesceBatchesExec: target_batch_size=8192
--------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
----------------------CoalesceBatchesExec: target_batch_size=8192
------------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
--------------------------MemoryExec: partitions=1, partition_sizes=[3]
----------------------CoalesceBatchesExec: target_batch_size=8192
------------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
--------------------------MemoryExec: partitions=1, partition_sizes=[3]

# Columns in the table are a,b,c,d. Source is CsvExec which is ordered by
# a,b,c column. Column a has cardinality 2, column b has cardinality 4.
@@ -2709,9 +2710,9 @@ SortExec: expr=[sn@2 ASC NULLS LAST]
--ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as sn, ts@3 as ts, currency@4 as currency, LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST]@5 as last_rate]
----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency], aggr=[LAST_VALUE(e.amount)]
------SortExec: expr=[sn@5 ASC NULLS LAST]
--------ProjectionExec: expr=[zip_code@0 as zip_code, country@1 as country, sn@2 as sn, ts@3 as ts, currency@4 as currency, sn@5 as sn, amount@8 as amount]
--------ProjectionExec: expr=[zip_code@4 as zip_code, country@5 as country, sn@6 as sn, ts@7 as ts, currency@8 as currency, sn@0 as sn, amount@3 as amount]
----------CoalesceBatchesExec: target_batch_size=8192
------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@4, currency@2)], filter=ts@0 >= ts@1
------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, currency@4)], filter=ts@0 >= ts@1
--------------MemoryExec: partitions=1, partition_sizes=[1]
--------------MemoryExec: partitions=1, partition_sizes=[1]

42 changes: 20 additions & 22 deletions datafusion/sqllogictest/test_files/joins.slt
@@ -1569,15 +1569,13 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
----TableScan: join_t1 projection=[t1_id, t1_name]
----TableScan: join_t2 projection=[t2_id]
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@3 as t2_id, t1_name@1 as t1_name]
ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as t1_name]
--CoalesceBatchesExec: target_batch_size=2
----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + UInt32(11)@2, t2_id@0)]
------CoalescePartitionsExec
--------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------MemoryExec: partitions=1, partition_sizes=[1]
------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------MemoryExec: partitions=1, partition_sizes=[1]
----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t2_id@0, join_t1.t1_id + UInt32(11)@2)]
------MemoryExec: partitions=1, partition_sizes=[1]
------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------MemoryExec: partitions=1, partition_sizes=[1]

statement ok
set datafusion.optimizer.repartition_joins = true;
@@ -1595,18 +1593,18 @@
----TableScan: join_t1 projection=[t1_id, t1_name]
----TableScan: join_t2 projection=[t2_id]
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@3 as t2_id, t1_name@1 as t1_name]
ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as t1_name]
--CoalesceBatchesExec: target_batch_size=2
----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t1.t1_id + UInt32(11)@2, t2_id@0)]
----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t2_id@0, join_t1.t1_id + UInt32(11)@2)]
------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------MemoryExec: partitions=1, partition_sizes=[1]
------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(11)@2], 2), input_partitions=2
----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------MemoryExec: partitions=1, partition_sizes=[1]

# Right side expr key inner join

@@ -2821,13 +2819,13 @@
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]

@@ -2862,13 +2860,13 @@
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]

@@ -2924,7 +2922,7 @@
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
--------MemoryExec: partitions=1, partition_sizes=[1]
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2960,7 +2958,7 @@
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
--------MemoryExec: partitions=1, partition_sizes=[1]
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------MemoryExec: partitions=1, partition_sizes=[1]
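
The sqllogictest plan changes above follow from the statistics fix rather than from any direct planner change: total_byte_size for a projected input is now smaller (only the projected columns are counted), so size-based decisions such as picking the hash-join build side can flip, swapping the join inputs (e.g. LeftSemi becoming RightSemi) and adding a ProjectionExec to restore the original column order, as the reviewer noted for groupby.slt. The following is a hypothetical sketch of such a size-based build-side rule, with made-up types and numbers; it is not DataFusion's actual JoinSelection code:

```rust
// Hypothetical sketch of a size-based build-side choice, loosely modeled on
// the idea behind join selection; not the actual DataFusion implementation.

/// Simplified stand-in for a byte-size statistic that may be unknown.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ByteSize {
    Exact(usize),
    Absent,
}

#[derive(Debug, PartialEq)]
enum BuildSide {
    Left,
    Right,
}

/// Pick which join input to buffer (the "build" side): prefer the one with
/// the smaller known byte size; fall back to the left side when unknown.
fn choose_build_side(left: ByteSize, right: ByteSize) -> BuildSide {
    match (left, right) {
        (ByteSize::Exact(l), ByteSize::Exact(r)) if r < l => BuildSide::Right,
        _ => BuildSide::Left,
    }
}

fn main() {
    // Before this fix, a projected input's size was overestimated (all columns
    // counted), which could make the other input look smaller by comparison.
    let over_estimated = choose_build_side(ByteSize::Exact(464), ByteSize::Exact(320));
    assert_eq!(over_estimated, BuildSide::Right);

    // With a corrected, projection-aware size the comparison can flip back.
    let corrected = choose_build_side(ByteSize::Exact(256), ByteSize::Exact(320));
    assert_eq!(corrected, BuildSide::Left);

    // Unknown sizes fall back to the default side.
    let unknown = choose_build_side(ByteSize::Absent, ByteSize::Exact(320));
    assert_eq!(unknown, BuildSide::Left);
}
```

Either input can end up as the build side after the fix; the point is only that more accurate byte-size estimates can change the outcome of this comparison, which is what the swapped HashJoinExec inputs and join types in these plans reflect.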