Skip to content

Commit

Permalink
Fix compute_record_batch_statistics wrong with projection (#8489)
Browse files Browse the repository at this point in the history
* Minor: Improve the document format of JoinHashMap

* fix `compute_record_batch_statistics` wrong with `projection`

* fix test

* fix test
  • Loading branch information
Asura7969 authored Dec 16, 2023
1 parent b71bec0 commit 0fcd077
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 44 deletions.
38 changes: 26 additions & 12 deletions datafusion/physical-plan/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{ColumnStatistics, ExecutionPlan, Statistics};
use arrow::datatypes::Schema;
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
use arrow::record_batch::RecordBatch;
use arrow_array::Array;
use datafusion_common::stats::Precision;
use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_execution::memory_pool::MemoryReservation;
Expand Down Expand Up @@ -139,17 +140,22 @@ pub fn compute_record_batch_statistics(
) -> Statistics {
let nb_rows = batches.iter().flatten().map(RecordBatch::num_rows).sum();

let total_byte_size = batches
.iter()
.flatten()
.map(|b| b.get_array_memory_size())
.sum();

let projection = match projection {
Some(p) => p,
None => (0..schema.fields().len()).collect(),
};

let total_byte_size = batches
.iter()
.flatten()
.map(|b| {
projection
.iter()
.map(|index| b.column(*index).get_array_memory_size())
.sum::<usize>()
})
.sum();

let mut column_statistics = vec![ColumnStatistics::new_unknown(); projection.len()];

for partition in batches.iter() {
Expand Down Expand Up @@ -388,6 +394,7 @@ mod tests {
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
use arrow_array::UInt64Array;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{col, Column};

Expand Down Expand Up @@ -685,20 +692,30 @@ mod tests {
let schema = Arc::new(Schema::new(vec![
Field::new("f32", DataType::Float32, false),
Field::new("f64", DataType::Float64, false),
Field::new("u64", DataType::UInt64, false),
]));
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(Float32Array::from(vec![1., 2., 3.])),
Arc::new(Float64Array::from(vec![9., 8., 7.])),
Arc::new(UInt64Array::from(vec![4, 5, 6])),
],
)?;

// just select f32,f64
let select_projection = Some(vec![0, 1]);
let byte_size = batch
.project(&select_projection.clone().unwrap())
.unwrap()
.get_array_memory_size();

let actual =
compute_record_batch_statistics(&[vec![batch]], &schema, Some(vec![0, 1]));
compute_record_batch_statistics(&[vec![batch]], &schema, select_projection);

let mut expected = Statistics {
let expected = Statistics {
num_rows: Precision::Exact(3),
total_byte_size: Precision::Exact(464), // this might change a bit if the way we compute the size changes
total_byte_size: Precision::Exact(byte_size),
column_statistics: vec![
ColumnStatistics {
distinct_count: Precision::Absent,
Expand All @@ -715,9 +732,6 @@ mod tests {
],
};

// Prevent test flakiness due to undefined / changing implementation details
expected.total_byte_size = actual.total_byte_size.clone();

assert_eq!(actual, expected);
Ok(())
}
Expand Down
21 changes: 11 additions & 10 deletions datafusion/sqllogictest/test_files/groupby.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2021,14 +2021,15 @@ SortPreservingMergeExec: [col0@0 ASC NULLS LAST]
----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallySorted([0])
--------------SortExec: expr=[col0@3 ASC NULLS LAST]
----------------CoalesceBatchesExec: target_batch_size=8192
------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
--------------------CoalesceBatchesExec: target_batch_size=8192
----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
------------------------MemoryExec: partitions=1, partition_sizes=[3]
--------------------CoalesceBatchesExec: target_batch_size=8192
----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
------------------------MemoryExec: partitions=1, partition_sizes=[3]
----------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1]
------------------CoalesceBatchesExec: target_batch_size=8192
--------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
----------------------CoalesceBatchesExec: target_batch_size=8192
------------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
--------------------------MemoryExec: partitions=1, partition_sizes=[3]
----------------------CoalesceBatchesExec: target_batch_size=8192
------------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
--------------------------MemoryExec: partitions=1, partition_sizes=[3]

# Columns in the table are a,b,c,d. Source is CsvExec which is ordered by
# a,b,c column. Column a has cardinality 2, column b has cardinality 4.
Expand Down Expand Up @@ -2709,9 +2710,9 @@ SortExec: expr=[sn@2 ASC NULLS LAST]
--ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as sn, ts@3 as ts, currency@4 as currency, LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST]@5 as last_rate]
----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency], aggr=[LAST_VALUE(e.amount)]
------SortExec: expr=[sn@5 ASC NULLS LAST]
--------ProjectionExec: expr=[zip_code@0 as zip_code, country@1 as country, sn@2 as sn, ts@3 as ts, currency@4 as currency, sn@5 as sn, amount@8 as amount]
--------ProjectionExec: expr=[zip_code@4 as zip_code, country@5 as country, sn@6 as sn, ts@7 as ts, currency@8 as currency, sn@0 as sn, amount@3 as amount]
----------CoalesceBatchesExec: target_batch_size=8192
------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@4, currency@2)], filter=ts@0 >= ts@1
------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, currency@4)], filter=ts@0 >= ts@1
--------------MemoryExec: partitions=1, partition_sizes=[1]
--------------MemoryExec: partitions=1, partition_sizes=[1]

Expand Down
42 changes: 20 additions & 22 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1569,15 +1569,13 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
----TableScan: join_t1 projection=[t1_id, t1_name]
----TableScan: join_t2 projection=[t2_id]
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@3 as t2_id, t1_name@1 as t1_name]
ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as t1_name]
--CoalesceBatchesExec: target_batch_size=2
----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + UInt32(11)@2, t2_id@0)]
------CoalescePartitionsExec
--------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------MemoryExec: partitions=1, partition_sizes=[1]
------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------MemoryExec: partitions=1, partition_sizes=[1]
----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t2_id@0, join_t1.t1_id + UInt32(11)@2)]
------MemoryExec: partitions=1, partition_sizes=[1]
------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------MemoryExec: partitions=1, partition_sizes=[1]

statement ok
set datafusion.optimizer.repartition_joins = true;
Expand All @@ -1595,18 +1593,18 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
----TableScan: join_t1 projection=[t1_id, t1_name]
----TableScan: join_t2 projection=[t2_id]
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@3 as t2_id, t1_name@1 as t1_name]
ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as t1_name]
--CoalesceBatchesExec: target_batch_size=2
----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t1.t1_id + UInt32(11)@2, t2_id@0)]
----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t2_id@0, join_t1.t1_id + UInt32(11)@2)]
------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------MemoryExec: partitions=1, partition_sizes=[1]
------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(11)@2], 2), input_partitions=2
----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------MemoryExec: partitions=1, partition_sizes=[1]

# Right side expr key inner join

Expand Down Expand Up @@ -2821,13 +2819,13 @@ physical_plan
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]

Expand Down Expand Up @@ -2862,13 +2860,13 @@ physical_plan
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]

Expand Down Expand Up @@ -2924,7 +2922,7 @@ physical_plan
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
--------MemoryExec: partitions=1, partition_sizes=[1]
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------MemoryExec: partitions=1, partition_sizes=[1]
Expand Down Expand Up @@ -2960,7 +2958,7 @@ physical_plan
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
--------MemoryExec: partitions=1, partition_sizes=[1]
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------MemoryExec: partitions=1, partition_sizes=[1]
Expand Down

0 comments on commit 0fcd077

Please sign in to comment.