Datafusion invalid projection (#571)
* Condition for BinaryExpr, filter, input_ref, rexcall, and rexliteral

* Updates for test_filter

* more of test_filter.py working with the exception of some date pytests

* Add workflow to keep datafusion dev branch up to date (#440)

* Include setuptools-rust in conda build recipe, in host and run

* Remove PyArrow dependency

* rebase with datafusion-sql-planner

* refactor changes that were inadvertent during rebase

* timestamp with local time zone

* Bump DataFusion version (#494)

* bump DataFusion version

* remove unnecessary downcasts and use separate structs for TableSource and TableProvider

* Include RelDataType work

* Include RelDataType work

* Introduced SqlTypeName Enum in Rust and mappings for Python

* impl PyExpr.getIndex()

* add getRowType() for logical.rs

* Introduce DaskTypeMap for storing the correlation between SqlTypeName and DataTypes

* use str values instead of Rust enums; Python is unable to hash the Rust enums if used in a dict

* linter changes, why did that work on my local pre-commit??

* linter changes, why did that work on my local pre-commit??

* Convert final strs to SqlTypeName Enum

* removed a few print statements

* commit to share with colleague

* updates

* checkpoint

* Temporarily disable conda run_test.py script since it uses features not yet implemented

* formatting after upstream merge

* expose fromString method for SqlTypeName to use Enums instead of strings for type checking

* expanded SqlTypeName from_string() support

* accept INT as INTEGER

* tests update

* checkpoint

* checkpoint

* Refactor PyExpr by removing From trait, and using recursion to expand expression list for rex calls

* skip test that uses create statement for gpuci

* Basic DataFusion Select Functionality (#489)

* Condition for BinaryExpr, filter, input_ref, rexcall, and rexliteral

* Updates for test_filter

* more of test_filter.py working with the exception of some date pytests

* Add workflow to keep datafusion dev branch up to date (#440)

* Include setuptools-rust in conda build recipe, in host and run

* Remove PyArrow dependency

* rebase with datafusion-sql-planner

* refactor changes that were inadvertent during rebase

* timestamp with local time zone

* Include RelDataType work

* Include RelDataType work

* Introduced SqlTypeName Enum in Rust and mappings for Python

* impl PyExpr.getIndex()

* add getRowType() for logical.rs

* Introduce DaskTypeMap for storing the correlation between SqlTypeName and DataTypes

* use str values instead of Rust enums; Python is unable to hash the Rust enums if used in a dict

* linter changes, why did that work on my local pre-commit??

* linter changes, why did that work on my local pre-commit??

* Convert final strs to SqlTypeName Enum

* removed a few print statements

* Temporarily disable conda run_test.py script since it uses features not yet implemented

* expose fromString method for SqlTypeName to use Enums instead of strings for type checking

* expanded SqlTypeName from_string() support

* accept INT as INTEGER

* Remove print statements

* Default to UTC if tz is None

* Delegate timezone handling to the arrow library

* Updates from review

Co-authored-by: Charles Blackmon-Luca <[email protected]>

* updates for expression

* uncommented pytests

* uncommented pytests

* code cleanup for review

* code cleanup for review

* Enabled more pytest that work now

* Enabled more pytest that work now

* Output Expression as String when BinaryExpr does not contain a named alias

* Output Expression as String when BinaryExpr does not contain a named alias

* Disable 2 pytests that are causing gpuCI issues. They will be addressed in a follow-up PR

* Handle Between operation for case-when

* adjust timestamp casting

* Refactor projection _column_name() logic to the _column_name logic in expression.rs

* removed println! statements

* introduce join getCondition() logic for retrieving the Rex logic that combines the join conditions

* Updates from review

* Add Offset and point to repo with offset in datafusion

* Introduce offset

* limit updates

* commit before upstream merge

* Code formatting

* update Cargo.toml to use Arrow-DataFusion version with LIMIT logic

* Bump DataFusion version to get changes around variant_name()

* Use map partitions for determining the offset

* Merge with upstream

* Rename underlying DataContainer's DataFrame instance to match the column container names

* Adjust ColumnContainer mapping after join.py logic to ensure the backend mapping is reset

* Add enumerate to column_{i} generation string to ensure columns exist in both dataframes

* Adjust join schema logic to perform merge instead of join on the Rust side to avoid name collisions

* Handle DataFusion COUNT(UInt8(1)) as COUNT(*)

* commit before merge

* Update function for gathering the index of an expression

* Update for review check

* Adjust RelDataType to retrieve fully qualified column names

* Adjust base.py to get fully qualified column name

* Enable passing pytests in test_join.py

* Adjust keys provided by getting backend column mapping name

* Adjust output_col to not use the backend_column name for special reserved exprs

* uncomment cross join pytest which works now

* Uncomment passing pytests in test_select.py

* Review updates

* Add back complex join case condition, not just cross join but 'complex' joins

* Enable DataFusion CBO logic

* Disable EliminateFilter optimization rule

* updates

* Disable tests that hit CBO-generated plan edge cases of yet-to-be-implemented logic

* [REVIEW] - Modify sql.skip_optimize to use dask_config.get and remove unused method parameter

* [REVIEW] - change name of configuration from skip_optimize to optimize

* [REVIEW] - Add OptimizeException catch and raise statements back

* Found issue where backend column names that result from a single aggregate column, COUNT(*) for example, need to get the first agg df column since the names are not valid

* Remove SQL from OptimizationException

* skip tests where CBO plan reorganization requires features that are not yet implemented

* If TableScan contains projections, use those instead of all of the table columns to limit the columns read during table_scan

* [REVIEW] remove compute(), remove temp row_type variable

* [REVIEW] - Add test for projection pushdown

* [REVIEW] - Add some more parametrized test combinations

* [REVIEW] - Use iterator instead of for loop and simplify contains_projections

* [REVIEW] - merge upstream and adjust imports

* [REVIEW] - Rename pytest function and remove duplicate table creation

Co-authored-by: Charles Blackmon-Luca <[email protected]>
Co-authored-by: Andy Grove <[email protected]>
3 people authored Jun 14, 2022
1 parent a52dd7b commit 230d726
Showing 5 changed files with 93 additions and 7 deletions.
8 changes: 7 additions & 1 deletion dask_planner/src/sql/logical.rs
@@ -9,8 +9,9 @@ mod filter;
mod join;
mod limit;
mod offset;
pub mod projection;
mod projection;
mod sort;
mod table_scan;
mod union;

use datafusion_common::{Column, DFSchemaRef, DataFusionError, Result};
@@ -111,6 +112,11 @@ impl PyLogicalPlan {
to_py_plan(self.current_node.as_ref())
}

/// LogicalPlan::TableScan as PyTableScan
pub fn table_scan(&self) -> PyResult<table_scan::PyTableScan> {
to_py_plan(self.current_node.as_ref())
}

/// Gets the "input" for the current LogicalPlan
pub fn get_inputs(&mut self) -> PyResult<Vec<PyLogicalPlan>> {
let mut py_inputs: Vec<PyLogicalPlan> = Vec::new();
45 changes: 45 additions & 0 deletions dask_planner/src/sql/logical/table_scan.rs
@@ -0,0 +1,45 @@
use crate::sql::exceptions::py_type_err;
use crate::sql::logical;
use datafusion_expr::logical_plan::TableScan;
use pyo3::prelude::*;

#[pyclass(name = "TableScan", module = "dask_planner", subclass)]
#[derive(Clone)]
pub struct PyTableScan {
pub(crate) table_scan: TableScan,
}

#[pymethods]
impl PyTableScan {
#[pyo3(name = "getTableScanProjects")]
fn scan_projects(&mut self) -> PyResult<Vec<String>> {
match &self.table_scan.projection {
Some(indices) => {
let schema = self.table_scan.source.schema();
Ok(indices
.iter()
.map(|i| schema.field(*i).name().to_string())
.collect())
}
None => Ok(vec![]),
}
}

/// If the 'TableScan' contains columns that should be projected during the
/// read, return True; otherwise return False
#[pyo3(name = "containsProjections")]
fn contains_projections(&self) -> bool {
self.table_scan.projection.is_some()
}
}

impl TryFrom<logical::LogicalPlan> for PyTableScan {
type Error = PyErr;

fn try_from(logical_plan: logical::LogicalPlan) -> Result<Self, Self::Error> {
match logical_plan {
logical::LogicalPlan::TableScan(table_scan) => Ok(PyTableScan { table_scan }),
_ => Err(py_type_err("unexpected plan")),
}
}
}
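
For orientation, a minimal sketch of how these new bindings are driven from the Python side. It assumes `rel` is a PyLogicalPlan whose current node is a LogicalPlan::TableScan; the variable names and setup are illustrative, not part of this diff:

# 'rel' is assumed to be a PyLogicalPlan positioned at a TableScan node;
# rel.table_scan() raises a type error for any other plan variant.
table_scan = rel.table_scan()

if table_scan.containsProjections():
    # Only the column names DataFusion pushed down into the scan
    projected_cols = table_scan.getTableScanProjects()
else:
    # scan_projects falls back to an empty Vec when projection is None,
    # so getTableScanProjects() would return an empty list here anyway
    projected_cols = []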
3 changes: 1 addition & 2 deletions dask_planner/src/sql/types/rel_data_type.rs
@@ -76,8 +76,7 @@ impl RelDataType {
self.field_list.clone()
}

/// Returns the names of the fields in a struct type. The field count
/// is equal to the size of the returned list.
/// Returns the names of all of the columns in a given DaskTable
#[pyo3(name = "getFieldNames")]
pub fn field_names(&self) -> Vec<String> {
assert!(!self.field_list.is_empty());
15 changes: 11 additions & 4 deletions dask_sql/physical/rel/logical/table_scan.py
@@ -33,6 +33,9 @@ def convert(
# There should not be any input. This is the first step.
self.assert_inputs(rel, 0)

# Rust table_scan instance handle
table_scan = rel.table_scan()

# The table(s) we need to return
table = rel.getTable()

@@ -48,11 +51,15 @@
df = dc.df
cc = dc.column_container

# Make sure we only return the requested columns
row_type = table.getRowType()
field_specifications = [str(f) for f in row_type.getFieldNames()]
cc = cc.limit_to(field_specifications)
# If the 'TableScan' instance contains projected columns, only retrieve those columns;
# otherwise get all projected columns from the 'Projection' instance, which is contained
# in the 'RelDataType' instance, aka 'row_type'
if table_scan.containsProjections():
field_specifications = table_scan.getTableScanProjects()
else:
field_specifications = [str(f) for f in table.getRowType().getFieldNames()]

cc = cc.limit_to(field_specifications)
cc = self.fix_column_to_row_type(cc, rel.getRowType())
dc = DataContainer(df, cc)
dc = self.fix_dtype_to_row_type(dc, rel.getRowType())
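
The practical payoff of this branch is that the pushdown limits which columns ever need to be materialized from storage. A hypothetical standalone illustration with plain Dask (the file path and column names are made up, not from this PR):

import dask.dataframe as dd

# Without pushdown: every column in the file is read
ddf_all = dd.read_parquet("data.parquet")

# With pushdown: only the columns named in the TableScan projection are read,
# analogous to limiting the ColumnContainer via cc.limit_to(field_specifications)
ddf_projected = dd.read_parquet("data.parquet", columns=["a", "d"])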
29 changes: 29 additions & 0 deletions tests/integration/test_select.py
@@ -222,3 +222,32 @@ def test_case_when_no_else(c):
expected_df = pd.DataFrame({"C": [None, 1, 1, 1, None]})

assert_eq(actual_df, expected_df)


def test_singular_column_projection_simple(c):
df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
c.create_table("df", df)

wildcard_result = c.sql("SELECT * from df")
single_col_result = c.sql("SELECT b from df")

assert_eq(wildcard_result["b"], single_col_result["b"])


@pytest.mark.parametrize(
"input_cols",
[
["a"],
["a", "b"],
["a", "d"],
["d", "a"],
["a", "b", "d"],
],
)
def test_multiple_column_projection(c, input_cols):
projection_list = ", ".join(input_cols)
result = c.sql(f"SELECT {projection_list} from parquet_ddf")

# There are 5 columns in the table, ensure only specified ones are read
assert_eq(len(result.columns), len(input_cols))
assert all(x in input_cols for x in result.columns)
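
A likely way to run just these tests locally, assuming a standard pytest setup (the -k filter is illustrative, not part of this diff):

pytest tests/integration/test_select.py -k "projection"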
