Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datafusion invalid projection #571

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
112 commits
Select commit Hold shift + click to select a range
b1900cf
Condition for BinaryExpr, filter, input_ref, rexcall, and rexliteral
jdye64 Mar 26, 2022
1e48597
Updates for test_filter
jdye64 Mar 31, 2022
fd41a8c
more of test_filter.py working with the exception of some date pytests
jdye64 Mar 31, 2022
682c009
Add workflow to keep datafusion dev branch up to date (#440)
charlesbluca Mar 25, 2022
ab69dd8
Include setuptools-rust in conda build recipie, in host and run
jdye64 Apr 13, 2022
ce4c31e
Remove PyArrow dependency
jdye64 Apr 20, 2022
8785b8c
rebase with datafusion-sql-planner
jdye64 Apr 21, 2022
3e45ab8
refactor changes that were inadvertent during rebase
jdye64 Apr 21, 2022
1734b89
timestamp with loglca time zone
jdye64 Apr 21, 2022
ac7d9f6
Bump DataFusion version (#494)
andygrove Apr 21, 2022
cbf5db0
Include RelDataType work
jdye64 Apr 21, 2022
d9380a6
Include RelDataType work
jdye64 Apr 21, 2022
ad56fc2
Introduced SqlTypeName Enum in Rust and mappings for Python
jdye64 Apr 22, 2022
7b20e66
impl PyExpr.getIndex()
jdye64 Apr 22, 2022
7dd2017
add getRowType() for logical.rs
jdye64 Apr 22, 2022
984f523
Introduce DaskTypeMap for storing correlating SqlTypeName and DataTypes
jdye64 Apr 23, 2022
1405fea
use str values instead of Rust Enums, Python is unable to Hash the Ru…
jdye64 Apr 23, 2022
789aaad
linter changes, why did that work on my local pre-commit??
jdye64 Apr 23, 2022
652205e
linter changes, why did that work on my local pre-commit??
jdye64 Apr 23, 2022
5127f87
Convert final strs to SqlTypeName Enum
jdye64 Apr 24, 2022
cf568dc
removed a few print statements
jdye64 Apr 24, 2022
4fb640e
commit to share with colleague
jdye64 Apr 24, 2022
32127e5
updates
jdye64 Apr 25, 2022
f5e24fe
checkpoint
jdye64 Apr 25, 2022
11cf212
Temporarily disable conda run_test.py script since it uses features n…
jdye64 Apr 25, 2022
46dfb0a
formatting after upstream merge
jdye64 Apr 25, 2022
fa71674
expose fromString method for SqlTypeName to use Enums instead of stri…
jdye64 Apr 25, 2022
f6e86ca
expanded SqlTypeName from_string() support
jdye64 Apr 25, 2022
3d1a5ad
accept INT as INTEGER
jdye64 Apr 25, 2022
384e446
tests update
jdye64 Apr 25, 2022
199b9d2
checkpoint
jdye64 Apr 25, 2022
c9dffae
checkpoint
jdye64 Apr 27, 2022
c9aad43
Refactor PyExpr by removing From trait, and using recursion to expand…
jdye64 Apr 28, 2022
11100fa
skip test that uses create statement for gpuci
jdye64 Apr 28, 2022
643e85d
Basic DataFusion Select Functionality (#489)
jdye64 Apr 28, 2022
b36ef16
updates for expression
jdye64 Apr 28, 2022
5c94fbc
uncommented pytests
jdye64 Apr 28, 2022
bb461c8
uncommented pytests
jdye64 Apr 28, 2022
f65b1ab
code cleanup for review
jdye64 Apr 28, 2022
dc7553f
code cleanup for review
jdye64 Apr 28, 2022
f1dc0b2
Enabled more pytest that work now
jdye64 Apr 28, 2022
940e867
Enabled more pytest that work now
jdye64 Apr 28, 2022
6769ca0
Output Expression as String when BinaryExpr does not contain a named …
jdye64 Apr 29, 2022
c4ed9bd
Output Expression as String when BinaryExpr does not contain a named …
jdye64 Apr 29, 2022
05c5788
Disable 2 pytest that are causing gpuCI issues. They will be address …
jdye64 Apr 29, 2022
a33aa63
Handle Between operation for case-when
jdye64 Apr 29, 2022
20efd5c
adjust timestamp casting
jdye64 May 2, 2022
281baf7
merge with upstream
jdye64 May 6, 2022
d666bdd
merge with upstream/datafusion-sql-planner
jdye64 May 9, 2022
533f50a
Refactor projection _column_name() logic to the _column_name logic in…
jdye64 May 9, 2022
a42a133
removed println! statements
jdye64 May 9, 2022
dc12f5d
introduce join getCondition() logic for retrieving the combining Rex …
jdye64 May 10, 2022
9dce68a
merge with upstream
jdye64 May 10, 2022
10cd463
merge with upstream
jdye64 May 10, 2022
a1841c3
Updates from review
jdye64 May 11, 2022
3001943
Add Offset and point to repo with offset in datafusion
jdye64 May 11, 2022
7ec66da
Introduce offset
jdye64 May 12, 2022
b72917b
limit updates
jdye64 May 12, 2022
651c9ab
commit before upstream merge
jdye64 May 15, 2022
4e69813
merged with upstream/datafusion-sql-planner
jdye64 May 16, 2022
3219ad0
Code formatting
jdye64 May 16, 2022
5a88155
Merge with upstream
jdye64 May 16, 2022
23adefa
Merge with upstream
jdye64 May 16, 2022
bd94ccf
Merge remote-tracking branch 'upstream/datafusion-sql-planner' into d…
jdye64 May 17, 2022
bf91e8f
update Cargo.toml to use Arrow-DataFusion version with LIMIT logic
jdye64 May 17, 2022
3dc6a89
Bump DataFusion version to get changes around variant_name()
jdye64 May 18, 2022
08b38aa
Use map partitions for determining the offset
jdye64 May 19, 2022
7b52f41
Merge with upstream datafusion-crossjoin merge
jdye64 May 19, 2022
6638930
Added multiple LogicalPlan inputs for join conditions
jdye64 May 20, 2022
e24b97f
Merge with upstream LIMIT PR
jdye64 May 20, 2022
61bd864
Merge remote-tracking branch 'upstream/datafusion-sql-planner' into d…
jdye64 May 22, 2022
e3b0d2b
Merge with upstream
jdye64 May 23, 2022
0407c6f
Rename underlying DataContainer's DataFrame instance to match the col…
jdye64 May 23, 2022
af1c138
Adjust ColumnContainer mapping after join.py logic to entire the bake…
jdye64 May 23, 2022
8853765
Add enumerate to column_{i} generation string to ensure columns exist…
jdye64 May 24, 2022
2adc5ce
Adjust join schema logic to perform merge instead of join on rust sid…
jdye64 May 24, 2022
6005018
Handle DataFusion COUNT(UInt8(1)) as COUNT(*)
jdye64 May 24, 2022
f640e1d
commit before merge
jdye64 May 24, 2022
f0cc07b
merge with upstream datafusion-sql-planner
jdye64 May 24, 2022
3159645
Update function for gathering index of a expression
jdye64 May 24, 2022
ba8cec2
Update for review check
jdye64 May 25, 2022
a8fba46
Adjust RelDataType to retrieve fully qualified column names
jdye64 May 26, 2022
8a1a865
Adjust base.py to get fully qualified column name
jdye64 May 26, 2022
6e966b6
Enable passing pytests in test_join.py
jdye64 May 26, 2022
b9604cc
Adjust keys provided by getting backend column mapping name
jdye64 May 27, 2022
014fe68
Adjust output_col to not use the backend_column name for special rese…
jdye64 May 27, 2022
5b0dba3
uncomment cross join pytest which works now
jdye64 May 27, 2022
d17d859
Uncomment passing pytests in test_select.py
jdye64 May 27, 2022
805ec8a
Review updates
jdye64 May 28, 2022
7728bd4
Add back complex join case condition, not just cross join but 'comple…
jdye64 May 28, 2022
6f8d0d9
Enable DataFusion CBO logic
jdye64 May 31, 2022
dad9eb4
Disable EliminateFilter optimization rule
jdye64 May 31, 2022
adc0083
updates
jdye64 Jun 1, 2022
4101d27
upstream merge
jdye64 Jun 1, 2022
be7d502
Disable tests that hit CBO generated plan edge cases of yet to be imp…
jdye64 Jun 1, 2022
a006def
[REVIEW] - Modifiy sql.skip_optimize to use dask_config.get and remov…
jdye64 Jun 2, 2022
6ba6edb
[REVIEW] - change name of configuration from skip_optimize to optimize
jdye64 Jun 2, 2022
984c5bb
[REVIEW] - Add OptimizeException catch and raise statements back
jdye64 Jun 2, 2022
e59cd1e
Found issue where backend column names which are results of a single …
jdye64 Jun 3, 2022
4edb4b5
Remove SQL from OptimizationException
jdye64 Jun 3, 2022
06e76ed
Merge remote-tracking branch 'upstream/datafusion-sql-planner' into d…
jdye64 Jun 6, 2022
15115ab
Upstream merge and removed unused code imports
jdye64 Jun 7, 2022
da37517
skip tests that CBO plan reorganization causes missing features to be…
jdye64 Jun 7, 2022
a3633b2
If TableScan contains projections use those instead of all of the Tab…
jdye64 Jun 7, 2022
9d7166b
Merge remote-tracking branch 'upstream/datafusion-sql-planner' into d…
jdye64 Jun 7, 2022
c91df1e
[REVIEW] remove compute(), remove temp row_type variable
jdye64 Jun 7, 2022
4bb73a1
[REVIEW] - Add test for projection pushdown
jdye64 Jun 7, 2022
7494f87
[REVIEW] - Add some more parametrized test combinations
jdye64 Jun 7, 2022
3d967a4
[REVIEW] - Use iterator instead of for loop and simplify contains_pro…
jdye64 Jun 8, 2022
4bbb9ed
Merge remote-tracking branch 'upstream/datafusion-sql-planner' into d…
jdye64 Jun 8, 2022
18f490a
[REVIEW] - merge upstream and adjust imports
jdye64 Jun 8, 2022
d828f19
[REVIEW] - Rename pytest function and remove duplicate table creation
jdye64 Jun 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion dask_planner/src/sql/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ mod filter;
mod join;
mod limit;
mod offset;
pub mod projection;
mod projection;
mod sort;
mod table_scan;
mod union;

use datafusion_common::{Column, DFSchemaRef, DataFusionError, Result};
Expand Down Expand Up @@ -111,6 +112,11 @@ impl PyLogicalPlan {
to_py_plan(self.current_node.as_ref())
}

/// LogicalPlan::TableScan as PyTableScan
pub fn table_scan(&self) -> PyResult<table_scan::PyTableScan> {
to_py_plan(self.current_node.as_ref())
}

/// Gets the "input" for the current LogicalPlan
pub fn get_inputs(&mut self) -> PyResult<Vec<PyLogicalPlan>> {
let mut py_inputs: Vec<PyLogicalPlan> = Vec::new();
Expand Down
45 changes: 45 additions & 0 deletions dask_planner/src/sql/logical/table_scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use crate::sql::exceptions::py_type_err;
use crate::sql::logical;
use datafusion_expr::logical_plan::TableScan;
use pyo3::prelude::*;

#[pyclass(name = "TableScan", module = "dask_planner", subclass)]
#[derive(Clone)]
pub struct PyTableScan {
pub(crate) table_scan: TableScan,
jdye64 marked this conversation as resolved.
Show resolved Hide resolved
}

#[pymethods]
impl PyTableScan {
#[pyo3(name = "getTableScanProjects")]
fn scan_projects(&mut self) -> PyResult<Vec<String>> {
match &self.table_scan.projection {
Some(indices) => {
let schema = self.table_scan.source.schema();
Ok(indices
.iter()
.map(|i| schema.field(*i).name().to_string())
.collect())
}
None => Ok(vec![]),
}
}

/// If the 'TableScan' contains columns that should be projected during the
/// read return True, otherwise return False
#[pyo3(name = "containsProjections")]
fn contains_projections(&self) -> bool {
self.table_scan.projection.is_some()
}
}

impl TryFrom<logical::LogicalPlan> for PyTableScan {
type Error = PyErr;

fn try_from(logical_plan: logical::LogicalPlan) -> Result<Self, Self::Error> {
match logical_plan {
logical::LogicalPlan::TableScan(table_scan) => Ok(PyTableScan { table_scan }),
_ => Err(py_type_err("unexpected plan")),
}
}
}
3 changes: 1 addition & 2 deletions dask_planner/src/sql/types/rel_data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ impl RelDataType {
self.field_list.clone()
}

/// Returns the names of the fields in a struct type. The field count
/// is equal to the size of the returned list.
/// Returns the names of all of the columns in a given DaskTable
#[pyo3(name = "getFieldNames")]
pub fn field_names(&self) -> Vec<String> {
assert!(!self.field_list.is_empty());
Expand Down
15 changes: 11 additions & 4 deletions dask_sql/physical/rel/logical/table_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ def convert(
# There should not be any input. This is the first step.
self.assert_inputs(rel, 0)

# Rust table_scan instance handle
table_scan = rel.table_scan()

# The table(s) we need to return
table = rel.getTable()

Expand All @@ -48,11 +51,15 @@ def convert(
df = dc.df
cc = dc.column_container

# Make sure we only return the requested columns
row_type = table.getRowType()
field_specifications = [str(f) for f in row_type.getFieldNames()]
cc = cc.limit_to(field_specifications)
# If the 'TableScan' instance contains projected columns only retrieve those columns
# otherwise get all projected columns from the 'Projection' instance, which is contained
# in the 'RelDataType' instance, aka 'row_type'
if table_scan.containsProjections():
jdye64 marked this conversation as resolved.
Show resolved Hide resolved
field_specifications = table_scan.getTableScanProjects()
else:
field_specifications = [str(f) for f in table.getRowType().getFieldNames()]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
field_specifications = [str(f) for f in table.getRowType().getFieldNames()]
field_specifications = table.getRowType().getFieldNames()

Do we have to loop over and cast field names with str?


cc = cc.limit_to(field_specifications)
cc = self.fix_column_to_row_type(cc, rel.getRowType())
dc = DataContainer(df, cc)
dc = self.fix_dtype_to_row_type(dc, rel.getRowType())
Expand Down
29 changes: 29 additions & 0 deletions tests/integration/test_select.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,32 @@ def test_case_when_no_else(c):
expected_df = pd.DataFrame({"C": [None, 1, 1, 1, None]})

assert_eq(actual_df, expected_df)


def test_singular_column_projection_simple(c):
charlesbluca marked this conversation as resolved.
Show resolved Hide resolved
df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
c.create_table("df", df)

wildcard_result = c.sql("SELECT * from df")
single_col_result = c.sql("SELECT b from df")

assert_eq(wildcard_result["b"], single_col_result["b"])


@pytest.mark.parametrize(
"input_cols",
[
["a"],
["a", "b"],
["a", "d"],
["d", "a"],
["a", "b", "d"],
],
)
def test_multiple_column_projection(c, input_cols):
projection_list = ", ".join(input_cols)
result = c.sql(f"SELECT {projection_list} from parquet_ddf")

# There are 5 columns in the table, ensure only specified ones are read
assert_eq(len(result.columns), len(input_cols))
assert all(x in input_cols for x in result.columns)