diff --git a/.github/cluster.yml b/.github/cluster.yml index 4d08a389a..da9530ee7 100644 --- a/.github/cluster.yml +++ b/.github/cluster.yml @@ -5,9 +5,6 @@ services: container_name: dask-scheduler image: daskdev/dask:dev-py3.9 command: dask-scheduler - environment: - USE_MAMBA: "true" - EXTRA_CONDA_PACKAGES: "cloudpickle>=1.5.0" # match client cloudpickle version ports: - "8786:8786" dask-worker: diff --git a/.github/workflows/test-upstream.yml b/.github/workflows/test-upstream.yml index e6c5bd213..fd0cae327 100644 --- a/.github/workflows/test-upstream.yml +++ b/.github/workflows/test-upstream.yml @@ -13,6 +13,7 @@ jobs: test-dev: name: "Test upstream dev (${{ matrix.os }}, python: ${{ matrix.python }})" runs-on: ${{ matrix.os }} + if: github.repository == 'dask-contrib/dask-sql' env: CONDA_FILE: continuous_integration/environment-${{ matrix.python }}-dev.yaml defaults: @@ -37,6 +38,15 @@ jobs: channels: dask/label/dev,conda-forge,nodefaults activate-environment: dask-sql environment-file: ${{ env.CONDA_FILE }} + - name: Setup Rust Toolchain + uses: actions-rs/toolchain@v1 + id: rust-toolchain + with: + toolchain: stable + override: true + - name: Build the Rust DataFusion bindings + run: | + python setup.py build install - name: Install hive testing dependencies for Linux if: matrix.os == 'ubuntu-latest' run: | @@ -57,11 +67,6 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - name: Cache local Maven repository - uses: actions/cache@v2 - with: - path: ~/.m2/repository - key: ${{ runner.os }}-maven-v1-jdk11-${{ hashFiles('**/pom.xml') }} - name: Set up Python uses: conda-incubator/setup-miniconda@v2 with: @@ -71,12 +76,16 @@ jobs: channel-priority: strict channels: dask/label/dev,conda-forge,nodefaults activate-environment: dask-sql - environment-file: continuous_integration/environment-3.9-jdk11-dev.yaml - - name: Download the pre-build jar - uses: actions/download-artifact@v1 + environment-file: continuous_integration/environment-3.9-dev.yaml + - name: Setup Rust Toolchain + uses: actions-rs/toolchain@v1 + id: rust-toolchain with: - name: jar - path: dask_sql/jar/ + toolchain: stable + override: true + - name: Build the Rust DataFusion bindings + run: | + python setup.py build install - name: Install cluster dependencies run: | mamba install python-blosc lz4 -c conda-forge @@ -107,11 +116,6 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - name: Cache local Maven repository - uses: actions/cache@v2 - with: - path: ~/.m2/repository - key: ${{ runner.os }}-maven-v1-jdk11-${{ hashFiles('**/pom.xml') }} - name: Set up Python uses: conda-incubator/setup-miniconda@v2 with: @@ -119,11 +123,15 @@ jobs: mamba-version: "*" channels: dask/label/dev,conda-forge,nodefaults channel-priority: strict - - name: Download the pre-build jar - uses: actions/download-artifact@v1 + - name: Setup Rust Toolchain + uses: actions-rs/toolchain@v1 + id: rust-toolchain with: - name: jar - path: dask_sql/jar/ + toolchain: stable + override: true + - name: Build the Rust DataFusion bindings + run: | + python setup.py build install - name: Install upstream dev Dask / dask-ml if: needs.detect-ci-trigger.outputs.triggered == 'true' run: | diff --git a/README.md b/README.md index 924ae1e41..b97640bf7 100644 --- a/README.md +++ b/README.md @@ -124,6 +124,19 @@ You can run the tests (after installation) with pytest tests +GPU-specific tests require additional dependencies specified in `continuous_integration/gpuci/environment.yaml`. 
+These can be added to the development environment by running + +``` +conda env update -n dask-sql -f continuous_integration/gpuci/environment.yaml +``` + +And GPU-specific tests can be run with + +``` +pytest tests -m gpu --rungpu +``` + ## SQL Server `dask-sql` comes with a small test implementation for a SQL server. diff --git a/continuous_integration/environment-3.10-dev.yaml b/continuous_integration/environment-3.10-dev.yaml index 7f5eb34a3..1c5ff1a16 100644 --- a/continuous_integration/environment-3.10-dev.yaml +++ b/continuous_integration/environment-3.10-dev.yaml @@ -6,6 +6,7 @@ dependencies: - dask-ml>=2022.1.22 - dask>=2022.3.0 - fastapi>=0.69.0 +- fugue>=0.7.0 - intake>=0.6.0 - jsonschema - lightgbm @@ -31,13 +32,3 @@ dependencies: - tpot - tzlocal>=2.1 - uvicorn>=0.11.3 -# fugue dependencies; remove when we conda install fugue -- adagio -- antlr4-python3-runtime<4.10 -- ciso8601 -- fs -- pip -- qpd -- triad -- pip: - - fugue[sql]>=0.5.3 diff --git a/continuous_integration/environment-3.8-dev.yaml b/continuous_integration/environment-3.8-dev.yaml index 1506c7f38..c5633f17d 100644 --- a/continuous_integration/environment-3.8-dev.yaml +++ b/continuous_integration/environment-3.8-dev.yaml @@ -6,6 +6,7 @@ dependencies: - dask-ml=2022.1.22 - dask=2022.3.0 - fastapi=0.69.0 +- fugue=0.7.0 - intake=0.6.0 - jsonschema - lightgbm @@ -31,13 +32,3 @@ dependencies: - tpot - tzlocal=2.1 - uvicorn=0.11.3 -# fugue dependencies; remove when we conda install fugue -- adagio -- antlr4-python3-runtime<4.10 -- ciso8601 -- fs -- pip -- qpd -- triad -- pip: - - fugue[sql]==0.5.3 diff --git a/continuous_integration/environment-3.9-dev.yaml b/continuous_integration/environment-3.9-dev.yaml index 7127c827f..8e82e80e7 100644 --- a/continuous_integration/environment-3.9-dev.yaml +++ b/continuous_integration/environment-3.9-dev.yaml @@ -6,6 +6,7 @@ dependencies: - dask-ml>=2022.1.22 - dask>=2022.3.0 - fastapi>=0.69.0 +- fugue>=0.7.0 - intake>=0.6.0 - jsonschema - lightgbm @@ -31,13 +32,3 @@ dependencies: - tpot - tzlocal>=2.1 - uvicorn>=0.11.3 -# fugue dependencies; remove when we conda install fugue -- adagio -- antlr4-python3-runtime<4.10 -- ciso8601 -- fs -- pip -- qpd -- triad -- pip: - - fugue[sql]>=0.5.3 diff --git a/continuous_integration/gpuci/environment.yaml b/continuous_integration/gpuci/environment.yaml new file mode 100644 index 000000000..efceccb2f --- /dev/null +++ b/continuous_integration/gpuci/environment.yaml @@ -0,0 +1,17 @@ +name: gpuci +channels: + - rapidsai + - rapidsai-nightly + - nvidia +dependencies: + - rust>=1.60.0 + - setuptools-rust>=1.2.0 + - cudatoolkit=11.5 + - cudf=22.08 + - cuml=22.08 + - dask-cudf=22.08 + - dask-cuda=22.08 + - numpy>=1.20.0 + - ucx-proc=*=gpu + - ucx-py=0.27 + - xgboost=*=cuda_* diff --git a/dask_sql/context.py b/dask_sql/context.py index a986d7953..4c48b13d0 100644 --- a/dask_sql/context.py +++ b/dask_sql/context.py @@ -780,7 +780,7 @@ def _prepare_schemas(self): logger.debug("No custom functions defined.") for function_description in schema.function_lists: name = function_description.name - sql_return_type = python_to_sql_type(function_description.return_type) + sql_return_type = function_description.return_type if function_description.aggregation: logger.debug(f"Adding function '{name}' to schema as aggregation.") # TODO: Not yet implemented @@ -800,10 +800,7 @@ def _prepare_schemas(self): @staticmethod def _add_parameters_from_description(function_description, dask_function): for parameter in function_description.parameters: - param_name, param_type = 
parameter - sql_param_type = python_to_sql_type(param_type) - - dask_function.addParameter(param_name, sql_param_type, False) + dask_function.addParameter(*parameter, False) return dask_function @@ -884,9 +881,16 @@ def _register_callable( row_udf: bool = False, ): """Helper function to do the function or aggregation registration""" + schema_name = schema_name or self.schema_name schema = self.schema[schema_name] + # validate and cache UDF metadata + sql_parameters = [ + (name, python_to_sql_type(param_type)) for name, param_type in parameters + ] + sql_return_type = python_to_sql_type(return_type) + if not aggregation: f = UDF(f, row_udf, parameters, return_type) lower_name = name.lower() @@ -906,10 +910,14 @@ def _register_callable( ) schema.function_lists.append( - FunctionDescription(name.upper(), parameters, return_type, aggregation) + FunctionDescription( + name.upper(), sql_parameters, sql_return_type, aggregation + ) ) schema.function_lists.append( - FunctionDescription(name.lower(), parameters, return_type, aggregation) + FunctionDescription( + name.lower(), sql_parameters, sql_return_type, aggregation + ) ) schema.functions[lower_name] = f diff --git a/dask_sql/datacontainer.py b/dask_sql/datacontainer.py index 97d204f64..86361ee7d 100644 --- a/dask_sql/datacontainer.py +++ b/dask_sql/datacontainer.py @@ -229,11 +229,6 @@ def __init__(self, func, row_udf: bool, params, return_type=None): self.names = [param[0] for param in params] - if return_type is None: - # These UDFs go through apply and without providing - # a return type, dask will attempt to guess it, and - # dask might be wrong. - raise ValueError("Return type must be provided") self.meta = (None, return_type) def __call__(self, *args, **kwargs): @@ -249,7 +244,6 @@ def __call__(self, *args, **kwargs): df = column_args[0].to_frame(self.names[0]) for name, col in zip(self.names[1:], column_args[1:]): df[name] = col - result = df.apply( self.func, axis=1, args=tuple(scalar_args), meta=self.meta ).astype(self.meta[1]) diff --git a/dask_sql/integrations/fugue.py b/dask_sql/integrations/fugue.py index ce685a1ee..11b0c4d3a 100644 --- a/dask_sql/integrations/fugue.py +++ b/dask_sql/integrations/fugue.py @@ -1,8 +1,10 @@ try: import fugue import fugue_dask + from dask.distributed import Client from fugue import WorkflowDataFrame, register_execution_engine from fugue_sql import FugueSQLWorkflow + from triad import run_at_def from triad.utils.convert import get_caller_global_local_vars except ImportError: # pragma: no cover raise ImportError( @@ -15,9 +17,25 @@ from dask_sql.context import Context -register_execution_engine( - "dask", lambda conf: DaskSQLExecutionEngine(conf), on_dup="overwrite" -) + +@run_at_def +def _register_engines() -> None: + """Register (overwrite) the default Dask execution engine of Fugue. This + function is invoked as an entrypoint, users don't need to call it explicitly. 
+ """ + register_execution_engine( + "dask", + lambda conf, **kwargs: DaskSQLExecutionEngine(conf=conf), + on_dup="overwrite", + ) + + register_execution_engine( + Client, + lambda engine, conf, **kwargs: DaskSQLExecutionEngine( + dask_client=engine, conf=conf + ), + on_dup="overwrite", + ) class DaskSQLEngine(fugue.execution.execution_engine.SQLEngine): diff --git a/dask_sql/mappings.py b/dask_sql/mappings.py index 72ec08b4d..083ad921b 100644 --- a/dask_sql/mappings.py +++ b/dask_sql/mappings.py @@ -86,6 +86,12 @@ def python_to_sql_type(python_type) -> "DaskTypeMap": """Mapping between python and SQL types.""" + + if python_type in (int, float): + python_type = np.dtype(python_type) + elif python_type is str: + python_type = np.dtype("object") + if isinstance(python_type, np.dtype): python_type = python_type.type @@ -286,15 +292,17 @@ def cast_column_to_type(col: dd.Series, expected_type: str): logger.debug("...not converting.") return None - current_float = pd.api.types.is_float_dtype(current_type) - expected_integer = pd.api.types.is_integer_dtype(expected_type) - if current_float and expected_integer: - logger.debug("...truncating...") - # Currently "trunc" can not be applied to NA (the pandas missing value type), - # because NA is a different type. It works with np.NaN though. - # For our use case, that does not matter, as the conversion to integer later - # will convert both NA and np.NaN to NA. - col = da.trunc(col.fillna(value=np.NaN)) + if pd.api.types.is_integer_dtype(expected_type): + if pd.api.types.is_float_dtype(current_type): + logger.debug("...truncating...") + # Currently "trunc" can not be applied to NA (the pandas missing value type), + # because NA is a different type. It works with np.NaN though. + # For our use case, that does not matter, as the conversion to integer later + # will convert both NA and np.NaN to NA. + col = da.trunc(col.fillna(value=np.NaN)) + elif pd.api.types.is_timedelta64_dtype(current_type): + logger.debug(f"Explicitly casting from {current_type} to np.int64") + return col.astype(np.int64) logger.debug(f"Need to cast from {current_type} to {expected_type}") return col.astype(expected_type) diff --git a/dask_sql/physical/rel/logical/aggregate.py b/dask_sql/physical/rel/logical/aggregate.py index 7df45c88b..5ffa8bbfc 100644 --- a/dask_sql/physical/rel/logical/aggregate.py +++ b/dask_sql/physical/rel/logical/aggregate.py @@ -8,13 +8,6 @@ import pandas as pd from dask import config as dask_config -try: - import dask_cudf - - from dask_planner.rust import LogicalPlan -except ImportError: - dask_cudf = None - from dask_sql.datacontainer import ColumnContainer, DataContainer from dask_sql.physical.rel.base import BaseRelPlugin from dask_sql.physical.rex.core.call import IsNullOperation @@ -23,6 +16,7 @@ if TYPE_CHECKING: import dask_sql + from dask_planner.rust import LogicalPlan logger = logging.getLogger(__name__) diff --git a/dask_sql/physical/rex/core/call.py b/dask_sql/physical/rex/core/call.py index 39a2037b8..9ee59ba0d 100644 --- a/dask_sql/physical/rex/core/call.py +++ b/dask_sql/physical/rex/core/call.py @@ -186,7 +186,7 @@ def div(self, lhs, rhs): # of this function. if isinstance(result, (datetime.timedelta, np.timedelta64)): return result - else: # pragma: no cover + else: return da.trunc(result).astype(np.int64) @@ -960,8 +960,7 @@ def convert( ] # Now use the operator name in the mapping - # TODO: obviously this needs to not be hardcoded but not sure of the best place to pull the value from currently??? 
- schema_name = "root" + schema_name = context.schema_name operator_name = expr.getOperatorName().lower() try: diff --git a/docs/environment.yml b/docs/environment.yml index af44d3872..1709b592d 100644 --- a/docs/environment.yml +++ b/docs/environment.yml @@ -9,7 +9,7 @@ dependencies: - dask-sphinx-theme>=2.0.3 - dask>=2022.3.0 - pandas>=1.4.0 - - fugue>=0.5.3 + - fugue>=0.7.0 - jpype1>=1.0.2 - fastapi>=0.69.0 - uvicorn>=0.11.3 diff --git a/docs/requirements-docs.txt b/docs/requirements-docs.txt index 190a8ec3d..09d783488 100644 --- a/docs/requirements-docs.txt +++ b/docs/requirements-docs.txt @@ -3,7 +3,7 @@ sphinx-tabs dask-sphinx-theme>=3.0.0 dask>=2022.3.0 pandas>=1.4.0 -fugue>=0.5.3 +fugue>=0.7.0 fastapi>=0.69.0 uvicorn>=0.11.3 tzlocal>=2.1 diff --git a/docs/source/fugue.rst b/docs/source/fugue.rst index 264d19fcd..972229c13 100644 --- a/docs/source/fugue.rst +++ b/docs/source/fugue.rst @@ -8,7 +8,7 @@ In order to offer a "best of both worlds" solution, dask-sql includes several op dask-sql as a FugueSQL engine ----------------------------- -FugueSQL users unfamiliar with dask-sql can take advantage of its functionality with minimal code changes by passing :class:`dask_sql.integrations.fugue.DaskSQLExecutionEngine` into the ``FugueSQLWorkflow`` being used to execute commands. +FugueSQL users unfamiliar with dask-sql can take advantage of its functionality by installing it in an environment alongside Fugue; this will automatically register :class:`dask_sql.integrations.fugue.DaskSQLExecutionEngine` as the default Dask execution engine for FugueSQL queries. For more information and sample usage, see `Fugue — dask-sql as a FugueSQL engine `_. Using FugueSQL on an existing ``Context`` diff --git a/docs/source/installation.rst b/docs/source/installation.rst index 2d3b1f005..ec1766fe6 100644 --- a/docs/source/installation.rst +++ b/docs/source/installation.rst @@ -93,6 +93,19 @@ You can run the tests (after installation) with pytest tests +GPU-specific tests require additional dependencies specified in `continuous_integration/gpuci/environment.yaml`. +These can be added to the development environment by running + +.. code-block:: bash + + conda env update -n dask-sql -f continuous_integration/gpuci/environment.yaml + +And GPU-specific tests can be run with + +.. code-block:: bash + + pytest tests -m gpu --rungpu + This repository uses pre-commit hooks. To install them, call .. code-block:: bash diff --git a/notebooks/FugueSQL.ipynb b/notebooks/FugueSQL.ipynb new file mode 100644 index 000000000..1d59b8f78 --- /dev/null +++ b/notebooks/FugueSQL.ipynb @@ -0,0 +1,577 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "f39e2dbc-21a1-4d9a-bed7-e2bf2bd25bb8", + "metadata": {}, + "source": [ + "# FugueSQL Integrations\n", + "\n", + "[FugueSQL](https://fugue-tutorials.readthedocs.io/tutorials/fugue_sql/index.html) is a related project that aims to provide a unified SQL interface for a variety of different computing frameworks, including Dask.\n", + "While it offers a SQL engine with a larger set of supported commands, this comes at the cost of slower performance when using Dask in comparison to dask-sql.\n", + "In order to offer a \"best of both worlds\" solution, dask-sql can easily be integrated with FugueSQL, using its faster implementation of SQL commands when possible and falling back on FugueSQL's implementation when necessary." 
+ ] + }, + { + "cell_type": "markdown", + "id": "90e31400", + "metadata": {}, + "source": [ + "## Setup\n", + "\n", + "FugueSQL offers the cell magic `%%fsql`, which can be used to define and execute queries entirely in SQL, with no need for external Python code!\n", + "\n", + "To use this cell magic, users must install [fugue-jupyter](https://pypi.org/project/fugue-jupyter/), which will additionally provide SQL syntax highlighting (note that the kernel must be restarted after installing):" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "96c3ad1a", + "metadata": {}, + "outputs": [], + "source": [ + "!pip install fugue-jupyter" + ] + }, + { + "cell_type": "markdown", + "id": "ae79361a", + "metadata": {}, + "source": [ + "And run `fugue_jupyter.setup()` to register the magic:" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "2df05f5b", + "metadata": {}, + "outputs": [], + "source": [ + "from fugue_jupyter import setup\n", + "\n", + "setup()" + ] + }, + { + "cell_type": "markdown", + "id": "d3b8bfe5", + "metadata": {}, + "source": [ + "We will also start up a Dask client, which can be specified as an execution engine for FugueSQL queries:" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "a35d98e6-f24e-46c4-a4e6-b64d649d8ba7", + "metadata": {}, + "outputs": [], + "source": [ + "from dask.distributed import Client\n", + "\n", + "client = Client()" + ] + }, + { + "cell_type": "markdown", + "id": "bcb96523", + "metadata": {}, + "source": [ + "## dask-sql as a FugueSQL execution engine\n", + "\n", + "When dask-sql is installed, its `DaskSQLExecutionEngine` is automatically registered as the default engine for FugueSQL queries run on Dask.\n", + "We can then use it to run queries with the `%%fsql` cell magic, specifying `dask` as the execution engine to run the query on:" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "ff633572-ad08-4de1-8678-a8fbd09effd1", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
a
0xyz
\n", + "
" + ], + "text/plain": [ + " a\n", + "0 xyz" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "schema: a:str" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "%%fsql dask\n", + "\n", + "CREATE [[\"xyz\"], [\"xxx\"]] SCHEMA a:str\n", + "SELECT * WHERE a LIKE '%y%'\n", + "PRINT" + ] + }, + { + "cell_type": "markdown", + "id": "7f16b7d9-6b45-4caf-bbcb-63cc5d858556", + "metadata": {}, + "source": [ + "We can also use the `YIELD` keyword to register the results of our queries into Python objects:" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "521965bc-1a4c-49ab-b48f-789351cb24d4", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
b
0xyz
1xxx-
\n", + "
" + ], + "text/plain": [ + " b\n", + "0 xyz\n", + "1 xxx-" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "schema: b:str" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "%%fsql dask\n", + "src = CREATE [[\"xyz\"], [\"xxx\"]] SCHEMA a:str\n", + "\n", + "a = SELECT a AS b WHERE a LIKE '%y%'\n", + " YIELD DATAFRAME AS test\n", + "\n", + "b = SELECT CONCAT(a, '-') AS b FROM src WHERE a LIKE '%xx%'\n", + " YIELD DATAFRAME AS test1\n", + "\n", + "SELECT * FROM a UNION SELECT * FROM b\n", + "PRINT" + ] + }, + { + "cell_type": "markdown", + "id": "dfbb0a9a", + "metadata": {}, + "source": [ + "Which can then be interacted with outside of SQL:" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "79a3e87a-2764-410c-b257-c710c4a6c6d4", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
[HTML rendering of the Dask DataFrame structure omitted; identical to the text/plain output below]
" + ], + "text/plain": [ + "Dask DataFrame Structure:\n", + " b\n", + "npartitions=2 \n", + " object\n", + " ...\n", + " ...\n", + "Dask Name: rename, 16 tasks" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "test.native # a Dask DataFrame" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "c98cb652-06e2-444a-b70a-fdd3de9ecd15", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
b
1xxx-
\n", + "
" + ], + "text/plain": [ + " b\n", + "1 xxx-" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "test1.native.compute()" + ] + }, + { + "cell_type": "markdown", + "id": "932ede31-90b2-49e5-9f4d-7cf1b8d919d2", + "metadata": {}, + "source": [ + "We can also run the equivalent of these queries in python code using `fugue_sql.fsql`, passing the Dask client into its `run` method to specify Dask as an execution engine:" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "c265b170-de4d-4fab-aeae-9f94031e960d", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
a
0xyz
\n", + "
" + ], + "text/plain": [ + " a\n", + "0 xyz" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "schema: a:str" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "DataFrames()" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from fugue_sql import fsql\n", + "\n", + "fsql(\"\"\"\n", + "CREATE [[\"xyz\"], [\"xxx\"]] SCHEMA a:str\n", + "SELECT * WHERE a LIKE '%y%'\n", + "PRINT\n", + "\"\"\").run(client)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "77e3bf50-8c8b-4e2f-a5e7-28b1d86499d7", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
[HTML rendering of the Dask DataFrame structure omitted; identical to the text/plain output below]
" + ], + "text/plain": [ + "Dask DataFrame Structure:\n", + " a\n", + "npartitions=2 \n", + " object\n", + " ...\n", + " ...\n", + "Dask Name: rename, 16 tasks" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "result = fsql(\"\"\"\n", + "CREATE [[\"xyz\"], [\"xxx\"]] SCHEMA a:str\n", + "SELECT * WHERE a LIKE '%y%'\n", + "YIELD DATAFRAME AS test2\n", + "\"\"\").run(client)\n", + "\n", + "result[\"test2\"].native # a Dask DataFrame" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7d4c71d4-238f-4c72-8609-dbbe0782aea9", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.13" + }, + "vscode": { + "interpreter": { + "hash": "656801d214ad98d4b301386b078628ce3ae2dbd81a59ed4deed7a5b13edfab09" + } + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/setup.py b/setup.py index 5ae40ce3b..1e6c4ff18 100644 --- a/setup.py +++ b/setup.py @@ -66,13 +66,16 @@ "black==22.3.0", "isort==5.7.0", ], - "fugue": ["fugue[sql]>=0.5.3"], + "fugue": ["fugue>=0.7.0"], }, entry_points={ "console_scripts": [ "dask-sql-server = dask_sql.server.app:main", "dask-sql = dask_sql.cmd:main", - ] + ], + "fugue.plugins": [ + "dasksql = dask_sql.integrations.fugue:_register_engines[fugue]" + ], }, zip_safe=False, cmdclass=cmdclass, diff --git a/tests/integration/test_filter.py b/tests/integration/test_filter.py index 23f779e44..e68ebd323 100644 --- a/tests/integration/test_filter.py +++ b/tests/integration/test_filter.py @@ -180,6 +180,9 @@ def test_filter_year(c): ], ), ( + # The predicate-pushdown optimization will be skipped here, + # because datetime accessors are not supported. However, + # the query should still succeed. 
"SELECT * FROM parquet_ddf WHERE year(d) < 2015", lambda x: x[x["d"].dt.year < 2015], None, diff --git a/tests/integration/test_fugue.py b/tests/integration/test_fugue.py index 7176b467c..0dffb713c 100644 --- a/tests/integration/test_fugue.py +++ b/tests/integration/test_fugue.py @@ -16,7 +16,8 @@ @pytest.mark.skip( reason="WIP DataFusion - https://github.com/dask-contrib/dask-sql/issues/459" ) -def test_simple_statement(): +@skip_if_external_scheduler +def test_simple_statement(client): with fugue_sql.FugueSQLWorkflow(DaskSQLExecutionEngine) as dag: df = dag.df([[0, "hello"], [1, "world"]], "a:int64,b:str") dag("SELECT * FROM df WHERE a > 0 YIELD DATAFRAME AS result") @@ -35,6 +36,14 @@ def test_simple_statement(): return_df = result["result"].as_pandas() assert_eq(return_df, pd.DataFrame({"a": [1], "b": ["world"]})) + result = fugue_sql.fsql( + "SELECT * FROM df WHERE a > 0 YIELD DATAFRAME AS result", + df=pdf, + ).run(client) + + return_df = result["result"].as_pandas() + assert_eq(return_df, pd.DataFrame({"a": [1], "b": ["world"]})) + # TODO: Revisit fixing this on an independant cluster (without dask-sql) based on the # discussion in https://github.com/dask-contrib/dask-sql/issues/407 @@ -42,7 +51,7 @@ def test_simple_statement(): reason="WIP DataFusion - https://github.com/dask-contrib/dask-sql/issues/459" ) @skip_if_external_scheduler -def test_fsql(): +def test_fsql(client): def assert_fsql(df: pd.DataFrame) -> None: assert_eq(df, pd.DataFrame({"a": [1]})) diff --git a/tests/integration/test_function.py b/tests/integration/test_function.py index 09fe8890c..ae006734f 100644 --- a/tests/integration/test_function.py +++ b/tests/integration/test_function.py @@ -55,17 +55,12 @@ def f(row): ) @pytest.mark.parametrize( "retty", - [None, np.float64, np.float32, np.int64, np.int32, np.int16, np.int8, np.bool_], + [np.float64, np.float32, np.int64, np.int32, np.int16, np.int8, np.bool_], ) def test_custom_function_row_return_types(c, df, retty): def f(row): return row["x"] ** 2 - if retty is None: - with pytest.raises(ValueError): - c.register_function(f, "f", [("x", np.float64)], retty, row_udf=True) - return - c.register_function(f, "f", [("x", np.float64)], retty, row_udf=True) return_df = c.sql("SELECT F(a) AS a FROM df") @@ -211,3 +206,17 @@ def f(x): c.register_aggregation(fagg, "fagg", [("x", np.float64)], np.float64) c.register_aggregation(fagg, "fagg", [("x", np.float64)], np.float64, replace=True) + + +@pytest.mark.parametrize("dtype", [np.timedelta64, None, "a string"]) +def test_unsupported_dtype(c, dtype): + def f(x): + return x**2 + + # test that an invalid return type raises + with pytest.raises(NotImplementedError): + c.register_function(f, "f", [("x", np.int64)], dtype) + + # test that an invalid param type raises + with pytest.raises(NotImplementedError): + c.register_function(f, "f", [("x", dtype)], np.int64) diff --git a/tests/unit/test_context.py b/tests/unit/test_context.py index 269713915..979ee0296 100644 --- a/tests/unit/test_context.py +++ b/tests/unit/test_context.py @@ -5,6 +5,7 @@ import pytest from dask_sql import Context +from dask_sql.mappings import python_to_sql_type from tests.utils import assert_eq try: @@ -101,9 +102,8 @@ def test_sql(gpu): c.create_table("df", data_frame, gpu=gpu) result = c.sql("SELECT * FROM df") - assert isinstance(result, dd.DataFrame if not gpu else dask_cudf.DataFrame) - - dd.assert_eq(result, data_frame) + assert isinstance(result, dd.DataFrame) + assert_eq(result, data_frame) result = c.sql("SELECT * FROM 
df", return_futures=False) assert not isinstance(result, dd.DataFrame) @@ -196,6 +196,11 @@ def g(gpu=gpu): g(gpu=gpu) +int_sql_type = python_to_sql_type(int).getSqlType() +float_sql_type = python_to_sql_type(float).getSqlType() +str_sql_type = python_to_sql_type(str).getSqlType() + + def test_function_adding(): c = Context() @@ -209,12 +214,26 @@ def test_function_adding(): assert c.schema[c.schema_name].functions["f"].func == f assert len(c.schema[c.schema_name].function_lists) == 2 assert c.schema[c.schema_name].function_lists[0].name == "F" - assert c.schema[c.schema_name].function_lists[0].parameters == [("x", int)] - assert c.schema[c.schema_name].function_lists[0].return_type == float + assert c.schema[c.schema_name].function_lists[0].parameters[0][0] == "x" + assert ( + c.schema[c.schema_name].function_lists[0].parameters[0][1].getSqlType() + == int_sql_type + ) + assert ( + c.schema[c.schema_name].function_lists[0].return_type.getSqlType() + == float_sql_type + ) assert not c.schema[c.schema_name].function_lists[0].aggregation assert c.schema[c.schema_name].function_lists[1].name == "f" - assert c.schema[c.schema_name].function_lists[1].parameters == [("x", int)] - assert c.schema[c.schema_name].function_lists[1].return_type == float + assert c.schema[c.schema_name].function_lists[1].parameters[0][0] == "x" + assert ( + c.schema[c.schema_name].function_lists[1].parameters[0][1].getSqlType() + == int_sql_type + ) + assert ( + c.schema[c.schema_name].function_lists[1].return_type.getSqlType() + == float_sql_type + ) assert not c.schema[c.schema_name].function_lists[1].aggregation # Without replacement @@ -224,12 +243,26 @@ def test_function_adding(): assert c.schema[c.schema_name].functions["f"].func == f assert len(c.schema[c.schema_name].function_lists) == 4 assert c.schema[c.schema_name].function_lists[2].name == "F" - assert c.schema[c.schema_name].function_lists[2].parameters == [("x", float)] - assert c.schema[c.schema_name].function_lists[2].return_type == int + assert c.schema[c.schema_name].function_lists[2].parameters[0][0] == "x" + assert ( + c.schema[c.schema_name].function_lists[2].parameters[0][1].getSqlType() + == float_sql_type + ) + assert ( + c.schema[c.schema_name].function_lists[2].return_type.getSqlType() + == int_sql_type + ) assert not c.schema[c.schema_name].function_lists[2].aggregation assert c.schema[c.schema_name].function_lists[3].name == "f" - assert c.schema[c.schema_name].function_lists[3].parameters == [("x", float)] - assert c.schema[c.schema_name].function_lists[3].return_type == int + assert c.schema[c.schema_name].function_lists[3].parameters[0][0] == "x" + assert ( + c.schema[c.schema_name].function_lists[3].parameters[0][1].getSqlType() + == float_sql_type + ) + assert ( + c.schema[c.schema_name].function_lists[3].return_type.getSqlType() + == int_sql_type + ) assert not c.schema[c.schema_name].function_lists[3].aggregation # With replacement @@ -240,12 +273,26 @@ def test_function_adding(): assert c.schema[c.schema_name].functions["f"].func == f assert len(c.schema[c.schema_name].function_lists) == 2 assert c.schema[c.schema_name].function_lists[0].name == "F" - assert c.schema[c.schema_name].function_lists[0].parameters == [("x", str)] - assert c.schema[c.schema_name].function_lists[0].return_type == str + assert c.schema[c.schema_name].function_lists[0].parameters[0][0] == "x" + assert ( + c.schema[c.schema_name].function_lists[0].parameters[0][1].getSqlType() + == str_sql_type + ) + assert ( + 
c.schema[c.schema_name].function_lists[0].return_type.getSqlType() + == str_sql_type + ) assert not c.schema[c.schema_name].function_lists[0].aggregation assert c.schema[c.schema_name].function_lists[1].name == "f" - assert c.schema[c.schema_name].function_lists[1].parameters == [("x", str)] - assert c.schema[c.schema_name].function_lists[1].return_type == str + assert c.schema[c.schema_name].function_lists[1].parameters[0][0] == "x" + assert ( + c.schema[c.schema_name].function_lists[1].parameters[0][1].getSqlType() + == str_sql_type + ) + assert ( + c.schema[c.schema_name].function_lists[1].return_type.getSqlType() + == str_sql_type + ) assert not c.schema[c.schema_name].function_lists[1].aggregation @@ -262,12 +309,26 @@ def test_aggregation_adding(): assert c.schema[c.schema_name].functions["f"] == f assert len(c.schema[c.schema_name].function_lists) == 2 assert c.schema[c.schema_name].function_lists[0].name == "F" - assert c.schema[c.schema_name].function_lists[0].parameters == [("x", int)] - assert c.schema[c.schema_name].function_lists[0].return_type == float + assert c.schema[c.schema_name].function_lists[0].parameters[0][0] == "x" + assert ( + c.schema[c.schema_name].function_lists[0].parameters[0][1].getSqlType() + == int_sql_type + ) + assert ( + c.schema[c.schema_name].function_lists[0].return_type.getSqlType() + == float_sql_type + ) assert c.schema[c.schema_name].function_lists[0].aggregation assert c.schema[c.schema_name].function_lists[1].name == "f" - assert c.schema[c.schema_name].function_lists[1].parameters == [("x", int)] - assert c.schema[c.schema_name].function_lists[1].return_type == float + assert c.schema[c.schema_name].function_lists[1].parameters[0][0] == "x" + assert ( + c.schema[c.schema_name].function_lists[1].parameters[0][1].getSqlType() + == int_sql_type + ) + assert ( + c.schema[c.schema_name].function_lists[1].return_type.getSqlType() + == float_sql_type + ) assert c.schema[c.schema_name].function_lists[1].aggregation # Without replacement @@ -277,12 +338,26 @@ def test_aggregation_adding(): assert c.schema[c.schema_name].functions["f"] == f assert len(c.schema[c.schema_name].function_lists) == 4 assert c.schema[c.schema_name].function_lists[2].name == "F" - assert c.schema[c.schema_name].function_lists[2].parameters == [("x", float)] - assert c.schema[c.schema_name].function_lists[2].return_type == int + assert c.schema[c.schema_name].function_lists[2].parameters[0][0] == "x" + assert ( + c.schema[c.schema_name].function_lists[2].parameters[0][1].getSqlType() + == float_sql_type + ) + assert ( + c.schema[c.schema_name].function_lists[2].return_type.getSqlType() + == int_sql_type + ) assert c.schema[c.schema_name].function_lists[2].aggregation assert c.schema[c.schema_name].function_lists[3].name == "f" - assert c.schema[c.schema_name].function_lists[3].parameters == [("x", float)] - assert c.schema[c.schema_name].function_lists[3].return_type == int + assert c.schema[c.schema_name].function_lists[3].parameters[0][0] == "x" + assert ( + c.schema[c.schema_name].function_lists[3].parameters[0][1].getSqlType() + == float_sql_type + ) + assert ( + c.schema[c.schema_name].function_lists[3].return_type.getSqlType() + == int_sql_type + ) assert c.schema[c.schema_name].function_lists[3].aggregation # With replacement @@ -293,12 +368,26 @@ def test_aggregation_adding(): assert c.schema[c.schema_name].functions["f"] == f assert len(c.schema[c.schema_name].function_lists) == 2 assert c.schema[c.schema_name].function_lists[0].name == "F" - assert 
c.schema[c.schema_name].function_lists[0].parameters == [("x", str)] - assert c.schema[c.schema_name].function_lists[0].return_type == str + assert c.schema[c.schema_name].function_lists[0].parameters[0][0] == "x" + assert ( + c.schema[c.schema_name].function_lists[0].parameters[0][1].getSqlType() + == str_sql_type + ) + assert ( + c.schema[c.schema_name].function_lists[0].return_type.getSqlType() + == str_sql_type + ) assert c.schema[c.schema_name].function_lists[0].aggregation assert c.schema[c.schema_name].function_lists[1].name == "f" - assert c.schema[c.schema_name].function_lists[1].parameters == [("x", str)] - assert c.schema[c.schema_name].function_lists[1].return_type == str + assert c.schema[c.schema_name].function_lists[1].parameters[0][0] == "x" + assert ( + c.schema[c.schema_name].function_lists[1].parameters[0][1].getSqlType() + == str_sql_type + ) + assert ( + c.schema[c.schema_name].function_lists[1].return_type.getSqlType() + == str_sql_type + ) assert c.schema[c.schema_name].function_lists[1].aggregation
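
Taken together, the UDF-related changes above mean that parameter and return types are converted to SQL types when a function is registered (`python_to_sql_type` now also accepts the builtin `int`, `float`, and `str`), and unsupported types are rejected up front. Below is a minimal usage sketch of that behavior, assuming the `Context` API exercised in `tests/integration/test_function.py`; the table name, column name, sample data, and the intentionally failing second registration are illustrative only, not part of this diff.

```python
import numpy as np
import pandas as pd
import dask.dataframe as dd

from dask_sql import Context

c = Context()

# Small example table; any numeric Dask DataFrame works here.
ddf = dd.from_pandas(pd.DataFrame({"a": [1.0, 2.0, 3.0]}), npartitions=1)
c.create_table("df", ddf)

# Row-style UDF: parameter and return types are validated and converted
# to SQL types at registration time (see Context._register_callable above).
def f(row):
    return row["x"] ** 2

c.register_function(f, "f", [("x", np.float64)], np.float64, row_udf=True)
print(c.sql("SELECT F(a) AS a FROM df").compute())

# An unsupported parameter type now fails fast at registration
# (mirroring test_unsupported_dtype above) instead of erroring later.
try:
    c.register_function(f, "g", [("x", np.timedelta64)], np.float64, row_udf=True)
except NotImplementedError as err:
    print("rejected:", err)
```

Converting and caching the type information eagerly means an invalid UDF signature surfaces as an error at `register_function` time rather than when a query referencing the function is planned.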