Skip to content

Commit

Permalink
Dynamic partition pruning (#1102)
Browse files Browse the repository at this point in the history
* q3 functionality

* style and minor functionality changes

* some cleanup

* save progress

* use inlist instead of binaryexpr

* fix cargo test

* fix some queries

* use with_max_passes=1 and remove todos

* add warning

* only run dpp once

* null handling and double dtype

* minor style fixes

* clippy

* use adp imports

* add jeremy suggestions and better type logic

* style fix

* MORE int/float logic

* style fix

* fix some bugs

* add dask_config

* check for duplicate tablescans

* fix row iterator

* clippy

* clippy again

* add per-query config

* style

---------

Co-authored-by: Jeremy Dyer <[email protected]>
Co-authored-by: Ayush Dattagupta <[email protected]>
  • Loading branch information
3 people authored Jun 22, 2023
1 parent c9eeb2c commit f8bf06c
Show file tree
Hide file tree
Showing 7 changed files with 1,147 additions and 8 deletions.
34 changes: 27 additions & 7 deletions dask_planner/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub struct DaskSQLContext {
current_schema: String,
schemas: HashMap<String, schema::DaskSchema>,
options: ConfigOptions,
dynamic_partition_pruning: bool,
}

impl ContextProvider for DaskSQLContext {
Expand Down Expand Up @@ -457,9 +458,15 @@ impl DaskSQLContext {
current_schema: default_schema_name.to_owned(),
schemas: HashMap::new(),
options: ConfigOptions::new(),
dynamic_partition_pruning: false,
}
}

pub fn apply_dynamic_partition_pruning(&mut self, config: bool) -> PyResult<()> {
self.dynamic_partition_pruning = config;
Ok(())
}

/// Change the current schema
pub fn use_schema(&mut self, schema_name: &str) -> PyResult<()> {
if self.schemas.contains_key(schema_name) {
Expand Down Expand Up @@ -546,13 +553,26 @@ impl DaskSQLContext {
warn!("This LogicalPlan does not support Optimization. Returning original");
Ok(existing_plan)
}
_ => optimizer::DaskSqlOptimizer::new()
.optimize(existing_plan.original_plan)
.map(|k| PyLogicalPlan {
original_plan: k,
current_node: None,
})
.map_err(py_optimization_exp),
_ => {
let optimized_plan = optimizer::DaskSqlOptimizer::new()
.optimize(existing_plan.original_plan)
.map(|k| PyLogicalPlan {
original_plan: k,
current_node: None,
})
.map_err(py_optimization_exp);
if self.dynamic_partition_pruning {
optimizer::DaskSqlOptimizer::dynamic_partition_pruner()
.optimize_once(optimized_plan.unwrap().original_plan)
.map(|k| PyLogicalPlan {
original_plan: k,
current_node: None,
})
.map_err(py_optimization_exp)
} else {
optimized_plan
}
}
}
}
Err(e) => Err(py_optimization_exp(e)),
Expand Down
22 changes: 22 additions & 0 deletions dask_planner/src/sql/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ use datafusion_python::{
};
use log::{debug, trace};

mod dynamic_partition_pruning;
use dynamic_partition_pruning::DynamicPartitionPruning;

mod join_reorder;
use join_reorder::JoinReorder;

Expand Down Expand Up @@ -86,13 +89,32 @@ impl DaskSqlOptimizer {
}
}

// Create a separate instance of this optimization rule, since we want to ensure that it only
// runs one time
pub fn dynamic_partition_pruner() -> Self {
let rule: Vec<Arc<dyn OptimizerRule + Sync + Send>> =
vec![Arc::new(DynamicPartitionPruning::new())];

Self {
optimizer: Optimizer::with_rules(rule),
}
}

/// Iterates through the configured `OptimizerRule`(s) to transform the input `LogicalPlan`
/// to its final optimized form
pub(crate) fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan, DataFusionError> {
let config = OptimizerContext::new();
self.optimizer.optimize(&plan, &config, Self::observe)
}

/// Iterates once through the configured `OptimizerRule`(s) to transform the input `LogicalPlan`
/// to its final optimized form
pub(crate) fn optimize_once(&self, plan: LogicalPlan) -> Result<LogicalPlan, DataFusionError> {
let mut config = OptimizerContext::new();
config = OptimizerContext::with_max_passes(config, 1);
self.optimizer.optimize(&plan, &config, Self::observe)
}

fn observe(optimized_plan: &LogicalPlan, optimization: &dyn OptimizerRule) {
trace!(
"== AFTER APPLYING RULE {} ==\n{}\n",
Expand Down
Loading

0 comments on commit f8bf06c

Please sign in to comment.