Move JoinSelection into datafusion-physical-optimizer crate (#14073) (#14085)

* Move JoinSelection into datafusion-physical-optimizer crate (#14073)

* Fix issues causing GitHub checks to fail

* Fix issues causing GitHub checks to fail

* Fix issues causing GitHub checks to fail

* Lock aws-sdk crates to fix MSRV check

* fix comment

* fix compilation

---------

Co-authored-by: Sergey Zhukov <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
3 people authored Jan 14, 2025
1 parent 888df6a commit 3f91a20
Showing 8 changed files with 464 additions and 424 deletions.
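Note on the practical effect of this move: code that previously reached JoinSelection through datafusion::physical_optimizer::join_selection now imports it from the datafusion-physical-optimizer crate. A minimal sketch of the new import path, assuming the module keeps its name after the move; the construction and the asserted rule-name string are illustrative, not part of this diff:

use datafusion_physical_optimizer::join_selection::JoinSelection;
use datafusion_physical_optimizer::PhysicalOptimizerRule;

fn main() {
    // The relocated rule is constructed the same way as before the move.
    let rule = JoinSelection::new();
    // `name()` comes from the PhysicalOptimizerRule trait and identifies
    // the rule in optimizer output (assumed to be "join_selection").
    assert_eq!(rule.name(), "join_selection");
}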
274 changes: 149 additions & 125 deletions datafusion-cli/Cargo.lock

Large diffs are not rendered by default.

12 changes: 8 additions & 4 deletions datafusion-cli/Cargo.toml
@@ -31,10 +31,14 @@ readme = "README.md"
[dependencies]
arrow = { version = "53.0.0" }
async-trait = "0.1.73"
aws-config = "1.5.5"
aws-sdk-sso = "1.43.0"
aws-sdk-ssooidc = "1.44.0"
aws-sdk-sts = "1.43.0"
## 1.5.13 requires a higher MSRV 1.81 so lock until DataFusion MSRV catches up
aws-config = "=1.5.10"
## 1.53.0 requires a higher MSRV 1.81 so lock until DataFusion MSRV catches up
aws-sdk-sso = "=1.50.0"
## 1.54.0 requires a higher MSRV 1.81 so lock until DataFusion MSRV catches up
aws-sdk-ssooidc = "=1.51.0"
## 1.54.1 requires a higher MSRV 1.81 so lock until DataFusion MSRV catches up
aws-sdk-sts = "=1.51.0"
# end pin aws-sdk crates
aws-credential-types = "1.2.0"
clap = { version = "4.5.16", features = ["derive", "cargo"] }
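The `=` prefix added to these requirements is Cargo's exact-version operator: a bare "1.5.5" is a caret requirement that lets `cargo update` pull any compatible 1.x release, while "=1.5.10" holds the dependency at exactly that version, which is what keeps the newer MSRV-1.81 releases out of the lock file. A small sketch of the difference using the semver crate (an illustrative dependency, not one this commit adds):

use semver::{Version, VersionReq};

fn main() {
    // Bare requirement: caret semantics, accepts any 1.x >= 1.5.10.
    let caret = VersionReq::parse("1.5.10").unwrap();
    // Exact requirement: accepts only 1.5.10 itself.
    let exact = VersionReq::parse("=1.5.10").unwrap();
    let newer = Version::parse("1.5.13").unwrap();
    assert!(caret.matches(&newer)); // an unpinned build would upgrade
    assert!(!exact.matches(&newer)); // the pin keeps the old version
}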
1 change: 0 additions & 1 deletion datafusion/core/src/physical_optimizer/mod.rs
@@ -24,7 +24,6 @@
pub mod enforce_distribution;
pub mod enforce_sorting;
pub mod join_selection;
pub mod optimizer;
pub mod projection_pushdown;
pub mod replace_with_order_preserving_variants;
96 changes: 2 additions & 94 deletions datafusion/core/src/test/mod.rs
@@ -19,7 +19,6 @@
#![allow(missing_docs)]

use std::any::Any;
use std::fs::File;
use std::io::prelude::*;
use std::io::{BufReader, BufWriter};
@@ -42,12 +41,10 @@ use crate::test_util::{aggr_test_schema, arrow_test_data};
use arrow::array::{self, Array, ArrayRef, Decimal128Builder, Int32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Statistics};
use datafusion_common::DataFusionError;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};

#[cfg(feature = "compression")]
use bzip2::write::BzEncoder;
@@ -384,94 +381,5 @@ pub fn csv_exec_ordered(
)
}

/// A mock execution plan that simply returns the provided statistics
#[derive(Debug, Clone)]
pub struct StatisticsExec {
    stats: Statistics,
    schema: Arc<Schema>,
    cache: PlanProperties,
}

impl StatisticsExec {
    pub fn new(stats: Statistics, schema: Schema) -> Self {
        assert_eq!(
            stats.column_statistics.len(), schema.fields().len(),
            "if defined, the column statistics vector length should be the number of fields"
        );
        let cache = Self::compute_properties(Arc::new(schema.clone()));
        Self {
            stats,
            schema: Arc::new(schema),
            cache,
        }
    }

    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(schema: SchemaRef) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(2),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

impl DisplayAs for StatisticsExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "StatisticsExec: col_count={}, row_count={:?}",
                    self.schema.fields().len(),
                    self.stats.num_rows,
                )
            }
        }
    }
}

impl ExecutionPlan for StatisticsExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        unimplemented!("This plan only serves for testing statistics")
    }

    fn statistics(&self) -> Result<Statistics> {
        Ok(self.stats.clone())
    }
}
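For context, this mock is typically used by optimizer tests that need a leaf plan with known statistics. A sketch of that usage, assuming the relocated type keeps the constructor shown above; the column name and type are made up for illustration, and the import of StatisticsExec itself is omitted since its new path depends on where this commit moved it:

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Statistics;
use datafusion_physical_plan::ExecutionPlan;

fn main() {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    // new_unknown fills every column's statistics with absent values.
    let stats = Statistics::new_unknown(&schema);
    let exec = StatisticsExec::new(stats, schema);
    // compute_properties above hardcodes UnknownPartitioning(2).
    assert_eq!(exec.properties().output_partitioning().partition_count(), 2);
}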

pub mod object_store;
pub mod variable;
140 changes: 2 additions & 138 deletions datafusion/core/src/test_util/mod.rs
@@ -36,12 +36,8 @@ use crate::dataframe::DataFrame;
use crate::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
use crate::datasource::{empty::EmptyTable, provider_as_source};
use crate::error::Result;
use crate::execution::context::TaskContext;
use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
RecordBatchStream, SendableRecordBatchStream,
};
use crate::physical_plan::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
use crate::prelude::{CsvReadOptions, SessionContext};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -52,8 +48,7 @@ use datafusion_expr::utils::COUNT_STAR_EXPANSION;
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::{expressions, EquivalenceProperties, PhysicalExpr};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_expr::{expressions, PhysicalExpr};

use async_trait::async_trait;
use futures::Stream;
@@ -230,137 +225,6 @@ impl TableProvider for TestTableProvider {
}
}

/// A mock execution plan that simply returns the provided data source characteristic
#[derive(Debug, Clone)]
pub struct UnboundedExec {
    batch_produce: Option<usize>,
    batch: RecordBatch,
    cache: PlanProperties,
}

impl UnboundedExec {
    /// Create new exec that clones the given record batch to its output.
    ///
    /// Set `batch_produce` to `Some(n)` to emit exactly `n` batches per partition.
    pub fn new(
        batch_produce: Option<usize>,
        batch: RecordBatch,
        partitions: usize,
    ) -> Self {
        let cache = Self::compute_properties(batch.schema(), batch_produce, partitions);
        Self {
            batch_produce,
            batch,
            cache,
        }
    }

    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        schema: SchemaRef,
        batch_produce: Option<usize>,
        n_partitions: usize,
    ) -> PlanProperties {
        let boundedness = if batch_produce.is_none() {
            Boundedness::Unbounded {
                requires_infinite_memory: false,
            }
        } else {
            Boundedness::Bounded
        };
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(n_partitions),
            EmissionType::Incremental,
            boundedness,
        )
    }
}

impl DisplayAs for UnboundedExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "UnboundedExec: unbounded={}",
                    self.batch_produce.is_none(),
                )
            }
        }
    }
}

impl ExecutionPlan for UnboundedExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(UnboundedStream {
            batch_produce: self.batch_produce,
            count: 0,
            batch: self.batch.clone(),
        }))
    }
}

#[derive(Debug)]
struct UnboundedStream {
    batch_produce: Option<usize>,
    count: usize,
    batch: RecordBatch,
}

impl Stream for UnboundedStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if let Some(val) = self.batch_produce {
            if val <= self.count {
                return Poll::Ready(None);
            }
        }
        self.count += 1;
        Poll::Ready(Some(Ok(self.batch.clone())))
    }
}

impl RecordBatchStream for UnboundedStream {
    fn schema(&self) -> SchemaRef {
        self.batch.schema()
    }
}
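Because `batch_produce` drives the Boundedness reported by compute_properties, the same mock covers both finite and infinite sources. A usage sketch, assuming the relocated type keeps this API; the one-column batch is illustrative, and the import of UnboundedExec itself is omitted since its new path depends on where this commit moved it:

use std::sync::Arc;
use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_physical_plan::ExecutionPlan;

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )
    .unwrap();

    // None => Boundedness::Unbounded: poll_next never returns Ready(None).
    let infinite = UnboundedExec::new(None, batch.clone(), 1);
    // Some(2) => Boundedness::Bounded: each partition yields exactly 2 batches.
    let finite = UnboundedExec::new(Some(2), batch, 2);

    assert_eq!(infinite.properties().output_partitioning().partition_count(), 1);
    assert_eq!(finite.properties().output_partitioning().partition_count(), 2);
}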

/// This function creates an unbounded sorted file for testing purposes.
pub fn register_unbounded_file_with_ordering(
ctx: &SessionContext,
2 changes: 2 additions & 0 deletions datafusion/physical-optimizer/Cargo.toml
@@ -41,6 +41,7 @@ datafusion-execution = { workspace = true }
datafusion-expr-common = { workspace = true, default-features = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
recursive = { workspace = true, optional = true }
@@ -49,4 +50,5 @@ recursive = { workspace = true, optional = true }
datafusion-expr = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-nested = { workspace = true }
rstest = { workspace = true }
tokio = { workspace = true }