diff --git a/.github/cluster-upstream.yml b/.github/cluster-upstream.yml new file mode 100644 index 000000000..3bebbde0c --- /dev/null +++ b/.github/cluster-upstream.yml @@ -0,0 +1,21 @@ +# Docker-compose setup used during tests +version: '3' +services: + dask-scheduler: + container_name: dask-scheduler + image: daskdev/dask:dev-py3.9 + command: dask-scheduler + environment: + USE_MAMBA: "true" + EXTRA_CONDA_PACKAGES: "dask/label/dev::dask cloudpickle>=2.1.0" + ports: + - "8786:8786" + dask-worker: + container_name: dask-worker + image: daskdev/dask:dev-py3.9 + command: dask-worker dask-scheduler:8786 + environment: + USE_MAMBA: "true" + EXTRA_CONDA_PACKAGES: "dask/label/dev::dask cloudpickle>=2.1.0 pyarrow>=3.0.0 libstdcxx-ng>=12.1.0" + volumes: + - /tmp:/tmp diff --git a/.github/docker-compose.yaml b/.github/cluster.yml similarity index 76% rename from .github/docker-compose.yaml rename to .github/cluster.yml index 21997b505..4d08a389a 100644 --- a/.github/docker-compose.yaml +++ b/.github/cluster.yml @@ -3,7 +3,7 @@ version: '3' services: dask-scheduler: container_name: dask-scheduler - image: daskdev/dask:dev + image: daskdev/dask:dev-py3.9 command: dask-scheduler environment: USE_MAMBA: "true" @@ -12,10 +12,10 @@ services: - "8786:8786" dask-worker: container_name: dask-worker - image: daskdev/dask:dev + image: daskdev/dask:dev-py3.9 command: dask-worker dask-scheduler:8786 environment: USE_MAMBA: "true" - EXTRA_CONDA_PACKAGES: "pyarrow>=4.0.0" # required for parquet IO + EXTRA_CONDA_PACKAGES: "cloudpickle>=2.1.0 pyarrow>=3.0.0 libstdcxx-ng>=12.1.0" volumes: - /tmp:/tmp diff --git a/.github/workflows/datafusion-sync.yml b/.github/workflows/datafusion-sync.yml new file mode 100644 index 000000000..fd544eeae --- /dev/null +++ b/.github/workflows/datafusion-sync.yml @@ -0,0 +1,30 @@ +name: Keep datafusion branch up to date +on: + push: + branches: + - main + +# When this workflow is queued, automatically cancel any previous running +# or pending jobs +concurrency: + group: datafusion-sync + cancel-in-progress: true + +jobs: + sync-branches: + runs-on: ubuntu-latest + if: github.repository == 'dask-contrib/dask-sql' + steps: + - name: Checkout + uses: actions/checkout@v2 + - name: Set up Node + uses: actions/setup-node@v2 + with: + node-version: 12 + - name: Opening pull request + id: pull + uses: tretuna/sync-branches@1.4.0 + with: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + FROM_BRANCH: main + TO_BRANCH: datafusion-sql-planner diff --git a/.github/workflows/test-upstream.yml b/.github/workflows/test-upstream.yml index b9fc415b9..e6c5bd213 100644 --- a/.github/workflows/test-upstream.yml +++ b/.github/workflows/test-upstream.yml @@ -4,6 +4,11 @@ on: - cron: "0 0 * * *" # Daily β€œAt 00:00” UTC workflow_dispatch: # allows you to trigger the workflow run manually +# Required shell entrypoint to have properly activated conda environments +defaults: + run: + shell: bash -l {0} + jobs: test-dev: name: "Test upstream dev (${{ matrix.os }}, python: ${{ matrix.python }})" @@ -29,6 +34,7 @@ jobs: use-mamba: true python-version: ${{ matrix.python }} channel-priority: strict + channels: dask/label/dev,conda-forge,nodefaults activate-environment: dask-sql environment-file: ${{ env.CONDA_FILE }} - name: Install hive testing dependencies for Linux @@ -39,23 +45,110 @@ jobs: docker pull bde2020/hive-metastore-postgresql:2.3.0 - name: Install upstream dev Dask / dask-ml run: | - python -m pip install --no-deps git+https://github.com/dask/dask - python -m pip install --no-deps 
git+https://github.com/dask/distributed + mamba update dask python -m pip install --no-deps git+https://github.com/dask/dask-ml - name: Test with pytest run: | pytest --junitxml=junit/test-results.xml --cov-report=xml -n auto tests --dist loadfile + cluster-dev: + name: "Test upstream dev in a dask cluster" + needs: build + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Cache local Maven repository + uses: actions/cache@v2 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-v1-jdk11-${{ hashFiles('**/pom.xml') }} + - name: Set up Python + uses: conda-incubator/setup-miniconda@v2 + with: + miniforge-variant: Mambaforge + use-mamba: true + python-version: "3.9" + channel-priority: strict + channels: dask/label/dev,conda-forge,nodefaults + activate-environment: dask-sql + environment-file: continuous_integration/environment-3.9-jdk11-dev.yaml + - name: Download the pre-build jar + uses: actions/download-artifact@v1 + with: + name: jar + path: dask_sql/jar/ + - name: Install cluster dependencies + run: | + mamba install python-blosc lz4 -c conda-forge + + which python + pip list + mamba list + - name: Install upstream dev dask-ml + run: | + mamba update dask + python -m pip install --no-deps git+https://github.com/dask/dask-ml + - name: run a dask cluster + run: | + docker-compose -f .github/cluster-upstream.yml up -d + + # periodically ping logs until a connection has been established; assume failure after 2 minutes + timeout 2m bash -c 'until docker logs dask-worker 2>&1 | grep -q "Starting established connection"; do sleep 1; done' + + docker logs dask-scheduler + docker logs dask-worker + - name: Test with pytest while running an independent dask cluster + run: | + DASK_SQL_TEST_SCHEDULER="tcp://127.0.0.1:8786" pytest --junitxml=junit/test-cluster-results.xml --cov-report=xml -n auto tests --dist loadfile + + import-dev: + name: "Test importing with bare requirements and upstream dev" + needs: build + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Cache local Maven repository + uses: actions/cache@v2 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-v1-jdk11-${{ hashFiles('**/pom.xml') }} + - name: Set up Python + uses: conda-incubator/setup-miniconda@v2 + with: + python-version: "3.8" + mamba-version: "*" + channels: dask/label/dev,conda-forge,nodefaults + channel-priority: strict + - name: Download the pre-build jar + uses: actions/download-artifact@v1 + with: + name: jar + path: dask_sql/jar/ + - name: Install upstream dev Dask / dask-ml + if: needs.detect-ci-trigger.outputs.triggered == 'true' + run: | + mamba update dask + python -m pip install --no-deps git+https://github.com/dask/dask-ml + - name: Install dependencies and nothing else + run: | + pip install -e . 
+ + which python + pip list + mamba list + - name: Try to import dask-sql + run: | + python -c "import dask_sql; print('ok')" + report-failures: name: Open issue for upstream dev failures - needs: test-dev + needs: [test-dev, cluster-dev] if: | always() - && needs.test-dev.result == 'failure' + && ( + needs.test-dev.result == 'failure' || needs.cluster-dev.result == 'failure' + ) runs-on: ubuntu-latest - defaults: - run: - shell: bash steps: - uses: actions/checkout@v2 - name: Report failures diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4c8016b7b..223fad4b7 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -57,6 +57,7 @@ jobs: use-mamba: true python-version: ${{ matrix.python }} channel-priority: strict + channels: ${{ needs.detect-ci-trigger.outputs.triggered == 'true' && 'dask/label/dev,conda-forge,nodefaults' || 'conda-forge,nodefaults' }} activate-environment: dask-sql environment-file: ${{ env.CONDA_FILE }} - name: Setup Rust Toolchain @@ -77,8 +78,7 @@ jobs: - name: Optionally install upstream dev Dask / dask-ml if: needs.detect-ci-trigger.outputs.triggered == 'true' run: | - python -m pip install --no-deps git+https://github.com/dask/dask - python -m pip install --no-deps git+https://github.com/dask/distributed + mamba update dask python -m pip install --no-deps git+https://github.com/dask/dask-ml - name: Test with pytest run: | @@ -107,10 +107,11 @@ jobs: with: miniforge-variant: Mambaforge use-mamba: true - python-version: "3.8" + python-version: "3.9" channel-priority: strict + channels: ${{ needs.detect-ci-trigger.outputs.triggered == 'true' && 'dask/label/dev,conda-forge,nodefaults' || 'conda-forge,nodefaults' }} activate-environment: dask-sql - environment-file: continuous_integration/environment-3.8-dev.yaml + environment-file: continuous_integration/environment-3.9-dev.yaml - name: Setup Rust Toolchain uses: actions-rs/toolchain@v1 id: rust-toolchain @@ -127,18 +128,23 @@ jobs: which python pip list mamba list - - name: Optionally install upstream dev Dask / dask-ml + - name: Optionally install upstream dev dask-ml if: needs.detect-ci-trigger.outputs.triggered == 'true' run: | - python -m pip install --no-deps git+https://github.com/dask/dask - python -m pip install --no-deps git+https://github.com/dask/distributed + mamba update dask python -m pip install --no-deps git+https://github.com/dask/dask-ml - name: run a dask cluster + env: + UPSTREAM: ${{ needs.detect-ci-trigger.outputs.triggered }} run: | - docker-compose -f .github/docker-compose.yaml up -d + if [[ $UPSTREAM == "true" ]]; then + docker-compose -f .github/cluster-upstream.yml up -d + else + docker-compose -f .github/cluster.yml up -d + fi - # Wait for installation - sleep 40 + # periodically ping logs until a connection has been established; assume failure after 2 minutes + timeout 2m bash -c 'until docker logs dask-worker 2>&1 | grep -q "Starting established connection"; do sleep 1; done' docker logs dask-scheduler docker logs dask-worker @@ -157,7 +163,7 @@ jobs: with: python-version: "3.8" mamba-version: "*" - channels: conda-forge,defaults + channels: ${{ needs.detect-ci-trigger.outputs.triggered == 'true' && 'dask/label/dev,conda-forge,nodefaults' || 'conda-forge,nodefaults' }} channel-priority: strict - name: Install dependencies and nothing else run: | @@ -167,12 +173,6 @@ jobs: which python pip list mamba list - - name: Optionally install upstream dev Dask / dask-ml - if: 
needs.detect-ci-trigger.outputs.triggered == 'true' - run: | - python -m pip install --no-deps git+https://github.com/dask/dask - python -m pip install --no-deps git+https://github.com/dask/distributed - python -m pip install --no-deps git+https://github.com/dask/dask-ml - name: Try to import dask-sql run: | python -c "import dask_sql; print('ok')" diff --git a/continuous_integration/environment-3.10-dev.yaml b/continuous_integration/environment-3.10-dev.yaml index 131dd457e..13545bb84 100644 --- a/continuous_integration/environment-3.10-dev.yaml +++ b/continuous_integration/environment-3.10-dev.yaml @@ -3,40 +3,41 @@ channels: - conda-forge - nodefaults dependencies: -- adagio>=0.2.3 -- antlr4-python3-runtime>=4.9.2, <4.10.0 # Remove max pin after qpd(fugue dependency) updates their conda recipe -- black=22.3.0 -- ciso8601>=2.2.0 - dask-ml>=2022.1.22 - dask>=2022.3.0 -- fastapi>=0.61.1 -- fs>=2.4.11 +- fastapi>=0.69.0 - intake>=0.6.0 -- isort=5.7.0 -- jsonschema>=4.4.0 -- lightgbm>=3.2.1 -- mlflow>=1.19.0 -- mock>=4.0.3 -- nest-asyncio>=1.4.3 -- pandas>=1.0.0 # below 1.0, there were no nullable ext. types -- pip=20.2.4 -- pre-commit>=2.11.1 -- prompt_toolkit>=3.0.8 -- psycopg2>=2.9.1 -- pygments>=2.7.1 -- pyhive>=0.6.4 -- pytest-cov>=2.10.1 +- jsonschema +- lightgbm +- maturin>=0.12.8 +- mlflow +- mock +- nest-asyncio +- pandas>=1.1.2 +- pre-commit +- prompt_toolkit +- psycopg2 +- pyarrow>=3.0.0 +- pygments +- pyhive +- pytest-cov - pytest-xdist -- pytest>=6.0.1 +- pytest - python=3.10 -- scikit-learn>=0.24.2 -- sphinx>=3.2.1 -- tpot>=0.11.7 -- triad>=0.5.4 +- rust>=1.60.0 +- scikit-learn>=1.0.0 +- setuptools-rust>=1.1.2 +- sphinx +- tpot - tzlocal>=2.1 - uvicorn>=0.11.3 -- maturin>=0.12.8 -- setuptools-rust>=1.1.2 -- rust>=1.60.0 +# fugue dependencies; remove when we conda install fugue +- adagio +- antlr4-python3-runtime<4.10 +- ciso8601 +- fs +- pip +- qpd +- triad - pip: - fugue[sql]>=0.5.3 diff --git a/continuous_integration/environment-3.8-dev.yaml b/continuous_integration/environment-3.8-dev.yaml index 5bbd2494b..b04c419f6 100644 --- a/continuous_integration/environment-3.8-dev.yaml +++ b/continuous_integration/environment-3.8-dev.yaml @@ -3,40 +3,41 @@ channels: - conda-forge - nodefaults dependencies: -- adagio>=0.2.3 -- antlr4-python3-runtime>=4.9.2, <4.10.0 # Remove max pin after qpd(fugue dependency) updates their conda recipe -- black=22.3.0 -- ciso8601>=2.2.0 -- dask-ml>=2022.1.22 -- dask>=2022.3.0 -- fastapi>=0.61.1 -- fs>=2.4.11 -- intake>=0.6.0 -- isort=5.7.0 -- jsonschema>=4.4.0 -- lightgbm>=3.2.1 -- mlflow>=1.19.0 -- mock>=4.0.3 -- nest-asyncio>=1.4.3 -- pandas>=1.0.0 # below 1.0, there were no nullable ext. 
types -- pip=20.2.4 -- pre-commit>=2.11.1 -- prompt_toolkit>=3.0.8 -- psycopg2>=2.9.1 -- pygments>=2.7.1 -- pyhive>=0.6.4 -- pytest-cov>=2.10.1 +- dask-ml=2022.1.22 +- dask=2022.3.0 +- fastapi=0.69.0 +- intake=0.6.0 +- jsonschema +- lightgbm +- maturin=0.12.8 +- mlflow +- mock +- nest-asyncio +- pandas=1.1.2 +- pre-commit +- prompt_toolkit +- psycopg2 +- pyarrow=3.0.0 +- pygments +- pyhive +- pytest-cov - pytest-xdist -- pytest>=6.0.1 +- pytest - python=3.8 -- scikit-learn>=0.24.2 -- sphinx>=3.2.1 -- tpot>=0.11.7 -- triad>=0.5.4 -- tzlocal>=2.1 -- uvicorn>=0.11.3 -- maturin>=0.12.8 -- setuptools-rust>=1.1.2 -- rust>=1.60.0 +- rust=1.60.0 +- scikit-learn=1.0.0 +- setuptools-rust=1.1.2 +- sphinx +- tpot +- tzlocal=2.1 +- uvicorn=0.11.3 +# fugue dependencies; remove when we conda install fugue +- adagio +- antlr4-python3-runtime<4.10 +- ciso8601 +- fs +- pip +- qpd +- triad - pip: - - fugue[sql]>=0.5.3 + - fugue[sql]==0.5.3 diff --git a/continuous_integration/environment-3.9-dev.yaml b/continuous_integration/environment-3.9-dev.yaml index bdf103f8c..20390f2fb 100644 --- a/continuous_integration/environment-3.9-dev.yaml +++ b/continuous_integration/environment-3.9-dev.yaml @@ -3,40 +3,41 @@ channels: - conda-forge - nodefaults dependencies: -- adagio>=0.2.3 -- antlr4-python3-runtime>=4.9.2, <4.10.0 # Remove max pin after qpd(fugue dependency) updates their conda recipe -- black=22.3.0 -- ciso8601>=2.2.0 - dask-ml>=2022.1.22 - dask>=2022.3.0 -- fastapi>=0.61.1 -- fs>=2.4.11 +- fastapi>=0.69.0 - intake>=0.6.0 -- isort=5.7.0 -- jsonschema>=4.4.0 -- lightgbm>=3.2.1 -- mlflow>=1.19.0 -- mock>=4.0.3 -- nest-asyncio>=1.4.3 -- pandas>=1.0.0 # below 1.0, there were no nullable ext. types -- pip=20.2.4 -- pre-commit>=2.11.1 -- prompt_toolkit>=3.0.8 -- psycopg2>=2.9.1 -- pygments>=2.7.1 -- pyhive>=0.6.4 -- pytest-cov>=2.10.1 +- jsonschema +- lightgbm +- maturin>=0.12.8 +- mlflow +- mock +- nest-asyncio +- pandas>=1.1.2 +- pre-commit +- prompt_toolkit +- psycopg2 +- pyarrow>=3.0.0 +- pygments +- pyhive +- pytest-cov - pytest-xdist -- pytest>=6.0.1 +- pytest - python=3.9 -- scikit-learn>=0.24.2 -- sphinx>=3.2.1 -- tpot>=0.11.7 -- triad>=0.5.4 +- rust>=1.60.0 +- scikit-learn>=1.0.0 +- setuptools-rust>=1.1.2 +- sphinx +- tpot - tzlocal>=2.1 - uvicorn>=0.11.3 -- maturin>=0.12.8 -- setuptools-rust>=1.1.2 -- rust>=1.60.0 +# fugue dependencies; remove when we conda install fugue +- adagio +- antlr4-python3-runtime<4.10 +- ciso8601 +- fs +- pip +- qpd +- triad - pip: - fugue[sql]>=0.5.3 diff --git a/continuous_integration/gpuci/axis.yaml b/continuous_integration/gpuci/axis.yaml index 41ddb56ec..a4773b2e3 100644 --- a/continuous_integration/gpuci/axis.yaml +++ b/continuous_integration/gpuci/axis.yaml @@ -8,6 +8,6 @@ LINUX_VER: - ubuntu18.04 RAPIDS_VER: -- "22.06" +- "22.08" excludes: diff --git a/continuous_integration/recipe/meta.yaml b/continuous_integration/recipe/meta.yaml index 331a8ca7e..6b79188e4 100644 --- a/continuous_integration/recipe/meta.yaml +++ b/continuous_integration/recipe/meta.yaml @@ -31,8 +31,8 @@ requirements: run: - python - dask >=2022.3.0 - - pandas >=1.0.0 - - fastapi >=0.61.1 + - pandas >=1.1.2 + - fastapi >=0.69.0 - uvicorn >=0.11.3 - tzlocal >=2.1 - prompt-toolkit diff --git a/dask_planner/src/expression.rs b/dask_planner/src/expression.rs index 7730a20a4..245cd71c6 100644 --- a/dask_planner/src/expression.rs +++ b/dask_planner/src/expression.rs @@ -82,7 +82,12 @@ impl PyExpr { Expr::AggregateUDF { .. } => RexType::Call, Expr::InList { .. 
} => RexType::Call, Expr::Wildcard => RexType::Call, - _ => RexType::Other, + Expr::ScalarUDF { .. } => RexType::Call, + Expr::Exists { .. } => RexType::Call, + Expr::InSubquery { .. } => RexType::Call, + Expr::ScalarSubquery(..) => RexType::SubqueryAlias, + Expr::QualifiedWildcard { .. } => RexType::Reference, + Expr::GroupingSet(..) => RexType::Call, } } } @@ -94,6 +99,20 @@ impl PyExpr { PyExpr::from(lit(value.scalar_value), None) } + /// Extracts the LogicalPlan from a Subquery, or supported Subquery sub-type, from + /// the expression instance + #[pyo3(name = "getSubqueryLogicalPlan")] + pub fn subquery_plan(&self) -> PyResult { + match &self.expr { + Expr::ScalarSubquery(subquery) => Ok((&*subquery.subquery).clone().into()), + _ => Err(PyErr::new::(format!( + "Attempted to extract a LogicalPlan instance from invalid Expr {:?}. + Only Subquery and related variants are supported for this operation.", + &self.expr + ))), + } + } + /// If this Expression instances references an existing /// Column in the SQL parse tree or not #[pyo3(name = "isInputReference")] @@ -198,7 +217,12 @@ impl PyExpr { Expr::AggregateUDF { .. } => panic!("AggregateUDF!!!"), Expr::InList { .. } => panic!("InList!!!"), Expr::Wildcard => panic!("Wildcard!!!"), - _ => "OTHER", + Expr::InSubquery { .. } => "Subquery", + Expr::ScalarUDF { .. } => "ScalarUDF", + Expr::Exists { .. } => "Exists", + Expr::ScalarSubquery(..) => "ScalarSubquery", + Expr::QualifiedWildcard { .. } => "Wildcard", + Expr::GroupingSet(..) => "GroupingSet", }) } @@ -265,6 +289,17 @@ impl PyExpr { PyExpr::from(*low.clone(), self.input_plan.clone()), PyExpr::from(*high.clone(), self.input_plan.clone()), ]), + Expr::InSubquery { + expr: _, + subquery: _, + negated: _, + } => { + unimplemented!("InSubquery") + } + Expr::ScalarSubquery(subquery) => { + let _plan = &subquery.subquery; + unimplemented!("ScalarSubquery") + } Expr::IsNotNull(expr) => Ok(vec![PyExpr::from(*expr.clone(), self.input_plan.clone())]), _ => Err(PyErr::new::(format!( "unknown Expr type {:?} encountered", diff --git a/dask_planner/src/sql/types.rs b/dask_planner/src/sql/types.rs index 4ad226998..b89fa06a0 100644 --- a/dask_planner/src/sql/types.rs +++ b/dask_planner/src/sql/types.rs @@ -12,6 +12,7 @@ pub enum RexType { Literal, Call, Reference, + SubqueryAlias, Other, } diff --git a/dask_sql/context.py b/dask_sql/context.py index d8b79745a..26fd92158 100644 --- a/dask_sql/context.py +++ b/dask_sql/context.py @@ -139,6 +139,7 @@ def __init__(self, logging_level=logging.INFO): RexConverter.add_plugin_class(core.RexCallPlugin, replace=False) RexConverter.add_plugin_class(core.RexInputRefPlugin, replace=False) RexConverter.add_plugin_class(core.RexLiteralPlugin, replace=False) + RexConverter.add_plugin_class(core.RexSubqueryAliasPlugin, replace=False) InputUtil.add_plugin_class(input_utils.DaskInputPlugin, replace=False) InputUtil.add_plugin_class(input_utils.PandasLikeInputPlugin, replace=False) diff --git a/dask_sql/datacontainer.py b/dask_sql/datacontainer.py index 5229e4d88..97d204f64 100644 --- a/dask_sql/datacontainer.py +++ b/dask_sql/datacontainer.py @@ -152,7 +152,16 @@ def get_backend_by_frontend_name(self, column: str) -> str: Get back the dask column, which is referenced by the frontend (SQL) column with the given name. """ - backend_column = self._frontend_backend_mapping[column] + + # `UInt8(1)` is the Datafusion indicator for a `COUNT(*)`. Be SQL definition + # this means the user wants the `COUNT` for any/all columns. 
Therefore we + # simply retrieve the first column in the mapping since by definition # rows, + # which is `COUNT`, will always be the same for all columns. + if column == "UInt8(1)": + # Make sure to get the backend column name here by using [1] instead of [0]. [0] is frontend + backend_column = list(self._frontend_backend_mapping.items())[0][1] + else: + backend_column = self._frontend_backend_mapping[column] return backend_column def make_unique(self, prefix="col"): diff --git a/dask_sql/input_utils/dask.py b/dask_sql/input_utils/dask.py index 2da11e701..4dbeeef63 100644 --- a/dask_sql/input_utils/dask.py +++ b/dask_sql/input_utils/dask.py @@ -4,11 +4,6 @@ from dask_sql.input_utils.base import BaseInputPlugin -try: - import dask_cudf -except ImportError: - dask_cudf = None - class DaskInputPlugin(BaseInputPlugin): """Input Plugin for Dask DataFrames, just keeping them""" @@ -27,7 +22,9 @@ def to_dc( **kwargs ): if gpu: # pragma: no cover - if not dask_cudf: + try: + import dask_cudf + except ImportError: raise ModuleNotFoundError( "Setting `gpu=True` for table creation requires dask_cudf" ) diff --git a/dask_sql/input_utils/location.py b/dask_sql/input_utils/location.py index 8cb23a444..e1db51e6b 100644 --- a/dask_sql/input_utils/location.py +++ b/dask_sql/input_utils/location.py @@ -7,11 +7,6 @@ from dask_sql.input_utils.base import BaseInputPlugin from dask_sql.input_utils.convert import InputUtil -try: - import dask_cudf -except ImportError: - dask_cudf = None - class LocationInputPlugin(BaseInputPlugin): """Input Plugin for everything, which can be read in from a file (on disk, remote etc.)""" @@ -44,7 +39,9 @@ def to_dc( format = extension.lstrip(".") try: if gpu: # pragma: no cover - if not dask_cudf: + try: + import dask_cudf + except ImportError: raise ModuleNotFoundError( "Setting `gpu=True` for table creation requires dask-cudf" ) diff --git a/dask_sql/input_utils/pandaslike.py b/dask_sql/input_utils/pandaslike.py index 32d7ff5ea..7c3b66de0 100644 --- a/dask_sql/input_utils/pandaslike.py +++ b/dask_sql/input_utils/pandaslike.py @@ -3,11 +3,6 @@ from dask_sql.input_utils.base import BaseInputPlugin -try: - import cudf -except ImportError: - cudf = None - class PandasLikeInputPlugin(BaseInputPlugin): """Input Plugin for Pandas Like DataFrames, which get converted to dask DataFrames""" @@ -30,7 +25,9 @@ def to_dc( ): npartitions = kwargs.pop("npartitions", 1) if gpu: # pragma: no cover - if not cudf: + try: + import cudf + except ImportError: raise ModuleNotFoundError( "Setting `gpu=True` for table creation requires cudf" ) diff --git a/dask_sql/integrations/fugue.py b/dask_sql/integrations/fugue.py index c9e0a076f..ce685a1ee 100644 --- a/dask_sql/integrations/fugue.py +++ b/dask_sql/integrations/fugue.py @@ -73,15 +73,15 @@ def fsql_dask( register: bool = False, fugue_conf: Any = None, ) -> Dict[str, dd.DataFrame]: - """Fugue SQL utility function that can consume Context directly. Fugue SQL is a language + """FugueSQL utility function that can consume Context directly. FugueSQL is a language extending standard SQL. It makes SQL eligible to describe end to end workflows. It also enables you to invoke python extensions in the SQL like language. 
For more, please read - `Fugue SQl Tutorial `_ + `FugueSQL Tutorial `_ Args: - sql: (:obj:`str`): Fugue SQL statement + sql (:obj:`str`): Fugue SQL statement ctx (:class:`dask_sql.Context`): The context to operate on, defaults to None register (:obj:`bool`): Whether to register named steps back to the context (if provided), defaults to False @@ -89,26 +89,30 @@ def fsql_dask( Example: .. code-block:: python - # schema: * - def median(df:pd.DataFrame) -> pd.DataFrame: + + # define a custom prepartition function for FugueSQL + def median(df: pd.DataFrame) -> pd.DataFrame: df["y"] = df["y"].median() return df.head(1) - # Create a context with tables df1, df2 + # create a context with some tables c = Context() ... - result = fsql_dask(''' - j = SELECT df1.*, df2.x - FROM df1 INNER JOIN df2 ON df1.key = df2.key - PERSIST # using persist because j will be used twice - TAKE 5 ROWS PREPARTITION BY x PRESORT key - PRINT - TRANSFORM j PREPARTITION BY x USING median - PRINT - ''', c, register=True) + + # run a FugueSQL query using the context as input + query = ''' + j = SELECT df1.*, df2.x + FROM df1 INNER JOIN df2 ON df1.key = df2.key + PERSIST + TAKE 5 ROWS PREPARTITION BY x PRESORT key + PRINT + TRANSFORM j PREPARTITION BY x USING median + PRINT + ''' + result = fsql_dask(query, c, register=True) + assert "j" in result assert "j" in c.tables - """ _global, _local = get_caller_global_local_vars() diff --git a/dask_sql/physical/rel/logical/aggregate.py b/dask_sql/physical/rel/logical/aggregate.py index 78c7c9cfa..6e42c6d11 100644 --- a/dask_sql/physical/rel/logical/aggregate.py +++ b/dask_sql/physical/rel/logical/aggregate.py @@ -84,14 +84,11 @@ def get_supported_aggregation(self, series): if pd.api.types.is_string_dtype(series.dtype): # If dask_cudf strings dtype, return built-in aggregation - if dask_cudf is not None and isinstance(series, dask_cudf.Series): + if "cudf" in str(series._partition_type): return built_in_aggregation - # With pandas StringDtype built-in aggregations work - # while with pandas ObjectDtype and Nulls built-in aggregations fail - if isinstance(series, dd.Series) and isinstance( - series.dtype, pd.StringDtype - ): + # with pandas StringDtype built-in aggregations work + if isinstance(series.dtype, pd.StringDtype): return built_in_aggregation return self.custom_aggregation @@ -399,6 +396,7 @@ def _perform_aggregation( # format aggregations for Dask; also check if we can use fast path for # groupby, which is only supported if we are not using any custom aggregations + # and our pandas version support dropna for groupbys aggregations_dict = defaultdict(dict) fast_groupby = True for aggregation in aggregations: diff --git a/dask_sql/physical/rel/logical/filter.py b/dask_sql/physical/rel/logical/filter.py index d868e7491..178121fef 100644 --- a/dask_sql/physical/rel/logical/filter.py +++ b/dask_sql/physical/rel/logical/filter.py @@ -65,6 +65,5 @@ def convert( df_condition = RexConverter.convert(rel, condition, dc, context=context) df = filter_or_scalar(df, df_condition) - # cc = self.fix_column_to_row_type(cc, rel.getRowType()) - # No column type has changed, so no need to convert again + cc = self.fix_column_to_row_type(cc, rel.getRowType()) return DataContainer(df, cc) diff --git a/dask_sql/physical/rex/convert.py b/dask_sql/physical/rex/convert.py index 35bc50ec1..b4ec08608 100644 --- a/dask_sql/physical/rex/convert.py +++ b/dask_sql/physical/rex/convert.py @@ -17,6 +17,7 @@ "RexType.Reference": "InputRef", "RexType.Call": "RexCall", "RexType.Literal": "RexLiteral", + 
"RexType.SubqueryAlias": "SubqueryAlias", } diff --git a/dask_sql/physical/rex/core/__init__.py b/dask_sql/physical/rex/core/__init__.py index 9c34373eb..f193f7faf 100644 --- a/dask_sql/physical/rex/core/__init__.py +++ b/dask_sql/physical/rex/core/__init__.py @@ -1,5 +1,6 @@ from .call import RexCallPlugin from .input_ref import RexInputRefPlugin from .literal import RexLiteralPlugin +from .subquery import RexSubqueryAliasPlugin -__all__ = [RexCallPlugin, RexInputRefPlugin, RexLiteralPlugin] +__all__ = [RexCallPlugin, RexInputRefPlugin, RexLiteralPlugin, RexSubqueryAliasPlugin] diff --git a/dask_sql/physical/rex/core/literal.py b/dask_sql/physical/rex/core/literal.py index 6f1844de9..99d884cb7 100644 --- a/dask_sql/physical/rex/core/literal.py +++ b/dask_sql/physical/rex/core/literal.py @@ -96,9 +96,7 @@ def convert( dc: DataContainer, context: "dask_sql.Context", ) -> Any: - logger.debug(f"Expression in literal.py: {rex}") literal_type = str(rex.getType()) - logger.debug(f"literal_type: {literal_type}") # Call the Rust function to get the actual value and convert the Rust # type name back to a SQL type @@ -147,9 +145,6 @@ def convert( else: raise RuntimeError("Failed to determine DataFusion Type in literal.py") - logger.debug(f"Expression in literal.py literal_value: {literal_value}") - logger.debug(f"Expression in literal.py literal_type: {literal_type}") - # if isinstance(literal_value, org.apache.calcite.util.Sarg): # return SargPythonImplementation(literal_value, literal_type) diff --git a/dask_sql/physical/rex/core/subquery.py b/dask_sql/physical/rex/core/subquery.py new file mode 100644 index 000000000..f5535af77 --- /dev/null +++ b/dask_sql/physical/rex/core/subquery.py @@ -0,0 +1,35 @@ +from typing import TYPE_CHECKING + +import dask.dataframe as dd + +from dask_sql.datacontainer import DataContainer +from dask_sql.physical.rel import RelConverter +from dask_sql.physical.rex.base import BaseRexPlugin + +if TYPE_CHECKING: + import dask_sql + from dask_planner.rust import Expression, LogicalPlan + + +class RexSubqueryAliasPlugin(BaseRexPlugin): + """ + A RexSubqueryAliasPlugin is an expression, which references a Subquery. 
+ This plugin is thin on logic, however keeping with previous patterns + we use the plugin approach instead of placing the logic inline + """ + + class_name = "SubqueryAlias" + + def convert( + self, + rel: "LogicalPlan", + rex: "Expression", + dc: DataContainer, + context: "dask_sql.Context", + ) -> dd.DataFrame: + + # Extract the LogicalPlan from the Expr instance + sub_rel = rex.getSubqueryLogicalPlan() + + dc = RelConverter.convert(sub_rel, context=context) + return dc.df diff --git a/dask_sql/physical/utils/map.py b/dask_sql/physical/utils/map.py deleted file mode 100644 index 791342ccc..000000000 --- a/dask_sql/physical/utils/map.py +++ /dev/null @@ -1,17 +0,0 @@ -from typing import Any, Callable - -import dask -import dask.dataframe as dd - - -def map_on_partition_index( - df: dd.DataFrame, f: Callable, *args: Any, **kwargs: Any -) -> dd.DataFrame: - meta = kwargs.pop("meta", None) - return dd.from_delayed( - [ - dask.delayed(f)(partition, partition_number, *args, **kwargs) - for partition_number, partition in enumerate(df.partitions) - ], - meta=meta, - ) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index 0e4cc9d85..c2ccce3c1 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -6,11 +6,6 @@ from dask_sql.utils import make_pickable_without_dask_sql -try: - import dask_cudf -except ImportError: - dask_cudf = None - def apply_sort( df: dd.DataFrame, @@ -35,10 +30,7 @@ def apply_sort( # dask / dask-cudf don't support lists of ascending / null positions if len(sort_columns) == 1 or ( - dask_cudf is not None - and isinstance(df, dask_cudf.DataFrame) - and single_ascending - and single_null_first + "cudf" in str(df._partition_type) and single_ascending and single_null_first ): try: return df.sort_values( diff --git a/docker/conda.txt b/docker/conda.txt index ddcac2de8..f295c69e5 100644 --- a/docker/conda.txt +++ b/docker/conda.txt @@ -1,6 +1,6 @@ python>=3.8 dask>=2022.3.0 -pandas>=1.0.0 # below 1.0, there were no nullable ext. 
types +pandas>=1.1.2 jpype1>=1.0.2 openjdk>=8 maven>=3.6.0 @@ -10,13 +10,14 @@ pytest-xdist mock>=4.0.3 sphinx>=3.2.1 tzlocal>=2.1 -fastapi>=0.61.1 +fastapi>=0.69.0 nest-asyncio>=1.4.3 uvicorn>=0.11.3 +pyarrow>=3.0.0 prompt_toolkit>=3.0.8 pygments>=2.7.1 dask-ml>=2022.1.22 -scikit-learn>=0.24.2 +scikit-learn>=1.0.0 intake>=0.6.0 pre-commit>=2.11.1 black=22.3.0 diff --git a/docker/main.dockerfile b/docker/main.dockerfile index 6f5a54b8e..4f275c1bf 100644 --- a/docker/main.dockerfile +++ b/docker/main.dockerfile @@ -7,17 +7,15 @@ LABEL author "Nils Braun " COPY docker/conda.txt /opt/dask_sql/ RUN conda config --add channels conda-forge \ && /opt/conda/bin/conda install --freeze-installed \ - "jpype1>=1.0.2" \ - "openjdk>=11" \ - "maven>=3.6.0" \ "tzlocal>=2.1" \ - "fastapi>=0.61.1" \ + "fastapi>=0.69.0" \ "uvicorn>=0.11.3" \ + "pyarrow>=3.0.0" \ "prompt_toolkit>=3.0.8" \ "pygments>=2.7.1" \ "dask-ml>=2022.1.22" \ "setuptools-rust>=1.1.2" \ - "scikit-learn>=0.24.2" \ + "scikit-learn>=1.0.0" \ "intake>=0.6.0" \ && conda clean -ay diff --git a/docs/environment.yml b/docs/environment.yml index d9fbfc635..8d8e44c95 100644 --- a/docs/environment.yml +++ b/docs/environment.yml @@ -8,10 +8,10 @@ dependencies: - sphinx-tabs - dask-sphinx-theme>=2.0.3 - dask>=2022.3.0 - - pandas>=1.0.0 + - pandas>=1.1.2 - fugue>=0.5.3 - jpype1>=1.0.2 - - fastapi>=0.61.1 + - fastapi>=0.69.0 - uvicorn>=0.11.3 - tzlocal>=2.1 - prompt_toolkit diff --git a/docs/requirements-docs.txt b/docs/requirements-docs.txt index 6020d793d..5d2177e5c 100644 --- a/docs/requirements-docs.txt +++ b/docs/requirements-docs.txt @@ -1,10 +1,10 @@ sphinx>=4.0.0 sphinx-tabs -dask-sphinx-theme>=2.0.3 +dask-sphinx-theme>=3.0.0 dask>=2022.3.0 -pandas>=1.0.0 +pandas>=1.1.2 fugue>=0.5.3 -fastapi>=0.61.1 +fastapi>=0.69.0 uvicorn>=0.11.3 tzlocal>=2.1 prompt_toolkit diff --git a/docs/source/api.rst b/docs/source/api.rst index 29c4f5632..cb5407419 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -11,4 +11,7 @@ API Documentation .. autofunction:: dask_sql.cmd_loop -.. autofunction:: dask_sql.integrations.fugue.fsql +.. autoclass:: dask_sql.integrations.fugue.DaskSQLExecutionEngine + :members: + +.. autofunction:: dask_sql.integrations.fugue.fsql_dask diff --git a/docs/source/fugue.rst b/docs/source/fugue.rst new file mode 100644 index 000000000..264d19fcd --- /dev/null +++ b/docs/source/fugue.rst @@ -0,0 +1,44 @@ +FugueSQL Integrations +===================== + +`FugueSQL `_ is a related project that aims to provide a unified SQL interface for a variety of different computing frameworks, including Dask. +While it offers a SQL engine with a larger set of supported commands, this comes at the cost of slower performance when using Dask in comparison to dask-sql. +In order to offer a "best of both worlds" solution, dask-sql includes several options to integrate with FugueSQL, using its faster implementation of SQL commands when possible and falling back on FugueSQL when necessary. + +dask-sql as a FugueSQL engine +----------------------------- + +FugueSQL users unfamiliar with dask-sql can take advantage of its functionality with minimal code changes by passing :class:`dask_sql.integrations.fugue.DaskSQLExecutionEngine` into the ``FugueSQLWorkflow`` being used to execute commands. +For more information and sample usage, see `Fugue β€” dask-sql as a FugueSQL engine `_. 
+ +Using FugueSQL on an existing ``Context`` +----------------------------------------- + +dask-sql users attempting to expand their SQL querying options for an existing ``Context`` can use :func:`dask_sql.integrations.fugue.fsql_dask`, which executes the provided query using FugueSQL, using the tables within the provided context as input. +The results of this query can then optionally be registered to the context: + +.. code-block:: python + + # define a custom prepartition function for FugueSQL + def median(df: pd.DataFrame) -> pd.DataFrame: + df["y"] = df["y"].median() + return df.head(1) + + # create a context with some tables + c = Context() + ... + + # run a FugueSQL query using the context as input + query = """ + j = SELECT df1.*, df2.x + FROM df1 INNER JOIN df2 ON df1.key = df2.key + PERSIST + TAKE 5 ROWS PREPARTITION BY x PRESORT key + PRINT + TRANSFORM j PREPARTITION BY x USING median + PRINT + """ + result = fsql_dask(query, c, register=True) # results aren't registered by default + + assert "j" in result # returns a dict of resulting tables + assert "j" in c.tables # results are also registered to the context diff --git a/docs/source/index.rst b/docs/source/index.rst index 8ebb80150..8a9accc99 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -98,6 +98,7 @@ For this example, we use some data loaded from disk and query it with a SQL comm api server cmd + fugue how_does_it_work configuration diff --git a/setup.py b/setup.py index abe657eb9..552506afb 100644 --- a/setup.py +++ b/setup.py @@ -43,8 +43,8 @@ setup_requires=sphinx_requirements, install_requires=[ "dask[dataframe,distributed]>=2022.3.0", - "pandas>=1.0.0", # below 1.0, there were no nullable ext. types - "fastapi>=0.61.1", + "pandas>=1.1.2", + "fastapi>=0.69.0", "uvicorn>=0.11.3", "tzlocal>=2.1", "prompt_toolkit", @@ -58,8 +58,9 @@ "pytest-cov>=2.10.1", "mock>=4.0.3", "sphinx>=3.2.1", + "pyarrow>=3.0.0", "dask-ml>=2022.1.22", - "scikit-learn>=0.24.2", + "scikit-learn>=1.0.0", "intake>=0.6.0", "pre-commit", "black==22.3.0", diff --git a/tests/integration/test_analyze.py b/tests/integration/test_analyze.py index 504e92428..f12fbba25 100644 --- a/tests/integration/test_analyze.py +++ b/tests/integration/test_analyze.py @@ -14,11 +14,12 @@ def test_analyze(c, df): 700.0, df.a.mean(), df.a.std(), - 1.0, + df.a.min(), + # Dask's approx quantiles do not match up with pandas and must be specified explicitly + 2.0, 2.0, - 2.0, # incorrect, but what Dask gives for approx quantile - 3.0, 3.0, + df.a.max(), "double", "a", ], @@ -27,9 +28,10 @@ def test_analyze(c, df): df.b.mean(), df.b.std(), df.b.min(), - df.b.quantile(0.25), - df.b.quantile(0.5), - df.b.quantile(0.75), + # Dask's approx quantiles do not match up with pandas and must be specified explicitly + 2.73108, + 5.20286, + 7.60595, df.b.max(), "double", "b", @@ -49,9 +51,8 @@ def test_analyze(c, df): ], ) - # The percentiles are calculated only approximately, therefore we do not use exact matching - assert_eq(result_df, expected_df, rtol=0.135) + assert_eq(result_df, expected_df) result_df = c.sql("ANALYZE TABLE df COMPUTE STATISTICS FOR COLUMNS a") - assert_eq(result_df, expected_df[["a"]], rtol=0.135) + assert_eq(result_df, expected_df[["a"]]) diff --git a/tests/integration/test_compatibility.py b/tests/integration/test_compatibility.py index 8277baad5..861e9fa92 100644 --- a/tests/integration/test_compatibility.py +++ b/tests/integration/test_compatibility.py @@ -23,8 +23,11 @@ def cast_datetime_to_string(df): cols = 
df.select_dtypes(include=["datetime64[ns]"]).columns.tolist() - # Casting to object first as - # directly converting to string looses second precision + + if not cols: + return df + + # Casting directly to string loses second precision df[cols] = df[cols].astype("object").astype("string") return df diff --git a/tests/integration/test_create.py b/tests/integration/test_create.py index 7a585a5e5..1e5281e79 100644 --- a/tests/integration/test_create.py +++ b/tests/integration/test_create.py @@ -371,3 +371,35 @@ def test_drop(c): with pytest.raises(dask_sql.utils.ParsingException): c.sql("SELECT a FROM new_table") + + +@pytest.mark.skip(reason="WIP DataFusion") +def test_create_gpu_error(c, df, temporary_data_file): + try: + import cudf + except ImportError: + cudf = None + + if cudf is not None: + pytest.skip("GPU-related import errors only need to be checked on CPU") + + with pytest.raises(ModuleNotFoundError): + c.create_table("new_table", df, gpu=True) + + with pytest.raises(ModuleNotFoundError): + c.create_table("new_table", dd.from_pandas(df, npartitions=2), gpu=True) + + df.to_csv(temporary_data_file, index=False) + + with pytest.raises(ModuleNotFoundError): + c.sql( + f""" + CREATE TABLE + new_table + WITH ( + location = '{temporary_data_file}', + format = 'csv', + gpu = True + ) + """ + ) diff --git a/tests/integration/test_filter.py b/tests/integration/test_filter.py index fffb3a38a..f7c8c5502 100644 --- a/tests/integration/test_filter.py +++ b/tests/integration/test_filter.py @@ -135,9 +135,6 @@ def test_filter_year(c): assert_eq(expected_df, return_df) -@pytest.mark.skip( - reason="WIP DataFusion - https://github.com/dask-contrib/dask-sql/issues/538" -) @pytest.mark.parametrize( "query,df_func,filters", [ @@ -156,11 +153,12 @@ def test_filter_year(c): lambda x: x[((x["b"] > 5) & (x["b"] < 10)) | (x["a"] == 1)], [[("a", "==", 1)], [("b", "<", 10), ("b", ">", 5)]], ), - ( - "SELECT * FROM parquet_ddf WHERE b IN (1, 6)", - lambda x: x[(x["b"] == 1) | (x["b"] == 6)], - [[("b", "<=", 1), ("b", ">=", 1)], [("b", "<=", 6), ("b", ">=", 6)]], - ), + # https://github.com/dask-contrib/dask-sql/issues/531 + # ( + # "SELECT * FROM parquet_ddf WHERE b IN (1, 6)", + # lambda x: x[(x["b"] == 1) | (x["b"] == 6)], + # [[("b", "<=", 1), ("b", ">=", 1)], [("b", "<=", 6), ("b", ">=", 6)]], + # ), ( "SELECT a FROM parquet_ddf WHERE (b > 5 AND b < 10) OR a = 1", lambda x: x[((x["b"] > 5) & (x["b"] < 10)) | (x["a"] == 1)][["a"]], @@ -177,14 +175,15 @@ def test_filter_year(c): [("a", "==", 1), ("c", "==", "A")], ], ), - ( - # The predicate-pushdown optimization will be skipped here, - # because datetime accessors are not supported. However, - # the query should still succeed. - "SELECT * FROM parquet_ddf WHERE year(d) < 2015", - lambda x: x[x["d"].dt.year < 2015], - None, - ), + # https://github.com/dask-contrib/dask-sql/issues/538 + # ( + # # The predicate-pushdown optimization will be skipped here, + # # because datetime accessors are not supported. However, + # # the query should still succeed. 
+ # "SELECT * FROM parquet_ddf WHERE year(d) < 2015", + # lambda x: x[x["d"].dt.year < 2015], + # None, + # ), ], ) def test_predicate_pushdown(c, parquet_ddf, query, df_func, filters): diff --git a/tests/integration/test_groupby.py b/tests/integration/test_groupby.py index 818bf3533..02df11749 100644 --- a/tests/integration/test_groupby.py +++ b/tests/integration/test_groupby.py @@ -157,13 +157,8 @@ def test_group_by_nan(c): ) expected_df = pd.DataFrame({"c": [3, float("nan"), 1]}) - # The dtype in pandas 1.0.5 and pandas 1.1.0 are different, so - # we cannot check here - assert_eq( - return_df.sort_values("c").reset_index(drop=True), - expected_df.sort_values("c").reset_index(drop=True), - check_dtype=False, - ) + # we return nullable int dtype instead of float + assert_eq(return_df, expected_df, check_dtype=False) return_df = c.sql( """ diff --git a/tests/integration/test_jdbc.py b/tests/integration/test_jdbc.py index 6f9d6a5f9..7997302f1 100644 --- a/tests/integration/test_jdbc.py +++ b/tests/integration/test_jdbc.py @@ -103,89 +103,17 @@ def test_jdbc_has_columns(app_client, c): data=f"SELECT * from system.jdbc.columns where TABLE_NAME = '{table}'", ) assert response.status_code == 200 - result = get_result_or_error(app_client, response) + client_result = get_result_or_error(app_client, response) - assert_result(result, 24, 3) - assert result["data"] == [ - [ - "", - "a_schema", - "a_table", - "A_STR", - "VARCHAR", - "VARCHAR", - "", - "", - "", - "", - "", - "", - "", - "VARCHAR", - "", - "", - "1", - "", - "", - "", - "", - "", - "", - "", - ], - [ - "", - "a_schema", - "a_table", - "AN_INT", - "INTEGER", - "INTEGER", - "", - "", - "", - "", - "", - "", - "", - "INTEGER", - "", - "", - "2", - "", - "", - "", - "", - "", - "", - "", - ], - [ - "", - "a_schema", - "a_table", - "A_FLOAT", - "FLOAT", - "FLOAT", - "", - "", - "", - "", - "", - "", - "", - "FLOAT", - "", - "", - "3", - "", - "", - "", - "", - "", - "", - "", - ], - ] + # ordering of rows isn't consistent between fastapi versions + context_result = ( + c.sql("SELECT * FROM system_jdbc.columns WHERE TABLE_NAME = 'a_table'") + .compute() + .values.tolist() + ) + + assert_result(client_result, 24, 3) + assert client_result["data"] == context_result def assert_result(result, col_len, data_len): diff --git a/tests/integration/test_join.py b/tests/integration/test_join.py index ef26ceb98..00617ca8d 100644 --- a/tests/integration/test_join.py +++ b/tests/integration/test_join.py @@ -315,3 +315,41 @@ def test_conditional_join_with_limit(c): ) dd.assert_eq(actual_df, expected_df, check_index=False) + + +def test_intersect(c): + + # Join df_simple against itself + actual_df = c.sql( + """ + select count(*) from ( + select * from df_simple + intersect + select * from df_simple + ) hot_item + limit 100 + """ + ) + assert actual_df["COUNT(UInt8(1))"].compute()[0] == 3 + + # Join df_simple against itself, and then that result against df_wide. 
Nothing should match so therefore result should be 0 + actual_df = c.sql( + """ + select count(*) from ( + select * from df_simple + intersect + select * from df_simple + intersect + select * from df_wide + ) hot_item + limit 100 + """ + ) + assert len(actual_df["COUNT(UInt8(1))"]) == 0 + + actual_df = c.sql( + """ + select * from df_simple intersect select * from df_simple + """ + ) + assert actual_df.shape[0].compute() == 3 diff --git a/tests/integration/test_select.py b/tests/integration/test_select.py index 81069960a..28b3c0028 100644 --- a/tests/integration/test_select.py +++ b/tests/integration/test_select.py @@ -203,9 +203,10 @@ def test_multi_case_when(c): FROM df """ ) - expected_df = pd.DataFrame({"C": [0, 1, 1, 1, 0]}, dtype=np.int64) + expected_df = pd.DataFrame({"C": [0, 1, 1, 1, 0]}) - assert_eq(actual_df, expected_df) + # dtype varies between int32/int64 depending on pandas version + assert_eq(actual_df, expected_df, check_dtype=False) def test_case_when_no_else(c): @@ -221,7 +222,8 @@ def test_case_when_no_else(c): ) expected_df = pd.DataFrame({"C": [None, 1, 1, 1, None]}) - assert_eq(actual_df, expected_df) + # dtype varies between float64/object depending on pandas version + assert_eq(actual_df, expected_df, check_dtype=False) def test_singular_column_projection_simple(c):
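
As context for the cluster-based CI jobs added above (the new ``cluster-dev`` job and the updated docker-compose test steps): they start a scheduler/worker pair with docker-compose and point the test suite at it through ``DASK_SQL_TEST_SCHEDULER``. The sketch below illustrates how such an address can be consumed from the Python side; the table, data, and query are made-up examples, and only the environment variable name and the ``tcp://127.0.0.1:8786`` address come from the workflow files.

.. code-block:: python

    import os

    import dask.dataframe as dd
    import pandas as pd
    from dask.distributed import Client

    from dask_sql import Context

    # Use the externally started cluster when the CI variable is set,
    # otherwise fall back to an in-process cluster for local runs.
    address = os.environ.get("DASK_SQL_TEST_SCHEDULER")
    client = Client(address) if address else Client(processes=False)

    c = Context()
    c.create_table(
        "df", dd.from_pandas(pd.DataFrame({"a": [1, 2, 3]}), npartitions=2)
    )

    # SQL planning happens locally; the resulting dask graph is executed by
    # whichever client is currently active (here: the docker-compose cluster).
    print(c.sql("SELECT SUM(a) AS total FROM df").compute())

In this setup the workers deserialize graphs shipped from the test process, which is why the compose files above install ``cloudpickle`` and ``pyarrow`` in the worker environment: the worker side has to stay compatible with the client side for serialization and parquet IO.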