Merge main into datafusion-sql-planner #654

Merged
Changes from all commits (45 commits)
- 23603de  Add basic predicate-pushdown optimization (#433)  (rjzamora, Mar 25, 2022)
- 09c7bdf  Add workflow to keep datafusion dev branch up to date (#440)  (charlesbluca, Mar 25, 2022)
- 1b0b6f7  Update gpuCI `RAPIDS_VER` to `22.06` (#434)  (github-actions[bot], Apr 1, 2022)
- a05138d  Bump black to 22.3.0 (#443)  (charlesbluca, Apr 4, 2022)
- ab2aa5a  Check for ucx-py nightlies when updating gpuCI (#441)  (charlesbluca, Apr 5, 2022)
- a28f757  Add handling for newer `prompt_toolkit` versions in cmd tests (#447)  (charlesbluca, Apr 6, 2022)
- 486fc66  Fix version for gha-find-replace (#446)  (charlesbluca, Apr 6, 2022)
- ce176e0  Update versions of Java dependencies (#445)  (ayushdg, Apr 7, 2022)
- 50d95d2  Update jackson databind version (#449)  (ayushdg, Apr 7, 2022)
- 37a3a61  Disable SQL server functionality (#448)  (charlesbluca, Apr 7, 2022)
- ffdc42f  Update dask pinnings for release (#450)  (charlesbluca, Apr 7, 2022)
- fa74aef  Add Java source code to source distribution (#451)  (charlesbluca, Apr 7, 2022)
- 37ea6b6  Bump `httpclient` dependency (#453)  (charlesbluca, Apr 8, 2022)
- f19ee4d  Unpin Dask/distributed versions (#452)  (charlesbluca, Apr 11, 2022)
- 1eb30c1  Add jsonschema to ci testing (#454)  (ayushdg, Apr 11, 2022)
- 2bd1d18  Switch tests from `pd.testing.assert_frame_equal` to `dd.assert_eq` (…  (charlesbluca, Apr 11, 2022)
- 95b0dd0  Set max pin on antlr4-python-runtime (#456)  (ayushdg, Apr 12, 2022)
- 031c04c  Move / minimize number of cudf / dask-cudf imports (#480)  (charlesbluca, Apr 19, 2022)
- 48eb983  Use `map_partitions` to compute LIMIT / OFFSET (#517)  (charlesbluca, May 13, 2022)
- 7b4bc55  Use `dev` images for independent cluster testing (#518)  (charlesbluca, May 16, 2022)
- b58989f  Add documentation for FugueSQL integrations (#523)  (charlesbluca, May 16, 2022)
- cb3d903  Timestampdiff support (#495)  (ayushdg, May 17, 2022)
- 8ec3ed5  Relax jsonschema testing dependency (#546)  (charlesbluca, May 20, 2022)
- ff4a8a5  Update upstream testing workflows (#536)  (charlesbluca, May 23, 2022)
- cb55c07  Fix pyarrow / cloudpickle failures in cluster testing (#553)  (charlesbluca, May 24, 2022)
- d8302e9  Use bash -l as default entrypoint for all jobs (#552)  (charlesbluca, May 24, 2022)
- 0d0394a  Constrain dask/distributed for release (#563)  (charlesbluca, Jun 3, 2022)
- 1e881ee  Unpin dask/distributed for development (#564)  (charlesbluca, Jun 3, 2022)
- 243c809  update dask-sphinx-theme (#567)  (scharlottej13, Jun 6, 2022)
- ec3d5da  Make sure scheduler has Dask nightlies in upstream cluster testing (#…  (charlesbluca, Jun 7, 2022)
- c19315a  Update gpuCI `RAPIDS_VER` to `22.08` (#565)  (github-actions[bot], Jun 7, 2022)
- bc1cadc  Modify test environment pinnings to cover minimum versions (#555)  (charlesbluca, Jun 15, 2022)
- 0db4506  Don't move jar to local mvn repo (#579)  (ksonj, Jun 15, 2022)
- ddc26ee  Add max version constraint for `fugue` (#639)  (charlesbluca, Jul 22, 2022)
- 8a73309  Add environment file & documentation for GPU tests (#633)  (charlesbluca, Jul 25, 2022)
- 20daf89  Validate UDF metadata (#641)  (brandon-b-miller, Jul 26, 2022)
- 712a2af  Set Dask-sql as the default Fugue Dask engine when installed (#640)  (Jul 26, 2022)
- 15ed09e  Merge remote-tracking branch 'upstream/main' into merge-upstream-main  (charlesbluca, Aug 1, 2022)
- a9be03a  Add Rust setup to upstream testing workflow  (charlesbluca, Aug 1, 2022)
- 138e70e  Resolve style failures  (charlesbluca, Aug 1, 2022)
- a0c6344  Bump fugue version in CI envs  (charlesbluca, Aug 1, 2022)
- 73f5228  Add back scalar case for cast operation  (charlesbluca, Aug 1, 2022)
- 71403fe  Resolve UDF failures  (charlesbluca, Aug 1, 2022)
- c55de4c  Resolve UDF failures for windows  (charlesbluca, Aug 1, 2022)
- 75a2449  Remove calcite-specific reinterpret  (charlesbluca, Aug 1, 2022)
3 changes: 0 additions & 3 deletions .github/cluster.yml
@@ -5,9 +5,6 @@ services:
container_name: dask-scheduler
image: daskdev/dask:dev-py3.9
command: dask-scheduler
environment:
USE_MAMBA: "true"
EXTRA_CONDA_PACKAGES: "cloudpickle>=1.5.0" # match client cloudpickle version
ports:
- "8786:8786"
dask-worker:
46 changes: 27 additions & 19 deletions .github/workflows/test-upstream.yml
@@ -13,6 +13,7 @@ jobs:
test-dev:
name: "Test upstream dev (${{ matrix.os }}, python: ${{ matrix.python }})"
runs-on: ${{ matrix.os }}
if: github.repository == 'dask-contrib/dask-sql'
env:
CONDA_FILE: continuous_integration/environment-${{ matrix.python }}-dev.yaml
defaults:
@@ -37,6 +38,15 @@
channels: dask/label/dev,conda-forge,nodefaults
activate-environment: dask-sql
environment-file: ${{ env.CONDA_FILE }}
- name: Setup Rust Toolchain
uses: actions-rs/toolchain@v1
id: rust-toolchain
with:
toolchain: stable
override: true
- name: Build the Rust DataFusion bindings
run: |
python setup.py build install
- name: Install hive testing dependencies for Linux
if: matrix.os == 'ubuntu-latest'
run: |
@@ -57,11 +67,6 @@
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Cache local Maven repository
uses: actions/cache@v2
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-v1-jdk11-${{ hashFiles('**/pom.xml') }}
- name: Set up Python
uses: conda-incubator/setup-miniconda@v2
with:
@@ -71,12 +76,16 @@
channel-priority: strict
channels: dask/label/dev,conda-forge,nodefaults
activate-environment: dask-sql
environment-file: continuous_integration/environment-3.9-jdk11-dev.yaml
- name: Download the pre-build jar
uses: actions/download-artifact@v1
environment-file: continuous_integration/environment-3.9-dev.yaml
- name: Setup Rust Toolchain
uses: actions-rs/toolchain@v1
id: rust-toolchain
with:
name: jar
path: dask_sql/jar/
toolchain: stable
override: true
- name: Build the Rust DataFusion bindings
run: |
python setup.py build install
- name: Install cluster dependencies
run: |
mamba install python-blosc lz4 -c conda-forge
@@ -107,23 +116,22 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Cache local Maven repository
uses: actions/cache@v2
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-v1-jdk11-${{ hashFiles('**/pom.xml') }}
- name: Set up Python
uses: conda-incubator/setup-miniconda@v2
with:
python-version: "3.8"
mamba-version: "*"
channels: dask/label/dev,conda-forge,nodefaults
channel-priority: strict
- name: Download the pre-build jar
uses: actions/download-artifact@v1
- name: Setup Rust Toolchain
uses: actions-rs/toolchain@v1
id: rust-toolchain
with:
name: jar
path: dask_sql/jar/
toolchain: stable
override: true
- name: Build the Rust DataFusion bindings
run: |
python setup.py build install
- name: Install upstream dev Dask / dask-ml
if: needs.detect-ci-trigger.outputs.triggered == 'true'
run: |
13 changes: 13 additions & 0 deletions README.md
@@ -124,6 +124,19 @@ You can run the tests (after installation) with

pytest tests

GPU-specific tests require additional dependencies specified in `continuous_integration/gpuci/environment.yaml`.
These can be added to the development environment by running

```
conda env update -n dask-sql -f continuous_integration/gpuci/environment.yaml
```

And GPU-specific tests can be run with

```
pytest tests -m gpu --rungpu
```

## SQL Server

`dask-sql` comes with a small test implementation for a SQL server.
11 changes: 1 addition & 10 deletions continuous_integration/environment-3.10-dev.yaml
@@ -6,6 +6,7 @@ dependencies:
- dask-ml>=2022.1.22
- dask>=2022.3.0
- fastapi>=0.69.0
- fugue>=0.7.0
- intake>=0.6.0
- jsonschema
- lightgbm
@@ -31,13 +32,3 @@
- tpot
- tzlocal>=2.1
- uvicorn>=0.11.3
# fugue dependencies; remove when we conda install fugue
- adagio
- antlr4-python3-runtime<4.10
- ciso8601
- fs
- pip
- qpd
- triad
- pip:
- fugue[sql]>=0.5.3
11 changes: 1 addition & 10 deletions continuous_integration/environment-3.8-dev.yaml
@@ -6,6 +6,7 @@ dependencies:
- dask-ml=2022.1.22
- dask=2022.3.0
- fastapi=0.69.0
- fugue=0.7.0
- intake=0.6.0
- jsonschema
- lightgbm
@@ -31,13 +32,3 @@
- tpot
- tzlocal=2.1
- uvicorn=0.11.3
# fugue dependencies; remove when we conda install fugue
- adagio
- antlr4-python3-runtime<4.10
- ciso8601
- fs
- pip
- qpd
- triad
- pip:
- fugue[sql]==0.5.3
11 changes: 1 addition & 10 deletions continuous_integration/environment-3.9-dev.yaml
@@ -6,6 +6,7 @@ dependencies:
- dask-ml>=2022.1.22
- dask>=2022.3.0
- fastapi>=0.69.0
- fugue>=0.7.0
- intake>=0.6.0
- jsonschema
- lightgbm
@@ -31,13 +32,3 @@
- tpot
- tzlocal>=2.1
- uvicorn>=0.11.3
# fugue dependencies; remove when we conda install fugue
- adagio
- antlr4-python3-runtime<4.10
- ciso8601
- fs
- pip
- qpd
- triad
- pip:
- fugue[sql]>=0.5.3
17 changes: 17 additions & 0 deletions continuous_integration/gpuci/environment.yaml
@@ -0,0 +1,17 @@
name: gpuci
channels:
- rapidsai
- rapidsai-nightly
- nvidia
dependencies:
- rust>=1.60.0
- setuptools-rust>=1.2.0
- cudatoolkit=11.5
- cudf=22.08
- cuml=22.08
- dask-cudf=22.08
- dask-cuda=22.08
- numpy>=1.20.0
- ucx-proc=*=gpu
- ucx-py=0.27
- xgboost=*=cuda_*
22 changes: 15 additions & 7 deletions dask_sql/context.py
@@ -780,7 +780,7 @@ def _prepare_schemas(self):
logger.debug("No custom functions defined.")
for function_description in schema.function_lists:
name = function_description.name
sql_return_type = python_to_sql_type(function_description.return_type)
sql_return_type = function_description.return_type
if function_description.aggregation:
logger.debug(f"Adding function '{name}' to schema as aggregation.")
# TODO: Not yet implemented
@@ -800,10 +800,7 @@ def _prepare_schemas(self):
@staticmethod
def _add_parameters_from_description(function_description, dask_function):
for parameter in function_description.parameters:
param_name, param_type = parameter
sql_param_type = python_to_sql_type(param_type)

dask_function.addParameter(param_name, sql_param_type, False)
dask_function.addParameter(*parameter, False)

return dask_function

@@ -884,9 +881,16 @@ def _register_callable(
row_udf: bool = False,
):
"""Helper function to do the function or aggregation registration"""

schema_name = schema_name or self.schema_name
schema = self.schema[schema_name]

# validate and cache UDF metadata
sql_parameters = [
(name, python_to_sql_type(param_type)) for name, param_type in parameters
]
sql_return_type = python_to_sql_type(return_type)

if not aggregation:
f = UDF(f, row_udf, parameters, return_type)
lower_name = name.lower()
Expand All @@ -906,10 +910,14 @@ def _register_callable(
)

schema.function_lists.append(
FunctionDescription(name.upper(), parameters, return_type, aggregation)
FunctionDescription(
name.upper(), sql_parameters, sql_return_type, aggregation
)
)
schema.function_lists.append(
FunctionDescription(name.lower(), parameters, return_type, aggregation)
FunctionDescription(
name.lower(), sql_parameters, sql_return_type, aggregation
)
)
schema.functions[lower_name] = f

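Note: with this change the Python-to-SQL type conversion happens once inside `_register_callable`, so invalid UDF type metadata surfaces at registration time rather than during schema preparation. Below is a minimal sketch of a registration that exercises this path, assuming the public `Context.register_function` / `Context.sql` API; the table and function names are illustrative only.

```python
import numpy as np
import pandas as pd
import dask.dataframe as dd

from dask_sql import Context

c = Context()
c.create_table("t", dd.from_pandas(pd.DataFrame({"x": [1, 2, 3]}), npartitions=1))

def double(x):
    return x * 2

# Parameter and return types are converted to SQL types (and validated)
# here, at registration time.
c.register_function(double, "double", [("x", np.int64)], np.int64)

print(c.sql("SELECT double(x) AS y FROM t").compute())
```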
6 changes: 0 additions & 6 deletions dask_sql/datacontainer.py
@@ -229,11 +229,6 @@ def __init__(self, func, row_udf: bool, params, return_type=None):

self.names = [param[0] for param in params]

if return_type is None:
# These UDFs go through apply and without providing
# a return type, dask will attempt to guess it, and
# dask might be wrong.
raise ValueError("Return type must be provided")
self.meta = (None, return_type)

def __call__(self, *args, **kwargs):
@@ -249,7 +244,6 @@ def __call__(self, *args, **kwargs):
df = column_args[0].to_frame(self.names[0])
for name, col in zip(self.names[1:], column_args[1:]):
df[name] = col

result = df.apply(
self.func, axis=1, args=tuple(scalar_args), meta=self.meta
).astype(self.meta[1])
24 changes: 21 additions & 3 deletions dask_sql/integrations/fugue.py
@@ -1,8 +1,10 @@
try:
import fugue
import fugue_dask
from dask.distributed import Client
from fugue import WorkflowDataFrame, register_execution_engine
from fugue_sql import FugueSQLWorkflow
from triad import run_at_def
from triad.utils.convert import get_caller_global_local_vars
except ImportError: # pragma: no cover
raise ImportError(
@@ -15,9 +17,25 @@

from dask_sql.context import Context

register_execution_engine(
"dask", lambda conf: DaskSQLExecutionEngine(conf), on_dup="overwrite"
)

@run_at_def
def _register_engines() -> None:
"""Register (overwrite) the default Dask execution engine of Fugue. This
function is invoked as an entrypoint, users don't need to call it explicitly.
"""
register_execution_engine(
"dask",
lambda conf, **kwargs: DaskSQLExecutionEngine(conf=conf),
on_dup="overwrite",
)

register_execution_engine(
Client,
lambda engine, conf, **kwargs: DaskSQLExecutionEngine(
dask_client=engine, conf=conf
),
on_dup="overwrite",
)


class DaskSQLEngine(fugue.execution.execution_engine.SQLEngine):
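Note: once these engines are registered (the `run_at_def` entrypoint fires when the fugue integration module is loaded), FugueSQL can be pointed at dask-sql either by the `"dask"` engine name or by passing a live `dask.distributed.Client`. A rough usage sketch follows, assuming `fugue_sql.fsql` is available; exact engine resolution may vary by Fugue version.

```python
import pandas as pd
from dask.distributed import Client
from fugue_sql import fsql

import dask_sql.integrations.fugue  # ensures the engines above are registered

client = Client(processes=False)  # small local cluster, for illustration only
df = pd.DataFrame({"a": [1, 2, 3]})

# Passing the Client as the engine should now resolve to DaskSQLExecutionEngine
# rather than Fugue's built-in Dask engine.
fsql(
    """
    SELECT a, a + 1 AS b FROM df
    PRINT
    """,
    df=df,
).run(client)
```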
26 changes: 17 additions & 9 deletions dask_sql/mappings.py
@@ -86,6 +86,12 @@

def python_to_sql_type(python_type) -> "DaskTypeMap":
"""Mapping between python and SQL types."""

if python_type in (int, float):
python_type = np.dtype(python_type)
elif python_type is str:
python_type = np.dtype("object")

if isinstance(python_type, np.dtype):
python_type = python_type.type

@@ -286,15 +292,17 @@ def cast_column_to_type(col: dd.Series, expected_type: str):
logger.debug("...not converting.")
return None

current_float = pd.api.types.is_float_dtype(current_type)
expected_integer = pd.api.types.is_integer_dtype(expected_type)
if current_float and expected_integer:
logger.debug("...truncating...")
# Currently "trunc" can not be applied to NA (the pandas missing value type),
# because NA is a different type. It works with np.NaN though.
# For our use case, that does not matter, as the conversion to integer later
# will convert both NA and np.NaN to NA.
col = da.trunc(col.fillna(value=np.NaN))
if pd.api.types.is_integer_dtype(expected_type):
if pd.api.types.is_float_dtype(current_type):
logger.debug("...truncating...")
# Currently "trunc" can not be applied to NA (the pandas missing value type),
# because NA is a different type. It works with np.NaN though.
# For our use case, that does not matter, as the conversion to integer later
# will convert both NA and np.NaN to NA.
col = da.trunc(col.fillna(value=np.NaN))
elif pd.api.types.is_timedelta64_dtype(current_type):
logger.debug(f"Explicitly casting from {current_type} to np.int64")
return col.astype(np.int64)

logger.debug(f"Need to cast from {current_type} to {expected_type}")
return col.astype(expected_type)
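Note: the first hunk lets `python_to_sql_type` accept builtin Python annotations (`int`, `float`, `str`) by normalizing them to numpy dtypes before the lookup, and the second adds an explicit timedelta-to-`int64` cast path. A self-contained sketch of just the normalization step (an illustration of the added branch, not the library function itself):

```python
import numpy as np

def normalize_python_type(python_type):
    # Mirror of the branch added above: builtin Python types are mapped
    # onto numpy dtypes before the usual dtype -> SQL type lookup runs.
    if python_type in (int, float):
        python_type = np.dtype(python_type)
    elif python_type is str:
        python_type = np.dtype("object")
    if isinstance(python_type, np.dtype):
        python_type = python_type.type
    return python_type

assert normalize_python_type(float) is np.float64
assert normalize_python_type(str) is np.object_
print(normalize_python_type(int))  # platform default integer, e.g. np.int64
```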
8 changes: 1 addition & 7 deletions dask_sql/physical/rel/logical/aggregate.py
@@ -8,13 +8,6 @@
import pandas as pd
from dask import config as dask_config

try:
import dask_cudf

from dask_planner.rust import LogicalPlan
except ImportError:
dask_cudf = None

from dask_sql.datacontainer import ColumnContainer, DataContainer
from dask_sql.physical.rel.base import BaseRelPlugin
from dask_sql.physical.rex.core.call import IsNullOperation
@@ -23,6 +16,7 @@

if TYPE_CHECKING:
import dask_sql
from dask_planner.rust import LogicalPlan

logger = logging.getLogger(__name__)

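Note: the module-level `dask_cudf` / `LogicalPlan` imports are dropped here; `LogicalPlan` is now imported only under `TYPE_CHECKING`, so importing this plugin no longer requires the Rust planner at runtime. A generic sketch of that pattern (the helper function is hypothetical, not part of dask-sql):

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only evaluated by static type checkers; no runtime import cost and no
    # hard dependency when the module is merely imported.
    from dask_planner.rust import LogicalPlan

def plan_summary(plan: "LogicalPlan") -> str:
    # Hypothetical helper, just to show the string-annotation usage.
    return repr(plan)
```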
5 changes: 2 additions & 3 deletions dask_sql/physical/rex/core/call.py
@@ -186,7 +186,7 @@ def div(self, lhs, rhs):
# of this function.
if isinstance(result, (datetime.timedelta, np.timedelta64)):
return result
else: # pragma: no cover
else:
return da.trunc(result).astype(np.int64)


@@ -960,8 +960,7 @@ def convert(
]

# Now use the operator name in the mapping
# TODO: obviously this needs to not be hardcoded but not sure of the best place to pull the value from currently???
schema_name = "root"
schema_name = context.schema_name
operator_name = expr.getOperatorName().lower()

try: