Support for LIMIT clause with DataFusion (#529)
* Condition for BinaryExpr, filter, input_ref, rexcall, and rexliteral

* Updates for test_filter

* more of test_filter.py working with the exception of some date pytests

* Add workflow to keep datafusion dev branch up to date (#440)

* Include setuptools-rust in conda build recipe, in host and run

* Remove PyArrow dependency

* rebase with datafusion-sql-planner

* refactor changes that were inadvertent during rebase

* timestamp with local time zone

* Bump DataFusion version (#494)

* bump DataFusion version

* remove unnecessary downcasts and use separate structs for TableSource and TableProvider

* Include RelDataType work

* Include RelDataType work

* Introduced SqlTypeName Enum in Rust and mappings for Python

* impl PyExpr.getIndex()

* add getRowType() for logical.rs

* Introduce DaskTypeMap for storing correlating SqlTypeName and DataTypes

* use str values instead of Rust Enums; Python is unable to hash the Rust Enums if used in a dict

* linter changes, why did that work on my local pre-commit??

* linter changes, why did that work on my local pre-commit??

* Convert final strs to SqlTypeName Enum

* removed a few print statements

* commit to share with colleague

* updates

* checkpoint

* Temporarily disable conda run_test.py script since it uses features not yet implemented

* formatting after upstream merge

* expose fromString method for SqlTypeName to use Enums instead of strings for type checking

* expanded SqlTypeName from_string() support

* accept INT as INTEGER

* tests update

* checkpoint

* checkpoint

* Refactor PyExpr by removing From trait, and using recursion to expand expression list for rex calls

* skip test that uses create statement for gpuci

* Basic DataFusion Select Functionality (#489)

* Condition for BinaryExpr, filter, input_ref, rexcall, and rexliteral

* Updates for test_filter

* more of test_filter.py working with the exception of some date pytests

* Add workflow to keep datafusion dev branch up to date (#440)

* Include setuptools-rust in conda build recipe, in host and run

* Remove PyArrow dependency

* rebase with datafusion-sql-planner

* refactor changes that were inadvertent during rebase

* timestamp with local time zone

* Include RelDataType work

* Include RelDataType work

* Introduced SqlTypeName Enum in Rust and mappings for Python

* impl PyExpr.getIndex()

* add getRowType() for logical.rs

* Introduce DaskTypeMap for storing correlating SqlTypeName and DataTypes

* use str values instead of Rust Enums; Python is unable to hash the Rust Enums if used in a dict

* linter changes, why did that work on my local pre-commit??

* linter changes, why did that work on my local pre-commit??

* Convert final strs to SqlTypeName Enum

* removed a few print statements

* Temporarily disable conda run_test.py script since it uses features not yet implemented

* expose fromString method for SqlTypeName to use Enums instead of strings for type checking

* expanded SqlTypeName from_string() support

* accept INT as INTEGER

* Remove print statements

* Default to UTC if tz is None

* Delegate timezone handling to the arrow library

* Updates from review

Co-authored-by: Charles Blackmon-Luca <[email protected]>

* updates for expression

* uncommented pytests

* uncommented pytests

* code cleanup for review

* code cleanup for review

* Enabled more pytests that work now

* Enabled more pytests that work now

* Output Expression as String when BinaryExpr does not contain a named alias

* Output Expression as String when BinaryExpr does not contain a named alias

* Disable 2 pytests that are causing gpuCI issues. They will be addressed in a follow-up PR

* Handle Between operation for case-when

* adjust timestamp casting

* Refactor projection _column_name() logic to the _column_name logic in expression.rs

* removed println! statements

* Updates from review

* Add Offset and point to repo with offset in datafusion

* Introduce offset

* limit updates

* commit before upstream merge

* Code formatting

* update Cargo.toml to use Arrow-DataFusion version with LIMIT logic

* Bump DataFusion version to get changes around variant_name()

* Use map partitions for determining the offset

* Refactor offset partition func

* Update to use TryFrom logic

* Add cloudpickle to independent scheduler requirements

Co-authored-by: Charles Blackmon-Luca <[email protected]>
Co-authored-by: Andy Grove <[email protected]>
3 people authored May 24, 2022
1 parent f7c57b3 commit e10b3f2
Showing 17 changed files with 187 additions and 101 deletions.
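
For context before the per-file diffs: this commit lets LIMIT (and OFFSET) clauses be planned by DataFusion and executed lazily on Dask dataframes. A minimal usage sketch, assuming the public dask_sql Context API; the table name and data below are illustrative, not taken from the commit:

import pandas as pd
import dask.dataframe as dd
from dask_sql import Context

# Illustrative data; any Dask dataframe registered with the context works.
ddf = dd.from_pandas(pd.DataFrame({"a": range(100)}), npartitions=4)

c = Context()
c.create_table("my_table", ddf)

# LIMIT is planned through DataFusion's LogicalPlan::Limit and applied with a
# lazy head(); OFFSET is handled by the new DaskOffsetPlugin.
result = c.sql("SELECT a FROM my_table LIMIT 10 OFFSET 5")
print(result.compute())
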
8 changes: 6 additions & 2 deletions .github/docker-compose.yaml
@@ -3,15 +3,19 @@ version: '3'
services:
dask-scheduler:
container_name: dask-scheduler
image: daskdev/dask:latest
image: daskdev/dask:dev
command: dask-scheduler
environment:
USE_MAMBA: "true"
EXTRA_CONDA_PACKAGES: "cloudpickle>=1.5.0" # match client cloudpickle version
ports:
- "8786:8786"
dask-worker:
container_name: dask-worker
image: daskdev/dask:latest
image: daskdev/dask:dev
command: dask-worker dask-scheduler:8786
environment:
USE_MAMBA: "true"
EXTRA_CONDA_PACKAGES: "pyarrow>=4.0.0" # required for parquet IO
volumes:
- /tmp:/tmp
2 changes: 1 addition & 1 deletion dask_planner/Cargo.toml
@@ -12,7 +12,7 @@ rust-version = "1.59"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
rand = "0.7"
pyo3 = { version = "0.16", features = ["extension-module", "abi3", "abi3-py38"] }
datafusion = { git="https://github.com/apache/arrow-datafusion/", rev = "8.0.0" }
datafusion = { git="https://github.com/apache/arrow-datafusion/", rev = "78207f5092fc5204ecd791278d403dcb6f0ae683" }
uuid = { version = "0.8", features = ["v4"] }
mimalloc = { version = "*", default-features = false }
parking_lot = "0.12"
2 changes: 1 addition & 1 deletion dask_planner/src/expression.rs
@@ -11,7 +11,7 @@ use datafusion::logical_expr::{lit, BuiltinScalarFunction, Expr};

use datafusion::scalar::ScalarValue;

pub use datafusion::logical_expr::LogicalPlan;
use datafusion::logical_expr::LogicalPlan;

use datafusion::prelude::Column;

3 changes: 2 additions & 1 deletion dask_planner/src/sql.rs
@@ -11,6 +11,7 @@ use crate::sql::exceptions::ParsingException;

use datafusion::arrow::datatypes::{Field, Schema};
use datafusion::catalog::{ResolvedTableReference, TableReference};
use datafusion::datasource::TableProvider;
use datafusion::error::DataFusionError;
use datafusion::logical_expr::ScalarFunctionImplementation;
use datafusion::physical_plan::udaf::AggregateUDF;
@@ -55,7 +56,7 @@ impl ContextProvider for DaskSQLContext {
fn get_table_provider(
&self,
name: TableReference,
) -> Result<Arc<dyn table::TableProvider>, DataFusionError> {
) -> Result<Arc<dyn TableProvider>, DataFusionError> {
let reference: ResolvedTableReference =
name.resolve(&self.default_catalog_name, &self.default_schema_name);
match self.schemas.get(&self.default_schema_name) {
17 changes: 15 additions & 2 deletions dask_planner/src/sql/logical.rs
@@ -7,10 +7,12 @@ mod cross_join;
mod explain;
mod filter;
mod join;
mod limit;
mod offset;
pub mod projection;
mod sort;

pub use datafusion::logical_expr::LogicalPlan;
use datafusion::logical_expr::LogicalPlan;

use datafusion::common::Result;
use datafusion::prelude::Column;
@@ -85,6 +87,16 @@ impl PyLogicalPlan {
to_py_plan(self.current_node.as_ref())
}

/// LogicalPlan::Limit as PyLimit
pub fn limit(&self) -> PyResult<limit::PyLimit> {
to_py_plan(self.current_node.as_ref())
}

/// LogicalPlan::Offset as PyOffset
pub fn offset(&self) -> PyResult<offset::PyOffset> {
to_py_plan(self.current_node.as_ref())
}

/// LogicalPlan::Projection as PyProjection
pub fn projection(&self) -> PyResult<projection::PyProjection> {
to_py_plan(self.current_node.as_ref())
@@ -140,6 +152,7 @@ impl PyLogicalPlan {
LogicalPlan::TableScan(_table_scan) => "TableScan",
LogicalPlan::EmptyRelation(_empty_relation) => "EmptyRelation",
LogicalPlan::Limit(_limit) => "Limit",
LogicalPlan::Offset(_offset) => "Offset",
LogicalPlan::CreateExternalTable(_create_external_table) => "CreateExternalTable",
LogicalPlan::CreateMemoryTable(_create_memory_table) => "CreateMemoryTable",
LogicalPlan::DropTable(_drop_table) => "DropTable",
@@ -151,7 +164,7 @@
LogicalPlan::SubqueryAlias(_sqalias) => "SubqueryAlias",
LogicalPlan::CreateCatalogSchema(_create) => "CreateCatalogSchema",
LogicalPlan::CreateCatalog(_create_catalog) => "CreateCatalog",
LogicalPlan::CreateView(_) => "CreateView",
LogicalPlan::CreateView(_create_view) => "CreateView",
})
}

2 changes: 1 addition & 1 deletion dask_planner/src/sql/logical/aggregate.rs
@@ -1,7 +1,7 @@
use crate::expression::PyExpr;

use datafusion::logical_expr::LogicalPlan;
use datafusion::logical_expr::{logical_plan::Aggregate, Expr};
pub use datafusion::logical_expr::{logical_plan::JoinType, LogicalPlan};

use crate::sql::exceptions::py_type_err;
use pyo3::prelude::*;
2 changes: 1 addition & 1 deletion dask_planner/src/sql/logical/filter.rs
@@ -1,7 +1,7 @@
use crate::expression::PyExpr;

use datafusion::logical_expr::logical_plan::Filter;
pub use datafusion::logical_expr::LogicalPlan;
use datafusion::logical_expr::LogicalPlan;

use crate::sql::exceptions::py_type_err;
use pyo3::prelude::*;
35 changes: 35 additions & 0 deletions dask_planner/src/sql/logical/limit.rs
@@ -0,0 +1,35 @@
use crate::expression::PyExpr;
use crate::sql::exceptions::py_type_err;

use datafusion::scalar::ScalarValue;
use pyo3::prelude::*;

use datafusion::logical_expr::{logical_plan::Limit, Expr, LogicalPlan};

#[pyclass(name = "Limit", module = "dask_planner", subclass)]
#[derive(Clone)]
pub struct PyLimit {
limit: Limit,
}

#[pymethods]
impl PyLimit {
#[pyo3(name = "getLimitN")]
pub fn limit_n(&self) -> PyResult<PyExpr> {
Ok(PyExpr::from(
Expr::Literal(ScalarValue::UInt64(Some(self.limit.n.try_into().unwrap()))),
Some(self.limit.input.clone()),
))
}
}

impl TryFrom<LogicalPlan> for PyLimit {
type Error = PyErr;

fn try_from(logical_plan: LogicalPlan) -> Result<Self, Self::Error> {
match logical_plan {
LogicalPlan::Limit(limit) => Ok(PyLimit { limit: limit }),
_ => Err(py_type_err("unexpected plan")),
}
}
}
44 changes: 44 additions & 0 deletions dask_planner/src/sql/logical/offset.rs
@@ -0,0 +1,44 @@
use crate::expression::PyExpr;
use crate::sql::exceptions::py_type_err;

use datafusion::scalar::ScalarValue;
use pyo3::prelude::*;

use datafusion::logical_expr::{logical_plan::Offset, Expr, LogicalPlan};

#[pyclass(name = "Offset", module = "dask_planner", subclass)]
#[derive(Clone)]
pub struct PyOffset {
offset: Offset,
}

#[pymethods]
impl PyOffset {
#[pyo3(name = "getOffset")]
pub fn offset(&self) -> PyResult<PyExpr> {
Ok(PyExpr::from(
Expr::Literal(ScalarValue::UInt64(Some(self.offset.offset as u64))),
Some(self.offset.input.clone()),
))
}

#[pyo3(name = "getFetch")]
pub fn offset_fetch(&self) -> PyResult<PyExpr> {
// TODO: Still need to implement fetch size! For now get everything from offset on with '0'
Ok(PyExpr::from(
Expr::Literal(ScalarValue::UInt64(Some(0))),
Some(self.offset.input.clone()),
))
}
}

impl TryFrom<LogicalPlan> for PyOffset {
type Error = PyErr;

fn try_from(logical_plan: LogicalPlan) -> Result<Self, Self::Error> {
match logical_plan {
LogicalPlan::Offset(offset) => Ok(PyOffset { offset: offset }),
_ => Err(py_type_err("unexpected plan")),
}
}
}
2 changes: 1 addition & 1 deletion dask_planner/src/sql/logical/projection.rs
@@ -1,6 +1,6 @@
use crate::expression::PyExpr;

pub use datafusion::logical_expr::LogicalPlan;
use datafusion::logical_expr::LogicalPlan;
use datafusion::logical_expr::{logical_plan::Projection, Expr};

use crate::sql::exceptions::py_type_err;
3 changes: 1 addition & 2 deletions dask_planner/src/sql/table.rs
@@ -7,14 +7,13 @@ use crate::sql::types::SqlTypeName;
use async_trait::async_trait;

use datafusion::arrow::datatypes::{DataType, Field, SchemaRef};
pub use datafusion::datasource::TableProvider;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::DataFusionError;
use datafusion::logical_expr::{Expr, LogicalPlan, TableSource};
use datafusion::physical_plan::{empty::EmptyExec, project_schema, ExecutionPlan};

use pyo3::prelude::*;

use datafusion::datasource::TableType;
use std::any::Any;
use std::sync::Arc;

1 change: 1 addition & 0 deletions dask_sql/context.py
@@ -99,6 +99,7 @@ def __init__(self, logging_level=logging.INFO):
RelConverter.add_plugin_class(logical.DaskJoinPlugin, replace=False)
RelConverter.add_plugin_class(logical.DaskCrossJoinPlugin, replace=False)
RelConverter.add_plugin_class(logical.DaskLimitPlugin, replace=False)
RelConverter.add_plugin_class(logical.DaskOffsetPlugin, replace=False)
RelConverter.add_plugin_class(logical.DaskProjectPlugin, replace=False)
RelConverter.add_plugin_class(logical.DaskSortPlugin, replace=False)
RelConverter.add_plugin_class(logical.DaskTableScanPlugin, replace=False)
2 changes: 2 additions & 0 deletions dask_sql/physical/rel/logical/__init__.py
@@ -4,6 +4,7 @@
from .filter import DaskFilterPlugin
from .join import DaskJoinPlugin
from .limit import DaskLimitPlugin
from .offset import DaskOffsetPlugin
from .project import DaskProjectPlugin
from .sample import SamplePlugin
from .sort import DaskSortPlugin
@@ -18,6 +19,7 @@
DaskJoinPlugin,
DaskCrossJoinPlugin,
DaskLimitPlugin,
DaskOffsetPlugin,
DaskProjectPlugin,
DaskSortPlugin,
DaskTableScanPlugin,
81 changes: 4 additions & 77 deletions dask_sql/physical/rel/logical/limit.py
@@ -1,11 +1,8 @@
from typing import TYPE_CHECKING

import dask.dataframe as dd

from dask_sql.datacontainer import DataContainer
from dask_sql.physical.rel.base import BaseRelPlugin
from dask_sql.physical.rex import RexConverter
from dask_sql.physical.utils.map import map_on_partition_index

if TYPE_CHECKING:
import dask_sql
@@ -25,82 +22,12 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
df = dc.df
cc = dc.column_container

offset = rel.getOffset()
if offset:
offset = RexConverter.convert(offset, df, context=context)

end = rel.getFetch()
if end:
end = RexConverter.convert(end, df, context=context)

if offset:
end += offset
limit = RexConverter.convert(rel, rel.limit().getLimitN(), df, context=context)

df = self._apply_offset(df, offset, end)
# If an offset was present it would have already been processed at this point.
# Therefore it is always safe to start at 0 when applying the limit
df = df.head(limit, npartitions=-1, compute=False)

cc = self.fix_column_to_row_type(cc, rel.getRowType())
# No column type has changed, so no need to cast again
return DataContainer(df, cc)

def _apply_offset(self, df: dd.DataFrame, offset: int, end: int) -> dd.DataFrame:
"""
Limit the dataframe to the window [offset, end].
That is unfortunately, not so simple as we do not know how many
items we have in each partition. We have therefore no other way than to
calculate (!!!) the sizes of each partition.
After that, we can create a new dataframe from the old
dataframe by calculating for each partition if and how much
it should be used.
We do this via generating our own dask computation graph as
we need to pass the partition number to the selection
function, which is not possible with normal "map_partitions".
"""
if not offset:
# We do a (hopefully) very quick check: if the first partition
# is already enough, we will just use this
first_partition_length = len(df.partitions[0])
if first_partition_length >= end:
return df.head(end, compute=False)

# First, we need to find out which partitions we want to use.
# Therefore we count the total number of entries
partition_borders = df.map_partitions(lambda x: len(x))

# Now we let each of the partitions figure out, how much it needs to return
# using these partition borders
# For this, we generate out own dask computation graph (as it does not really
# fit well with one of the already present methods).

# (a) we define a method to be calculated on each partition
# This method returns the part of the partition, which falls between [offset, fetch]
# Please note that the dask object "partition_borders", will be turned into
# its pandas representation at this point and we can calculate the cumsum
# (which is not possible on the dask object). Recalculating it should not cost
# us much, as we assume the number of partitions is rather small.
def select_from_to(df, partition_index, partition_borders):
partition_borders = partition_borders.cumsum().to_dict()
this_partition_border_left = (
partition_borders[partition_index - 1] if partition_index > 0 else 0
)
this_partition_border_right = partition_borders[partition_index]

if (end and end < this_partition_border_left) or (
offset and offset >= this_partition_border_right
):
return df.iloc[0:0]

from_index = max(offset - this_partition_border_left, 0) if offset else 0
to_index = (
min(end, this_partition_border_right)
if end
else this_partition_border_right
) - this_partition_border_left

return df.iloc[from_index:to_index]

# (b) Now we just need to apply the function on every partition
# We do this via the delayed interface, which seems the easiest one.
return map_on_partition_index(
df, select_from_to, partition_borders, meta=df._meta
)
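
The partition-border approach documented in the removed _apply_offset above presumably moves into the new DaskOffsetPlugin (dask_sql/physical/rel/logical/offset.py, not shown in this excerpt), while the limit plugin reduces to a plain head() call. A minimal sketch of that technique as a standalone function — the function name is hypothetical, and it assumes map_on_partition_index is still importable from dask_sql.physical.utils.map as in the removed import:

import dask.dataframe as dd

from dask_sql.physical.utils.map import map_on_partition_index


def apply_offset(df: dd.DataFrame, offset: int) -> dd.DataFrame:
    # Drop the first `offset` rows across partitions without knowing the
    # partition sizes up front.
    if not offset:
        return df

    # Number of rows in each partition; realized as a pandas Series inside the
    # per-partition function below, as in the original _apply_offset.
    partition_borders = df.map_partitions(lambda part: len(part))

    def select_from(part, partition_index, partition_borders):
        # Cumulative sum turns per-partition lengths into global end positions.
        borders = partition_borders.cumsum().to_dict()
        left = borders[partition_index - 1] if partition_index > 0 else 0
        if offset >= borders[partition_index]:
            # This partition lies entirely before the offset.
            return part.iloc[0:0]
        return part.iloc[max(offset - left, 0):]

    # map_on_partition_index passes the partition number to the selection
    # function, which plain map_partitions does not.
    return map_on_partition_index(df, select_from, partition_borders, meta=df._meta)
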