chore: deprecate ValuesExec in favour of MemoryExec (#14032)
* chore: deprecate `ValuesExec` in favour of `MemoryExec`

* clippy fix

* Update datafusion/physical-plan/src/values.rs

Co-authored-by: Andrew Lamb <[email protected]>

* change to memoryexec

* Update datafusion/physical-plan/src/memory.rs

Co-authored-by: Jay Zhan <[email protected]>

* use compute properties

* clippy fix

---------

Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Jay Zhan <[email protected]>
3 people authored Jan 12, 2025
1 parent 167c11e commit 0a2c027
Showing 6 changed files with 214 additions and 12 deletions.
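For downstream code, the migration is a one-line swap. A minimal sketch (the crate paths and the `lit` helper are the usual re-exports; the one-column schema is illustrative, not from this commit):

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_physical_expr::expressions::lit;
use datafusion_physical_plan::memory::MemoryExec;

fn values_plan() -> Result<MemoryExec> {
    let schema = Arc::new(Schema::new(vec![Field::new("col0", DataType::UInt32, false)]));
    // Before (deprecated as of 45.0.0):
    //     ValuesExec::try_new(Arc::clone(&schema), vec![vec![lit(1u32)]])
    // After: the same constant rows, built as an in-memory relation.
    MemoryExec::try_new_as_values(schema, vec![vec![lit(1u32)]])
}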
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_planner.rs
@@ -54,7 +54,6 @@ use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::unnest::UnnestExec;
-use crate::physical_plan::values::ValuesExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{
displayable, windows, ExecutionPlan, ExecutionPlanProperties, InputOrderMode,
@@ -466,7 +465,8 @@ impl DefaultPhysicalPlanner {
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
})
.collect::<Result<Vec<_>>>()?;
-let value_exec = ValuesExec::try_new(SchemaRef::new(exec_schema), exprs)?;
+let value_exec =
+    MemoryExec::try_new_as_values(SchemaRef::new(exec_schema), exprs)?;
Arc::new(value_exec)
}
LogicalPlan::EmptyRelation(EmptyRelation {
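The replacement constructor, added in the next file, materializes the VALUES expressions eagerly: each expression is evaluated once against a zero-column, single-row placeholder batch, and the resulting scalars are assembled into one RecordBatch. A condensed sketch of just that evaluation step, assuming some `expr: Arc<dyn PhysicalExpr>` is in scope (error handling abbreviated):

use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use arrow_array::RecordBatchOptions;
use arrow_schema::Schema;
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;

// A zero-column batch cannot infer its length, so the single row is
// declared explicitly; it makes a constant expression evaluate exactly once.
let placeholder = RecordBatch::try_new_with_options(
    Arc::new(Schema::empty()),
    vec![],
    &RecordBatchOptions::new().with_row_count(Some(1)),
)?;
let scalar: ScalarValue = match expr.evaluate(&placeholder)? {
    ColumnarValue::Scalar(s) => s,
    ColumnarValue::Array(a) => ScalarValue::try_from_array(&a, 0)?,
};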
192 changes: 189 additions & 3 deletions datafusion/physical-plan/src/memory.rs
@@ -24,14 +24,17 @@ use std::sync::Arc;
use std::task::{Context, Poll};

use super::{
common, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, Statistics,
common, ColumnarValue, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream,
Statistics,
};
use crate::execution_plan::{Boundedness, EmissionType};

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
-use datafusion_common::{internal_err, project_schema, Result};
+use arrow_array::RecordBatchOptions;
+use arrow_schema::Schema;
+use datafusion_common::{internal_err, plan_err, project_schema, Result, ScalarValue};
use datafusion_execution::memory_pool::MemoryReservation;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::ProjectionMapping;
@@ -174,6 +177,96 @@ impl MemoryExec {
})
}

/// Create a new execution plan from a list of constant values (the `ValuesExec` replacement)
pub fn try_new_as_values(
schema: SchemaRef,
data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
) -> Result<Self> {
if data.is_empty() {
return plan_err!("Values list cannot be empty");
}

let n_row = data.len();
let n_col = schema.fields().len();

// A single-row placeholder batch: it satisfies `evaluate`'s argument and makes
// each constant expression produce exactly one output row
let placeholder_schema = Arc::new(Schema::empty());
let placeholder_batch = RecordBatch::try_new_with_options(
Arc::clone(&placeholder_schema),
vec![],
&RecordBatchOptions::new().with_row_count(Some(1)),
)?;

// Evaluate each column
let arrays = (0..n_col)
.map(|j| {
(0..n_row)
.map(|i| {
let expr = &data[i][j];
let result = expr.evaluate(&placeholder_batch)?;

match result {
ColumnarValue::Scalar(scalar) => Ok(scalar),
ColumnarValue::Array(array) if array.len() == 1 => {
ScalarValue::try_from_array(&array, 0)
}
ColumnarValue::Array(_) => {
plan_err!("Cannot have array values in a values list")
}
}
})
.collect::<Result<Vec<_>>>()
.and_then(ScalarValue::iter_to_array)
})
.collect::<Result<Vec<_>>>()?;

let batch = RecordBatch::try_new_with_options(
Arc::clone(&schema),
arrays,
&RecordBatchOptions::new().with_row_count(Some(n_row)),
)?;

let partitions = vec![batch];
Self::try_new_from_batches(Arc::clone(&schema), partitions)
}

/// Create a new plan using the provided schema and batches.
///
/// Errors if any of the batches don't match the provided schema, or if no
/// batches are provided.
pub fn try_new_from_batches(
schema: SchemaRef,
batches: Vec<RecordBatch>,
) -> Result<Self> {
if batches.is_empty() {
return plan_err!("Values list cannot be empty");
}

for batch in &batches {
let batch_schema = batch.schema();
if batch_schema != schema {
return plan_err!(
"Batch has invalid schema. Expected: {}, got: {}",
schema,
batch_schema
);
}
}

let partitions = vec![batches];
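// All batches form a single partition; with no declared sort order,
// the plan's properties are computed from the partitions alone.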
let cache = Self::compute_properties(Arc::clone(&schema), &[], &partitions);
Ok(Self {
partitions,
schema: Arc::clone(&schema),
projected_schema: Arc::clone(&schema),
projection: None,
sort_information: vec![],
cache,
show_sizes: true,
})
}

/// Set `show_sizes` to determine whether to display partition sizes
pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
self.show_sizes = show_sizes;
@@ -696,3 +789,96 @@ mod lazy_memory_tests {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::expressions::lit;
use crate::test::{self, make_partition};

use arrow_schema::{DataType, Field};
use datafusion_common::stats::{ColumnStatistics, Precision};

#[tokio::test]
async fn values_empty_case() -> Result<()> {
let schema = test::aggr_test_schema();
let empty = MemoryExec::try_new_as_values(schema, vec![]);
assert!(empty.is_err());
Ok(())
}

#[test]
fn new_exec_with_batches() {
let batch = make_partition(7);
let schema = batch.schema();
let batches = vec![batch.clone(), batch];
let _exec = MemoryExec::try_new_from_batches(schema, batches).unwrap();
}

#[test]
fn new_exec_with_batches_empty() {
let batch = make_partition(7);
let schema = batch.schema();
let _ = MemoryExec::try_new_from_batches(schema, Vec::new()).unwrap_err();
}

#[test]
fn new_exec_with_batches_invalid_schema() {
let batch = make_partition(7);
let batches = vec![batch.clone(), batch];

let invalid_schema = Arc::new(Schema::new(vec![
Field::new("col0", DataType::UInt32, false),
Field::new("col1", DataType::Utf8, false),
]));
let _ = MemoryExec::try_new_from_batches(invalid_schema, batches).unwrap_err();
}

// Test issue: https://github.com/apache/datafusion/issues/8763
#[test]
fn new_exec_with_non_nullable_schema() {
let schema = Arc::new(Schema::new(vec![Field::new(
"col0",
DataType::UInt32,
false,
)]));
let _ = MemoryExec::try_new_as_values(Arc::clone(&schema), vec![vec![lit(1u32)]])
.unwrap();
// Test that a null value is rejected
let _ = MemoryExec::try_new_as_values(
schema,
vec![vec![lit(ScalarValue::UInt32(None))]],
)
.unwrap_err();
}

#[test]
fn values_stats_with_nulls_only() -> Result<()> {
let data = vec![
vec![lit(ScalarValue::Null)],
vec![lit(ScalarValue::Null)],
vec![lit(ScalarValue::Null)],
];
let rows = data.len();
let values = MemoryExec::try_new_as_values(
Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
data,
)?;

assert_eq!(
values.statistics()?,
Statistics {
num_rows: Precision::Exact(rows),
total_byte_size: Precision::Exact(8), // not important
column_statistics: vec![ColumnStatistics {
null_count: Precision::Exact(rows), // there are only nulls
distinct_count: Precision::Absent,
max_value: Precision::Absent,
min_value: Precision::Absent,
},],
}
);

Ok(())
}
}
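`try_new_from_batches` is the batch-level counterpart: it skips expression evaluation entirely and wraps pre-built record batches. A minimal usage sketch (array contents and function name are illustrative):

use std::sync::Arc;

use arrow::array::{ArrayRef, UInt32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_physical_plan::memory::MemoryExec;

fn in_memory_plan() -> Result<MemoryExec> {
    let schema = Arc::new(Schema::new(vec![Field::new("col0", DataType::UInt32, false)]));
    let col: ArrayRef = Arc::new(UInt32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![col])?;
    // Errors if the batch list is empty or any batch's schema differs.
    MemoryExec::try_new_from_batches(schema, vec![batch])
}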
18 changes: 17 additions & 1 deletion datafusion/physical-plan/src/values.rs
@@ -34,6 +34,7 @@ use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;

/// Execution plan for a values-list-based relation (produces constant rows)
#[deprecated(since = "45.0.0", note = "Use `MemoryExec::try_new_as_values` instead")]
#[derive(Debug, Clone)]
pub struct ValuesExec {
/// The schema
@@ -44,6 +45,7 @@ pub struct ValuesExec {
cache: PlanProperties,
}

#[allow(deprecated)]
impl ValuesExec {
/// Create a new values exec from data as expr
pub fn try_new(
@@ -117,6 +119,7 @@ impl ValuesExec {
}

let cache = Self::compute_properties(Arc::clone(&schema));
#[allow(deprecated)]
Ok(ValuesExec {
schema,
data: batches,
@@ -126,6 +129,7 @@

/// Provides the data
pub fn data(&self) -> Vec<RecordBatch> {
#[allow(deprecated)]
self.data.clone()
}

@@ -140,6 +144,7 @@ impl ValuesExec {
}
}

#[allow(deprecated)]
impl DisplayAs for ValuesExec {
fn fmt_as(
&self,
@@ -154,6 +159,7 @@ impl DisplayAs for ValuesExec {
}
}

#[allow(deprecated)]
impl ExecutionPlan for ValuesExec {
fn name(&self) -> &'static str {
"ValuesExec"
@@ -165,6 +171,7 @@ impl ExecutionPlan for ValuesExec {
}

fn properties(&self) -> &PlanProperties {
#[allow(deprecated)]
&self.cache
}

@@ -176,6 +183,7 @@
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
#[allow(deprecated)]
ValuesExec::try_new_from_batches(Arc::clone(&self.schema), self.data.clone())
.map(|e| Arc::new(e) as _)
}
@@ -194,6 +202,7 @@ impl ExecutionPlan for ValuesExec {

Ok(Box::pin(MemoryStream::try_new(
self.data(),
#[allow(deprecated)]
Arc::clone(&self.schema),
None,
)?))
@@ -203,6 +212,7 @@ impl ExecutionPlan for ValuesExec {
let batch = self.data();
Ok(common::compute_record_batch_statistics(
&[batch],
#[allow(deprecated)]
&self.schema,
None,
))
@@ -221,6 +231,7 @@ mod tests {
#[tokio::test]
async fn values_empty_case() -> Result<()> {
let schema = test::aggr_test_schema();
#[allow(deprecated)]
let empty = ValuesExec::try_new(schema, vec![]);
assert!(empty.is_err());
Ok(())
@@ -231,14 +242,15 @@ mod tests {
let batch = make_partition(7);
let schema = batch.schema();
let batches = vec![batch.clone(), batch];

#[allow(deprecated)]
let _exec = ValuesExec::try_new_from_batches(schema, batches).unwrap();
}

#[test]
fn new_exec_with_batches_empty() {
let batch = make_partition(7);
let schema = batch.schema();
#[allow(deprecated)]
let _ = ValuesExec::try_new_from_batches(schema, Vec::new()).unwrap_err();
}

@@ -251,6 +263,7 @@ mod tests {
Field::new("col0", DataType::UInt32, false),
Field::new("col1", DataType::Utf8, false),
]));
#[allow(deprecated)]
let _ = ValuesExec::try_new_from_batches(invalid_schema, batches).unwrap_err();
}

@@ -262,8 +275,10 @@
DataType::UInt32,
false,
)]));
#[allow(deprecated)]
let _ = ValuesExec::try_new(Arc::clone(&schema), vec![vec![lit(1u32)]]).unwrap();
// Test that a null value is rejected
#[allow(deprecated)]
let _ = ValuesExec::try_new(schema, vec![vec![lit(ScalarValue::UInt32(None))]])
.unwrap_err();
}
@@ -276,6 +291,7 @@ mod tests {
vec![lit(ScalarValue::Null)],
];
let rows = data.len();
#[allow(deprecated)]
let values = ValuesExec::try_new(
Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
data,
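The per-use annotations above follow Rust's standard staged-deprecation pattern: the type carries #[deprecated], and every remaining internal use is silenced with #[allow(deprecated)] so the crate keeps building warning-free until the type is deleted outright. A minimal sketch of the pattern (names are illustrative, not from this commit):

#[deprecated(since = "45.0.0", note = "Use `NewExec` instead")]
pub struct OldExec;

#[allow(deprecated)] // internal use, kept only until `OldExec` is removed
fn build() -> OldExec {
    OldExec
}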
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -128,7 +128,7 @@ physical_plan
01)DataSinkExec: sink=CsvSink(file_groups=[])
02)--SortExec: expr=[a@0 ASC NULLS LAST, b@1 DESC], preserve_partitioning=[false]
03)----ProjectionExec: expr=[column1@0 as a, column2@1 as b]
-04)------ValuesExec
+04)------MemoryExec: partitions=1, partition_sizes=[1]

query I
INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), (7,9), (7,10), (3, 3), (2, 4), (1, 5);
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/order.slt
@@ -786,15 +786,15 @@ physical_plan
08)--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
09)----------------AggregateExec: mode=Partial, gby=[t@0 as t], aggr=[]
10)------------------ProjectionExec: expr=[column1@0 as t]
-11)--------------------ValuesExec
+11)--------------------MemoryExec: partitions=1, partition_sizes=[1]
12)------ProjectionExec: expr=[1 as m, t@0 as t]
13)--------AggregateExec: mode=FinalPartitioned, gby=[t@0 as t], aggr=[]
14)----------CoalesceBatchesExec: target_batch_size=8192
15)------------RepartitionExec: partitioning=Hash([t@0], 2), input_partitions=2
16)--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
17)----------------AggregateExec: mode=Partial, gby=[t@0 as t], aggr=[]
18)------------------ProjectionExec: expr=[column1@0 as t]
-19)--------------------ValuesExec
+19)--------------------MemoryExec: partitions=1, partition_sizes=[1]

#####
# Multi column sorting with lists
(The remaining 2 of the 6 changed files are not shown.)
