Skip to content

Commit

Permalink
fix partition-by panic (apache#12297)
Browse files Browse the repository at this point in the history
  • Loading branch information
thinh2 authored Sep 3, 2024
1 parent 8db30e2 commit 4e1b6de
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 5 deletions.
15 changes: 13 additions & 2 deletions datafusion/physical-plan/src/coalesce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use arrow::compute::concat_batches;
use arrow_array::builder::StringViewBuilder;
use arrow_array::cast::AsArray;
use arrow_array::{Array, ArrayRef, RecordBatch};
use arrow_array::{Array, ArrayRef, RecordBatch, RecordBatchOptions};
use arrow_schema::SchemaRef;
use std::sync::Arc;

Expand Down Expand Up @@ -265,7 +265,9 @@ fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
}
})
.collect();
RecordBatch::try_new(batch.schema(), new_columns)
let mut options = RecordBatchOptions::new();
options = options.with_row_count(Some(batch.num_rows()));
RecordBatch::try_new_with_options(batch.schema(), new_columns, &options)
.expect("Failed to re-create the gc'ed record batch")
}

Expand Down Expand Up @@ -501,6 +503,15 @@ mod tests {
assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
}

#[test]
fn test_gc_string_view_test_batch_empty() {
let schema = Schema::empty();
let batch = RecordBatch::new_empty(schema.into());
let output_batch = gc_string_view_batch(&batch);
assert_eq!(batch.num_columns(), output_batch.num_columns());
assert_eq!(batch.num_rows(), output_batch.num_rows());
}

#[test]
fn test_gc_string_view_batch_large_no_compact() {
// view with large strings (has buffers) but full --> no need to compact
Expand Down
12 changes: 9 additions & 3 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Stat
use arrow::array::ArrayRef;
use arrow::datatypes::{SchemaRef, UInt64Type};
use arrow::record_batch::RecordBatch;
use arrow_array::PrimitiveArray;
use arrow_array::{PrimitiveArray, RecordBatchOptions};
use datafusion_common::utils::transpose;
use datafusion_common::{arrow_datafusion_err, not_impl_err, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
Expand Down Expand Up @@ -309,8 +309,14 @@ impl BatchPartitioner {
})
.collect::<Result<Vec<ArrayRef>>>()?;

let batch =
RecordBatch::try_new(batch.schema(), columns).unwrap();
let mut options = RecordBatchOptions::new();
options = options.with_row_count(Some(indices.len()));
let batch = RecordBatch::try_new_with_options(
batch.schema(),
columns,
&options,
)
.unwrap();

Ok((partition, batch))
});
Expand Down
20 changes: 20 additions & 0 deletions datafusion/sqllogictest/test_files/repartition.slt
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,23 @@ physical_plan
04)------FilterExec: c3@2 > 0
05)--------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
06)----------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true

# Start repratition on empty column test.
# See https://github.com/apache/datafusion/issues/12057

statement ok
CREATE TABLE t1(v1 int);

statement ok
INSERT INTO t1 values(42);

query I
SELECT sum(1) OVER (PARTITION BY false=false)
FROM t1 WHERE ((false > (v1 = v1)) IS DISTINCT FROM true);
----
1

statement ok
DROP TABLE t1;

# End repartition on empty columns test

0 comments on commit 4e1b6de

Please sign in to comment.