Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Aug 26, 2024
1 parent d3b18ff commit 6d38e6a
Show file tree
Hide file tree
Showing 42 changed files with 452 additions and 578 deletions.
6 changes: 3 additions & 3 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use datafusion_common::config::{CsvOptions, JsonOptions};
use datafusion_common::{
plan_err, Column, DFSchema, DataFusionError, ParamValues, SchemaError, UnnestOptions,
};
use datafusion_expr::{case, is_null, lit};
use datafusion_expr::{case, is_null, lit, SortExpr};
use datafusion_expr::{
utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE,
};
Expand Down Expand Up @@ -576,7 +576,7 @@ impl DataFrame {
self,
on_expr: Vec<Expr>,
select_expr: Vec<Expr>,
sort_expr: Option<Vec<Expr>>,
sort_expr: Option<Vec<SortExpr>>,
) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::from(self.plan)
.distinct_on(on_expr, select_expr, sort_expr)?
Expand Down Expand Up @@ -796,7 +796,7 @@ impl DataFrame {
/// # Ok(())
/// # }
/// ```
pub fn sort(self, expr: Vec<Expr>) -> Result<DataFrame> {
pub fn sort(self, expr: Vec<SortExpr>) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::from(self.plan).sort(expr)?.build()?;
Ok(DataFrame {
session_state: self.session_state,
Expand Down
14 changes: 7 additions & 7 deletions datafusion/core/src/datasource/file_format/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use crate::datasource::{
};
use crate::error::Result;
use crate::execution::context::{SessionConfig, SessionState};
use crate::logical_expr::Expr;

use arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion_common::config::TableOptions;
Expand All @@ -41,6 +40,7 @@ use datafusion_common::{
};

use async_trait::async_trait;
use datafusion_expr::SortExpr;

/// Options that control the reading of CSV files.
///
Expand Down Expand Up @@ -84,7 +84,7 @@ pub struct CsvReadOptions<'a> {
/// File compression type
pub file_compression_type: FileCompressionType,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
pub file_sort_order: Vec<Vec<SortExpr>>,
}

impl<'a> Default for CsvReadOptions<'a> {
Expand Down Expand Up @@ -199,7 +199,7 @@ impl<'a> CsvReadOptions<'a> {
}

/// Configure if file has known sort order
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
self.file_sort_order = file_sort_order;
self
}
Expand Down Expand Up @@ -231,7 +231,7 @@ pub struct ParquetReadOptions<'a> {
/// based on data in file.
pub schema: Option<&'a Schema>,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
pub file_sort_order: Vec<Vec<SortExpr>>,
}

impl<'a> Default for ParquetReadOptions<'a> {
Expand Down Expand Up @@ -278,7 +278,7 @@ impl<'a> ParquetReadOptions<'a> {
}

/// Configure if file has known sort order
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
self.file_sort_order = file_sort_order;
self
}
Expand Down Expand Up @@ -397,7 +397,7 @@ pub struct NdJsonReadOptions<'a> {
/// Flag indicating whether this file may be unbounded (as in a FIFO file).
pub infinite: bool,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
pub file_sort_order: Vec<Vec<SortExpr>>,
}

impl<'a> Default for NdJsonReadOptions<'a> {
Expand Down Expand Up @@ -452,7 +452,7 @@ impl<'a> NdJsonReadOptions<'a> {
}

/// Configure if file has known sort order
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
self.file_sort_order = file_sort_order;
self
}
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,10 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
}

// TODO other expressions are not handled yet:
// - AGGREGATE, WINDOW and SORT should not end up in filter conditions, except maybe in some edge cases
// - AGGREGATE and WINDOW should not end up in filter conditions, except maybe in some edge cases
// - Can `Wildcard` be considered as a `Literal`?
// - ScalarVariable could be `applicable`, but that would require access to the context
Expr::AggregateFunction { .. }
| Expr::Sort { .. }
| Expr::WindowFunction { .. }
| Expr::Wildcard { .. }
| Expr::Unnest { .. }
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use crate::datasource::{
use crate::execution::context::SessionState;
use datafusion_catalog::TableProvider;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::TableType;
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};

use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
Expand Down Expand Up @@ -222,7 +222,7 @@ pub struct ListingOptions {
/// ordering (encapsulated by a `Vec<Expr>`). If there aren't
/// multiple equivalent orderings, the outer `Vec` will have a
/// single element.
pub file_sort_order: Vec<Vec<Expr>>,
pub file_sort_order: Vec<Vec<SortExpr>>,
}

impl ListingOptions {
Expand Down Expand Up @@ -385,7 +385,7 @@ impl ListingOptions {
///
/// assert_eq!(listing_options.file_sort_order, file_sort_order);
/// ```
pub fn with_file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
pub fn with_file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
self.file_sort_order = file_sort_order;
self
}
Expand Down Expand Up @@ -1162,7 +1162,7 @@ mod tests {
(vec![], Ok(vec![])),
// not a sort expr
(
vec![vec![col("string_col")]],
vec![vec![col("string_col").sort(true, false)]],
Err("Expected Expr::Sort in output_ordering, but got string_col"),
),
// sort expr, but non column
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use log::debug;
use parking_lot::Mutex;
use tokio::sync::RwLock;
use tokio::task::JoinSet;
use datafusion_expr::SortExpr;

/// Type alias for partition data
pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
Expand All @@ -64,7 +65,7 @@ pub struct MemTable {
column_defaults: HashMap<String, Expr>,
/// Optional pre-known sort order(s). Must be `SortExpr`s.
/// inserting data into this table removes the order
pub sort_order: Arc<Mutex<Vec<Vec<Expr>>>>,
pub sort_order: Arc<Mutex<Vec<Vec<SortExpr>>>>,
}

impl MemTable {
Expand Down
41 changes: 20 additions & 21 deletions datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,38 +50,37 @@ pub use statistics::get_statistics_with_limit;

use arrow_schema::{Schema, SortOptions};
use datafusion_common::{plan_err, Result};
use datafusion_expr::Expr;
use datafusion_expr::{Expr, SortExpr};
use datafusion_physical_expr::{expressions, LexOrdering, PhysicalSortExpr};

fn create_ordering(
schema: &Schema,
sort_order: &[Vec<Expr>],
sort_order: &[Vec<SortExpr>],
) -> Result<Vec<LexOrdering>> {
let mut all_sort_orders = vec![];

for exprs in sort_order {
// Construct PhysicalSortExpr objects from Expr objects:
let mut sort_exprs = vec![];
for expr in exprs {
match expr {
Expr::Sort(sort) => match sort.expr.as_ref() {
Expr::Column(col) => match expressions::col(&col.name, schema) {
Ok(expr) => {
sort_exprs.push(PhysicalSortExpr {
expr,
options: SortOptions {
descending: !sort.asc,
nulls_first: sort.nulls_first,
},
});
}
// Cannot find expression in the projected_schema, stop iterating
// since rest of the orderings are violated
Err(_) => break,
for sort in exprs {
match sort.expr.as_ref() {
Expr::Column(col) => match expressions::col(&col.name, schema) {
Ok(expr) => {
sort_exprs.push(PhysicalSortExpr {
expr,
options: SortOptions {
descending: !sort.asc,
nulls_first: sort.nulls_first,
},
});
}
expr => return plan_err!("Expected single column references in output_ordering, got {expr}"),
}
expr => return plan_err!("Expected Expr::Sort in output_ordering, but got {expr}"),
// Cannot find expression in the projected_schema, stop iterating
// since rest of the orderings are violated
Err(_) => break,
},
expr => return plan_err!(
"Expected single column references in output_ordering, got {expr}"
),
}
}
if !sort_exprs.is_empty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ mod tests {
name: &'static str,
file_schema: Schema,
files: Vec<File>,
sort: Vec<datafusion_expr::Expr>,
sort: Vec<datafusion_expr::SortExpr>,
expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use arrow_schema::SchemaRef;
use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
Expand Down Expand Up @@ -248,7 +248,7 @@ impl StreamProvider for FileStreamProvider {
#[derive(Debug)]
pub struct StreamConfig {
source: Arc<dyn StreamProvider>,
order: Vec<Vec<Expr>>,
order: Vec<Vec<SortExpr>>,
constraints: Constraints,
}

Expand All @@ -263,7 +263,7 @@ impl StreamConfig {
}

/// Specify a sort order for the stream
pub fn with_order(mut self, order: Vec<Vec<Expr>>) -> Self {
pub fn with_order(mut self, order: Vec<Vec<SortExpr>>) -> Self {
self.order = order;
self
}
Expand Down
32 changes: 14 additions & 18 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ use datafusion_common::{
};
use datafusion_expr::dml::CopyTo;
use datafusion_expr::expr::{
self, physical_name, AggregateFunction, Alias, GroupingSet, WindowFunction,
physical_name, AggregateFunction, Alias, GroupingSet, WindowFunction,
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::{
DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery, StringifiedPlan,
WindowFrame, WindowFrameBound, WriteOp,
DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery, SortExpr,
StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::LexOrdering;
Expand Down Expand Up @@ -1641,31 +1641,27 @@ pub fn create_aggregate_expr_and_maybe_filter(

/// Create a physical sort expression from a logical expression
pub fn create_physical_sort_expr(
e: &Expr,
e: &SortExpr,
input_dfschema: &DFSchema,
execution_props: &ExecutionProps,
) -> Result<PhysicalSortExpr> {
if let Expr::Sort(expr::Sort {
let SortExpr {
expr,
asc,
nulls_first,
}) = e
{
Ok(PhysicalSortExpr {
expr: create_physical_expr(expr, input_dfschema, execution_props)?,
options: SortOptions {
descending: !asc,
nulls_first: *nulls_first,
},
})
} else {
internal_err!("Expects a sort expression")
}
} = e;
Ok(PhysicalSortExpr {
expr: create_physical_expr(expr, input_dfschema, execution_props)?,
options: SortOptions {
descending: !asc,
nulls_first: *nulls_first,
},
})
}

/// Create vector of physical sort expression from a vector of logical expression
pub fn create_physical_sort_exprs(
exprs: &[Expr],
exprs: &[SortExpr],
input_dfschema: &DFSchema,
execution_props: &ExecutionProps,
) -> Result<LexOrdering> {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::TableReference;
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::{
expressions, AggregateExpr, EquivalenceProperties, PhysicalExpr,
Expand Down Expand Up @@ -362,7 +362,7 @@ pub fn register_unbounded_file_with_ordering(
schema: SchemaRef,
file_path: &Path,
table_name: &str,
file_sort_order: Vec<Vec<Expr>>,
file_sort_order: Vec<Vec<SortExpr>>,
) -> Result<()> {
let source = FileStreamProvider::new_file(schema, file_path.into());
let config = StreamConfig::new(Arc::new(source)).with_order(file_sort_order);
Expand Down
Loading

0 comments on commit 6d38e6a

Please sign in to comment.