diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py index 5be6a930b..e1f53c6db 100644 --- a/tests/integration/fixtures.py +++ b/tests/integration/fixtures.py @@ -5,9 +5,9 @@ import numpy as np import pandas as pd import pytest -from dask.datasets import timeseries from dask.distributed import Client -from pandas.testing import assert_frame_equal + +from tests.utils import assert_eq try: import cudf @@ -23,18 +23,6 @@ SCHEDULER_ADDR = os.getenv("DASK_SQL_TEST_SCHEDULER", None) -@pytest.fixture() -def timeseries_df(c): - pdf = timeseries(freq="1d").compute().reset_index(drop=True) - # impute nans in pandas dataframe - col1_index = np.random.randint(0, 30, size=int(pdf.shape[0] * 0.2)) - col2_index = np.random.randint(0, 30, size=int(pdf.shape[0] * 0.3)) - pdf.loc[col1_index, "x"] = np.nan - pdf.loc[col2_index, "y"] = np.nan - c.create_table("timeseries", pdf, persist=True) - return pdf - - @pytest.fixture() def df_simple(): return pd.DataFrame({"a": [1, 2, 3], "b": [1.1, 2.2, 3.3]}) @@ -311,7 +299,7 @@ def _assert_query_gives_same_result(query, sort_columns=None, **kwargs): sql_result = sql_result.reset_index(drop=True) dask_result = dask_result.reset_index(drop=True) - assert_frame_equal(sql_result, dask_result, check_dtype=False, **kwargs) + assert_eq(sql_result, dask_result, check_dtype=False, **kwargs) return _assert_query_gives_same_result diff --git a/tests/integration/test_analyze.py b/tests/integration/test_analyze.py index cb3b70d2f..cd51ce1b8 100644 --- a/tests/integration/test_analyze.py +++ b/tests/integration/test_analyze.py @@ -1,11 +1,10 @@ -import numpy as np import pandas as pd -from pandas.testing import assert_frame_equal + +from tests.utils import assert_eq def test_analyze(c, df): result_df = c.sql("ANALYZE TABLE df COMPUTE STATISTICS FOR ALL COLUMNS") - result_df = result_df.compute() expected_df = pd.DataFrame( { @@ -15,8 +14,7 @@ def test_analyze(c, df): df.a.std(), 1.0, 2.0, - # That is actually wrong. 
But the approximate quantile function in dask gives a different result than the actual computation - result_df["a"].iloc[5], + 2.0, # incorrect, but what Dask gives for approx quantile 3.0, 3.0, "double", @@ -50,12 +48,8 @@ def test_analyze(c, df): ) # The percentiles are calculated only approximately, therefore we do not use exact matching - p = ["25%", "50%", "75%"] - result_df.loc[p, :] = result_df.loc[p, :].astype(float).apply(np.ceil) - expected_df.loc[p, :] = expected_df.loc[p, :].astype(float).apply(np.ceil) - assert_frame_equal(result_df, expected_df, check_exact=False) + assert_eq(result_df, expected_df, rtol=0.135) result_df = c.sql("ANALYZE TABLE df COMPUTE STATISTICS FOR COLUMNS a") - result_df = result_df.compute() - assert_frame_equal(result_df, expected_df[["a"]]) + assert_eq(result_df, expected_df[["a"]], rtol=0.135) diff --git a/tests/integration/test_compatibility.py b/tests/integration/test_compatibility.py index e087460df..63c1668b2 100644 --- a/tests/integration/test_compatibility.py +++ b/tests/integration/test_compatibility.py @@ -14,14 +14,14 @@ import numpy as np import pandas as pd -from pandas.testing import assert_frame_equal from dask_sql import Context from dask_sql.utils import ParsingException +from tests.utils import assert_eq def cast_datetime_to_string(df): - cols = df.select_dtypes(include=["datetime64[ns]"]).columns + cols = df.select_dtypes(include=["datetime64[ns]"]).columns.tolist() # Casting to object first as # directly converting to string looses second precision df[cols] = df[cols].astype("object").astype("string") @@ -36,7 +36,7 @@ def eq_sqlite(sql, **dfs): c.create_table(name, df) df.to_sql(name, engine, index=False) - dask_result = c.sql(sql).compute().reset_index(drop=True) + dask_result = c.sql(sql).reset_index(drop=True) sqlite_result = pd.read_sql(sql, engine).reset_index(drop=True) # casting to object to ensure equality with sql-lite @@ -47,7 +47,7 @@ def eq_sqlite(sql, **dfs): dask_result = dask_result.fillna(np.NaN) sqlite_result = sqlite_result.fillna(np.NaN) - assert_frame_equal(dask_result, sqlite_result, check_dtype=False) + assert_eq(dask_result, sqlite_result, check_dtype=False) def make_rand_df(size: int, **kwargs): diff --git a/tests/integration/test_complex.py b/tests/integration/test_complex.py index 0ebdd1a76..fc79f0a11 100644 --- a/tests/integration/test_complex.py +++ b/tests/integration/test_complex.py @@ -28,9 +28,6 @@ def test_complex_query(c): lhs.name = rhs.max_name AND lhs.x = rhs.max_x """ - ) + ).compute() - # should not fail - df = result.compute() - - assert len(df) > 0 + assert len(result) > 0 diff --git a/tests/integration/test_create.py b/tests/integration/test_create.py index cac4a3099..456435b7e 100644 --- a/tests/integration/test_create.py +++ b/tests/integration/test_create.py @@ -1,9 +1,9 @@ import dask.dataframe as dd import pandas as pd import pytest -from pandas.testing import assert_frame_equal import dask_sql +from tests.utils import assert_eq @pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)]) @@ -26,12 +26,9 @@ def test_create_from_csv(c, df, temporary_data_file, gpu): """ SELECT * FROM new_table """ - ).compute() - - if gpu: - result_df = result_df.to_pandas() + ) - assert_frame_equal(result_df, df) + assert_eq(result_df, df) @pytest.mark.parametrize( @@ -60,12 +57,9 @@ def test_cluster_memory(client, c, df, gpu): """ SELECT * FROM new_table """ - ).compute() - - if gpu: - return_df = return_df.to_pandas() + ) - assert_frame_equal(df, return_df) + assert_eq(df, 
return_df) client.unpublish_dataset("df") @@ -91,12 +85,9 @@ def test_create_from_csv_persist(c, df, temporary_data_file, gpu): """ SELECT * FROM new_table """ - ).compute() - - if gpu: - return_df = return_df.to_pandas() + ) - assert_frame_equal(df, return_df) + assert_eq(df, return_df) def test_wrong_create(c): @@ -139,9 +130,9 @@ def test_create_from_query(c, df): """ SELECT * FROM new_table """ - ).compute() + ) - assert_frame_equal(df, return_df) + assert_eq(df, return_df) c.sql( """ @@ -157,9 +148,9 @@ def test_create_from_query(c, df): """ SELECT * FROM new_table """ - ).compute() + ) - assert_frame_equal(df, return_df) + assert_eq(df, return_df) @pytest.mark.parametrize( @@ -210,27 +201,19 @@ def test_view_table_persist(c, temporary_data_file, df, gpu): """ ) - from_view = c.sql("SELECT c FROM count_view").compute() - from_table = c.sql("SELECT c FROM count_table").compute() - - if gpu: - from_view = from_view.to_pandas() - from_table = from_table.to_pandas() + from_view = c.sql("SELECT c FROM count_view") + from_table = c.sql("SELECT c FROM count_table") - assert_frame_equal(from_view, pd.DataFrame({"c": [700]})) - assert_frame_equal(from_table, pd.DataFrame({"c": [700]})) + assert_eq(from_view, pd.DataFrame({"c": [700]})) + assert_eq(from_table, pd.DataFrame({"c": [700]})) df.iloc[:10].to_csv(temporary_data_file, index=False) - from_view = c.sql("SELECT c FROM count_view").compute() - from_table = c.sql("SELECT c FROM count_table").compute() + from_view = c.sql("SELECT c FROM count_view") + from_table = c.sql("SELECT c FROM count_table") - if gpu: - from_view = from_view.to_pandas() - from_table = from_table.to_pandas() - - assert_frame_equal(from_view, pd.DataFrame({"c": [10]})) - assert_frame_equal(from_table, pd.DataFrame({"c": [700]})) + assert_eq(from_view, pd.DataFrame({"c": [10]})) + assert_eq(from_table, pd.DataFrame({"c": [700]})) def test_replace_and_error(c, temporary_data_file, df): @@ -244,8 +227,8 @@ def test_replace_and_error(c, temporary_data_file, df): """ ) - assert_frame_equal( - c.sql("SELECT a FROM new_table").compute(), + assert_eq( + c.sql("SELECT a FROM new_table"), pd.DataFrame({"a": [1]}), check_dtype=False, ) @@ -271,8 +254,8 @@ def test_replace_and_error(c, temporary_data_file, df): """ ) - assert_frame_equal( - c.sql("SELECT a FROM new_table").compute(), + assert_eq( + c.sql("SELECT a FROM new_table"), pd.DataFrame({"a": [1]}), check_dtype=False, ) @@ -287,8 +270,8 @@ def test_replace_and_error(c, temporary_data_file, df): """ ) - assert_frame_equal( - c.sql("SELECT a FROM new_table").compute(), + assert_eq( + c.sql("SELECT a FROM new_table"), pd.DataFrame({"a": [2]}), check_dtype=False, ) @@ -308,8 +291,8 @@ def test_replace_and_error(c, temporary_data_file, df): """ ) - assert_frame_equal( - c.sql("SELECT a FROM new_table").compute(), + assert_eq( + c.sql("SELECT a FROM new_table"), pd.DataFrame({"a": [3]}), check_dtype=False, ) @@ -338,8 +321,8 @@ def test_replace_and_error(c, temporary_data_file, df): """ ) - assert_frame_equal( - c.sql("SELECT a FROM new_table").compute(), + assert_eq( + c.sql("SELECT a FROM new_table"), pd.DataFrame({"a": [3]}), check_dtype=False, ) @@ -355,13 +338,9 @@ def test_replace_and_error(c, temporary_data_file, df): """ ) - result_df = c.sql( - """ - SELECT * FROM new_table - """ - ).compute() + result_df = c.sql("SELECT * FROM new_table") - assert_frame_equal(result_df, df) + assert_eq(result_df, df) def test_drop(c): diff --git a/tests/integration/test_distributeby.py b/tests/integration/test_distributeby.py index 
93d3b3426..c865185ce 100644 --- a/tests/integration/test_distributeby.py +++ b/tests/integration/test_distributeby.py @@ -2,24 +2,13 @@ import pandas as pd import pytest -try: - import cudf -except ImportError: - cudf = None - @pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)]) def test_distribute_by(c, gpu): - - if gpu: - xd = cudf - else: - xd = pd - - df = xd.DataFrame({"id": [0, 1, 2, 1, 2, 3], "val": [0, 1, 2, 1, 2, 3]}) + df = pd.DataFrame({"id": [0, 1, 2, 1, 2, 3], "val": [0, 1, 2, 1, 2, 3]}) ddf = dd.from_pandas(df, npartitions=2) - c.create_table("test", ddf) + c.create_table("test", ddf, gpu=gpu) partitioned_ddf = c.sql( """ SELECT diff --git a/tests/integration/test_filter.py b/tests/integration/test_filter.py index a5231a5e2..192880267 100644 --- a/tests/integration/test_filter.py +++ b/tests/integration/test_filter.py @@ -2,51 +2,45 @@ import pandas as pd import pytest from dask.utils_test import hlg_layer -from pandas.testing import assert_frame_equal from dask_sql._compat import INT_NAN_IMPLEMENTED +from tests.utils import assert_eq def test_filter(c, df): return_df = c.sql("SELECT * FROM df WHERE a < 2") - return_df = return_df.compute() expected_df = df[df["a"] < 2] - assert_frame_equal(return_df, expected_df) + assert_eq(return_df, expected_df) def test_filter_scalar(c, df): return_df = c.sql("SELECT * FROM df WHERE True") - return_df = return_df.compute() expected_df = df - assert_frame_equal(return_df, expected_df) + assert_eq(return_df, expected_df) return_df = c.sql("SELECT * FROM df WHERE False") - return_df = return_df.compute() expected_df = df.head(0) - assert_frame_equal(return_df, expected_df, check_index_type=False) + assert_eq(return_df, expected_df, check_index_type=False) return_df = c.sql("SELECT * FROM df WHERE (1 = 1)") - return_df = return_df.compute() expected_df = df - assert_frame_equal(return_df, expected_df) + assert_eq(return_df, expected_df) return_df = c.sql("SELECT * FROM df WHERE (1 = 0)") - return_df = return_df.compute() expected_df = df.head(0) - assert_frame_equal(return_df, expected_df, check_index_type=False) + assert_eq(return_df, expected_df, check_index_type=False) def test_filter_complicated(c, df): return_df = c.sql("SELECT * FROM df WHERE a < 3 AND (b > 1 AND b < 3)") - return_df = return_df.compute() expected_df = df[((df["a"] < 3) & ((df["b"] > 1) & (df["b"] < 3)))] - assert_frame_equal( + assert_eq( return_df, expected_df, ) @@ -54,13 +48,12 @@ def test_filter_complicated(c, df): def test_filter_with_nan(c): return_df = c.sql("SELECT * FROM user_table_nan WHERE c = 3") - return_df = return_df.compute() if INT_NAN_IMPLEMENTED: expected_df = pd.DataFrame({"c": [3]}, dtype="Int8") else: expected_df = pd.DataFrame({"c": [3]}, dtype="float") - assert_frame_equal( + assert_eq( return_df, expected_df, ) @@ -68,9 +61,8 @@ def test_filter_with_nan(c): def test_string_filter(c, string_table): return_df = c.sql("SELECT * FROM string_table WHERE a = 'a normal string'") - return_df = return_df.compute() - assert_frame_equal( + assert_eq( return_df, string_table.head(1), ) @@ -96,7 +88,7 @@ def test_filter_cast_date(c, input_table, request): datetime_table["timezone"].astype(" pd.Timestamp("2014-08-01") ] - dd.assert_eq(return_df, expected_df) + assert_eq(return_df, expected_df) @pytest.mark.parametrize( @@ -119,19 +111,19 @@ def test_filter_cast_timestamp(c, input_table, request): datetime_table["timezone"].astype("= pd.Timestamp("2014-08-01 23:00:00") ] - dd.assert_eq(return_df, expected_df) + 
assert_eq(return_df, expected_df) def test_filter_year(c): df = pd.DataFrame({"year": [2015, 2016], "month": [2, 3], "day": [4, 5]}) - df["dt"] = pd.to_datetime(df) c.create_table("datetime_test", df) - actual_df = c.sql("select * from datetime_test where year(dt) < 2016").compute() + + return_df = c.sql("select * from datetime_test where year(dt) < 2016") expected_df = df[df["year"] < 2016] - assert_frame_equal(expected_df, actual_df) + assert_eq(expected_df, return_df) @pytest.mark.parametrize( @@ -199,9 +191,11 @@ def test_predicate_pushdown(c, parquet_ddf, query, df_func, filters): assert got_filters == expect_filters # Check computed result is correct - df = parquet_ddf.compute() + df = parquet_ddf expected_df = df_func(df) - dd.assert_eq(return_df, expected_df) + + # TODO: divisions should be consistent when successfully doing predicate pushdown + assert_eq(return_df, expected_df, check_divisions=False) def test_filtered_csv(tmpdir, c): @@ -229,6 +223,7 @@ def test_filtered_csv(tmpdir, c): c.drop_table("my_csv_table") # Check computed result is correct - df = csv_ddf.compute() + df = csv_ddf expected_df = df[df["b"] < 10] - dd.assert_eq(return_df, expected_df) + + assert_eq(return_df, expected_df) diff --git a/tests/integration/test_fugue.py b/tests/integration/test_fugue.py index ba3acd00c..951bf7a48 100644 --- a/tests/integration/test_fugue.py +++ b/tests/integration/test_fugue.py @@ -1,10 +1,10 @@ import dask.dataframe as dd import pandas as pd import pytest -from pandas.testing import assert_frame_equal from dask_sql import Context from tests.integration.fixtures import skip_if_external_scheduler +from tests.utils import assert_eq fugue_sql = pytest.importorskip("fugue_sql") @@ -14,44 +14,38 @@ def test_simple_statement(): - dag = fugue_sql.FugueSQLWorkflow() - df = dag.df([[0, "hello"], [1, "world"]], "a:int64,b:str") - dag( - """ - SELECT * FROM df WHERE a > 0 YIELD DATAFRAME AS result - """ - ) - result = dag.run(DaskSQLExecutionEngine) + with fugue_sql.FugueSQLWorkflow(DaskSQLExecutionEngine) as dag: + df = dag.df([[0, "hello"], [1, "world"]], "a:int64,b:str") + dag("SELECT * FROM df WHERE a > 0 YIELD DATAFRAME AS result") + result = dag.run() return_df = result["result"].as_pandas() - assert_frame_equal(return_df, pd.DataFrame({"a": [1], "b": ["world"]})) + assert_eq(return_df, pd.DataFrame({"a": [1], "b": ["world"]})) # A more elegant way to do things pdf = pd.DataFrame([[0, "hello"], [1, "world"]], columns=["a", "b"]) result = fugue_sql.fsql( - """ - SELECT * FROM df WHERE a > 0 YIELD DATAFRAME AS result - """, + "SELECT * FROM df WHERE a > 0 YIELD DATAFRAME AS result", df=pdf, ).run("dask") return_df = result["result"].as_pandas() - assert_frame_equal(return_df, pd.DataFrame({"a": [1], "b": ["world"]})) + assert_eq(return_df, pd.DataFrame({"a": [1], "b": ["world"]})) # TODO: Revisit fixing this on an independant cluster (without dask-sql) based on the # discussion in https://github.com/dask-contrib/dask-sql/issues/407 @skip_if_external_scheduler def test_fsql(): - def assert_eq(df: pd.DataFrame) -> None: - assert_frame_equal(df, pd.DataFrame({"a": [1]})) + def assert_fsql(df: pd.DataFrame) -> None: + assert_eq(df, pd.DataFrame({"a": [1]})) # the simplest case: the SQL does not use any input and does not generate output fsql_dask( """ CREATE [[0],[1]] SCHEMA a:long SELECT * WHERE a>0 - OUTPUT USING assert_eq + OUTPUT USING assert_fsql """ ) @@ -64,7 +58,7 @@ def assert_eq(df: pd.DataFrame) -> None: fsql_dask( """ SELECT * FROM df WHERE a>0 - OUTPUT USING assert_eq 
+ OUTPUT USING assert_fsql """, c, ) @@ -74,7 +68,7 @@ def assert_eq(df: pd.DataFrame) -> None: result = fsql_dask( """ x=SELECT * FROM df WHERE a>0 - OUTPUT USING assert_eq + OUTPUT USING assert_fsql """, c, register=True, diff --git a/tests/integration/test_function.py b/tests/integration/test_function.py index c6342877c..92fc58b14 100644 --- a/tests/integration/test_function.py +++ b/tests/integration/test_function.py @@ -4,7 +4,8 @@ import dask.dataframe as dd import numpy as np import pytest -from pandas.testing import assert_frame_equal + +from tests.utils import assert_eq def test_custom_function(c, df): @@ -13,15 +14,9 @@ def f(x): c.register_function(f, "f", [("x", np.float64)], np.float64) - return_df = c.sql( - """ - SELECT F(a) AS a - FROM df - """ - ) - return_df = return_df.compute() + return_df = c.sql("SELECT F(a) AS a FROM df") - assert_frame_equal(return_df.reset_index(drop=True), df[["a"]] ** 2) + assert_eq(return_df, df[["a"]] ** 2) def test_custom_function_row(c, df): @@ -30,15 +25,9 @@ def f(row): c.register_function(f, "f", [("x", np.float64)], np.float64, row_udf=True) - return_df = c.sql( - """ - SELECT F(a) AS a - FROM df - """ - ) - return_df = return_df.compute() + return_df = c.sql("SELECT F(a) AS a FROM df") - assert_frame_equal(return_df.reset_index(drop=True), df[["a"]] ** 2) + assert_eq(return_df, df[["a"]] ** 2) @pytest.mark.parametrize("colnames", list(itertools.combinations(["a", "b", "c"], 2))) @@ -58,7 +47,7 @@ def f(row): expect = df_wide[colname_x] + df_wide[colname_y] got = return_df.iloc[:, 0] - dd.assert_eq(expect, got, check_names=False) + assert_eq(expect, got, check_names=False) @pytest.mark.parametrize( @@ -75,15 +64,10 @@ def f(row): return c.register_function(f, "f", [("x", np.float64)], retty, row_udf=True) - return_df = c.sql( - """ - SELECT F(a) AS a - FROM df - """ - ) - return_df = return_df.compute() - expectation = (df[["a"]] ** 2).astype(retty) - assert_frame_equal(return_df.reset_index(drop=True), expectation) + + return_df = c.sql("SELECT F(a) AS a FROM df") + + assert_eq(return_df, (df[["a"]] ** 2).astype(retty)) # Test row UDFs with one arg @@ -102,12 +86,10 @@ def f(row, k): f, "f", [("a", np.int64), ("k", const_type)], retty, row_udf=True ) - statement = f"SELECT F(a, {k}) as a from df" + return_df = c.sql(f"SELECT F(a, {k}) as a from df") + expected_df = op(df[["a"]], k).astype(retty) - return_df = c.sql(statement) - return_df = return_df.compute() - expectation = op(df[["a"]], k).astype(retty) - assert_frame_equal(return_df.reset_index(drop=True), expectation) + assert_eq(return_df, expected_df) # Test row UDFs with two args @@ -135,13 +117,10 @@ def f(row, k1, k2): row_udf=True, ) - statement = f"SELECT F(a, {k1}, {k2}) as a from df" - - return_df = c.sql(statement) - return_df = return_df.compute() + return_df = c.sql(f"SELECT F(a, {k1}, {k2}) as a from df") + expected_df = op(op(df[["a"]], k1), k2).astype(retty) - expectation = op(op(df[["a"]], k1), k2).astype(retty) - assert_frame_equal(return_df.reset_index(drop=True), expectation) + assert_eq(return_df, expected_df) def test_multiple_definitions(c, df_simple): @@ -157,9 +136,9 @@ def f(x): FROM df_simple """ ) - return_df = return_df.compute() + expected_df = df_simple[["a", "b"]] ** 2 - assert_frame_equal(return_df.reset_index(drop=True), df_simple[["a", "b"]] ** 2) + assert_eq(return_df, expected_df) def f(x): return x**3 @@ -173,9 +152,9 @@ def f(x): FROM df_simple """ ) - return_df = return_df.compute() + expected_df = df_simple[["a", "b"]] ** 3 - 
assert_frame_equal(return_df.reset_index(drop=True), df_simple[["a", "b"]] ** 3) + assert_eq(return_df, expected_df) def test_aggregate_function(c): @@ -188,9 +167,8 @@ def test_aggregate_function(c): FROM df """ ) - return_df = return_df.compute() - assert (return_df["test"] == return_df["S"]).all() + assert_eq(return_df["test"], return_df["S"], check_names=False) def test_reregistration(c): diff --git a/tests/integration/test_groupby.py b/tests/integration/test_groupby.py index 0fef45679..658ad4fa2 100644 --- a/tests/integration/test_groupby.py +++ b/tests/integration/test_groupby.py @@ -1,12 +1,29 @@ +import dask.dataframe as dd import numpy as np import pandas as pd import pytest -from dask import dataframe as dd -from pandas.testing import assert_frame_equal, assert_series_equal +from dask.datasets import timeseries + +from tests.utils import assert_eq + + +@pytest.fixture() +def timeseries_df(c): + pdf = timeseries(freq="1d").compute().reset_index(drop=True) + + # input nans in pandas dataframe + col1_index = np.random.randint(0, 30, size=int(pdf.shape[0] * 0.2)) + col2_index = np.random.randint(0, 30, size=int(pdf.shape[0] * 0.3)) + pdf.loc[col1_index, "x"] = np.nan + pdf.loc[col2_index, "y"] = np.nan + + c.create_table("timeseries", pdf, persist=True) + + return None def test_group_by(c): - df = c.sql( + return_df = c.sql( """ SELECT user_id, SUM(b) AS "S" @@ -14,10 +31,9 @@ def test_group_by(c): GROUP BY user_id """ ) - df = df.compute() - expected_df = pd.DataFrame({"user_id": [1, 2, 3], "S": [3, 4, 3]}) - assert_frame_equal(df.sort_values("user_id").reset_index(drop=True), expected_df) + + assert_eq(return_df.sort_values("user_id").reset_index(drop=True), expected_df) def test_group_by_all(c, df): @@ -28,12 +44,11 @@ def test_group_by_all(c, df): FROM user_table_1 """ ) - result_df = result_df.compute() - expected_df = pd.DataFrame({"S": [10], "X": [8]}) expected_df["S"] = expected_df["S"].astype("int64") expected_df["X"] = expected_df["X"].astype("int32") - assert_frame_equal(result_df, expected_df) + + assert_eq(result_df, expected_df) result_df = c.sql( """ @@ -48,8 +63,6 @@ def test_group_by_all(c, df): FROM df """ ) - result_df = result_df.compute() - expected_df = pd.DataFrame( { "sum_a": [df.a.sum()], @@ -61,11 +74,12 @@ def test_group_by_all(c, df): "mix_3": [(df.a + df.b).mean()], } ) - assert_frame_equal(result_df, expected_df) + + assert_eq(result_df, expected_df) def test_group_by_filtered(c): - df = c.sql( + return_df = c.sql( """ SELECT SUM(b) FILTER (WHERE user_id = 2) AS "S1", @@ -73,14 +87,11 @@ def test_group_by_filtered(c): FROM user_table_1 """ ) - df = df.compute() - expected_df = pd.DataFrame({"S1": [4], "S2": [10]}, dtype="int64") - assert_frame_equal(df, expected_df) + assert_eq(return_df, expected_df) -def test_group_by_filtered2(c): - df = c.sql( + return_df = c.sql( """ SELECT user_id, @@ -90,8 +101,6 @@ def test_group_by_filtered2(c): GROUP BY user_id """ ) - df = df.compute() - expected_df = pd.DataFrame( { "user_id": [1, 2, 3], @@ -99,23 +108,22 @@ def test_group_by_filtered2(c): "S2": [3, 4, 3], }, ) - assert_frame_equal(df, expected_df) - df = c.sql( + assert_eq(return_df, expected_df) + + return_df = c.sql( """ SELECT SUM(b) FILTER (WHERE user_id = 2) AS "S1" FROM user_table_1 """ ) - df = df.compute() - expected_df = pd.DataFrame({"S1": [4]}) - assert_frame_equal(df, expected_df) + assert_eq(return_df, expected_df) def test_group_by_case(c): - df = c.sql( + return_df = c.sql( """ SELECT user_id + 1 AS "A", SUM(CASE WHEN b = 3 THEN 1 END) AS "S" 
@@ -123,17 +131,18 @@ def test_group_by_case(c): GROUP BY user_id + 1 """ ) - df = df.compute() - expected_df = pd.DataFrame({"A": [2, 3, 4], "S": [1, 1, 1]}) + # Do not check dtypes, as pandas versions are inconsistent here - assert_frame_equal( - df.sort_values("A").reset_index(drop=True), expected_df, check_dtype=False + assert_eq( + return_df.sort_values("A").reset_index(drop=True), + expected_df, + check_dtype=False, ) def test_group_by_nan(c): - df = c.sql( + return_df = c.sql( """ SELECT c @@ -141,18 +150,17 @@ def test_group_by_nan(c): GROUP BY c """ ) - df = df.compute() - expected_df = pd.DataFrame({"c": [3, float("nan"), 1]}) + # The dtype in pandas 1.0.5 and pandas 1.1.0 are different, so - # we can not check here - assert_frame_equal( - df.sort_values("c").reset_index(drop=True), + # we cannot check here + assert_eq( + return_df.sort_values("c").reset_index(drop=True), expected_df.sort_values("c").reset_index(drop=True), check_dtype=False, ) - df = c.sql( + return_df = c.sql( """ SELECT c @@ -160,18 +168,17 @@ def test_group_by_nan(c): GROUP BY c """ ) - df = df.compute() - expected_df = pd.DataFrame({"c": [3, 1, float("inf")]}) expected_df["c"] = expected_df["c"].astype("float64") - assert_frame_equal( - df.sort_values("c").reset_index(drop=True), + + assert_eq( + return_df.sort_values("c").reset_index(drop=True), expected_df.sort_values("c").reset_index(drop=True), ) def test_aggregations(c): - df = c.sql( + return_df = c.sql( """ SELECT user_id, @@ -185,8 +192,6 @@ def test_aggregations(c): GROUP BY user_id """ ) - df = df.compute() - expected_df = pd.DataFrame( { "user_id": [1, 2, 3], @@ -199,9 +204,10 @@ def test_aggregations(c): } ) expected_df["a"] = expected_df["a"].astype("float64") - assert_frame_equal(df.sort_values("user_id").reset_index(drop=True), expected_df) - df = c.sql( + assert_eq(return_df.sort_values("user_id").reset_index(drop=True), expected_df) + + return_df = c.sql( """ SELECT user_id, @@ -215,7 +221,6 @@ def test_aggregations(c): GROUP BY user_id """ ) - df = df.compute() expected_df = pd.DataFrame( { @@ -228,9 +233,9 @@ def test_aggregations(c): "a": [1.5, 3, 4], } ) - assert_frame_equal(df.sort_values("user_id").reset_index(drop=True), expected_df) + assert_eq(return_df.sort_values("user_id").reset_index(drop=True), expected_df) - df = c.sql( + return_df = c.sql( """ SELECT MAX(a) AS "max", @@ -238,27 +243,25 @@ def test_aggregations(c): FROM string_table """ ) - df = df.compute() - expected_df = pd.DataFrame({"max": ["a normal string"], "min": ["%_%"]}) - assert_frame_equal(df.reset_index(drop=True), expected_df) + assert_eq(return_df.reset_index(drop=True), expected_df) -def test_stats_aggregation(c, timeseries_df): - # # test regr_count - regr_count = ( - c.sql( - """ - SELECT name, count(x) filter (where y is not null) as expected, - regr_count(y, x) as calculated from timeseries group by name +def test_stats_aggregation(c, timeseries_df): + # test regr_count + regr_count = c.sql( + """ + SELECT + name, + COUNT(x) FILTER (WHERE y IS NOT NULL) AS expected, + REGR_COUNT(y, x) AS calculated + FROM timeseries + GROUP BY name """ - ) - .compute() - .fillna(0) - ) + ).fillna(0) - assert_series_equal( + assert_eq( regr_count["expected"], regr_count["calculated"], check_dtype=False, @@ -266,17 +269,19 @@ def test_stats_aggregation(c, timeseries_df): ) # test regr_syy - regr_syy = ( - c.sql( - """ - SELECT name, (regr_count(y, x)*var_pop(y)) as expected, regr_syy(y, x) as calculated - FROM timeseries WHERE x IS NOT NULL AND y IS NOT NULL GROUP BY name + 
regr_syy = c.sql( + """ + SELECT + name, + (REGR_COUNT(y, x) * VAR_POP(y)) AS expected, + REGR_SYY(y, x) AS calculated + FROM timeseries + WHERE x IS NOT NULL AND y IS NOT NULL + GROUP BY name """ - ) - .compute() - .fillna(0) - ) - assert_series_equal( + ).fillna(0) + + assert_eq( regr_syy["expected"], regr_syy["calculated"], check_dtype=False, @@ -284,17 +289,19 @@ def test_stats_aggregation(c, timeseries_df): ) # test regr_sxx - regr_sxx = ( - c.sql( - """ - SELECT name,(regr_count(y, x)*var_pop(x)) as expected, regr_sxx(y,x) as calculated - FROM timeseries WHERE x IS NOT NULL AND y IS NOT NULL GROUP BY name + regr_sxx = c.sql( + """ + SELECT + name, + (REGR_COUNT(y, x) * VAR_POP(x)) AS expected, + REGR_SXX(y,x) AS calculated + FROM timeseries + WHERE x IS NOT NULL AND y IS NOT NULL + GROUP BY name """ - ) - .compute() - .fillna(0) - ) - assert_series_equal( + ).fillna(0) + + assert_eq( regr_sxx["expected"], regr_sxx["calculated"], check_dtype=False, @@ -302,24 +309,26 @@ def test_stats_aggregation(c, timeseries_df): ) # test covar_pop - covar_pop = ( - c.sql( - """ - with temp_agg as ( - select name,avg(y) filter (where x is not null) as avg_y, - avg(x) filter (where y is not null) as avg_x - from timeseries group by name - ) - select ts.name,sum((y - avg_y) * (x - avg_x)) /regr_count(y, x) as expected, - covar_pop(y,x) as calculated from timeseries as ts - join temp_agg as ta on ts.name =ta.name - group by ts.name + covar_pop = c.sql( + """ + WITH temp_agg AS ( + SELECT + name, + AVG(y) FILTER (WHERE x IS NOT NULL) as avg_y, + AVG(x) FILTER (WHERE x IS NOT NULL) as avg_x + FROM timeseries + GROUP BY name + ) SELECT + ts.name, + SUM((y - avg_y) * (x - avg_x)) / REGR_COUNT(y, x) AS expected, + COVAR_POP(y,x) AS calculated + FROM timeseries AS ts + JOIN temp_agg AS ta ON ts.name = ta.name + GROUP BY ts.name """ - ) - .compute() - .fillna(0) - ) - assert_series_equal( + ).fillna(0) + + assert_eq( covar_pop["expected"], covar_pop["calculated"], check_dtype=False, @@ -327,25 +336,26 @@ def test_stats_aggregation(c, timeseries_df): ) # test covar_samp - covar_samp = ( - c.sql( - """ - with temp_agg as ( - select name,avg(y) filter (where x is not null) as avg_y, - avg(x) filter (where y is not null) as avg_x - from timeseries group by name - ) - - select ts.name,sum((y - avg_y) * (x - avg_x)) /(regr_count(y, x)-1) as expected, - covar_samp(y,x) as calculated from timeseries as ts - join temp_agg as ta on ts.name =ta.name - group by ts.name + covar_samp = c.sql( + """ + WITH temp_agg AS ( + SELECT + name, + AVG(y) FILTER (WHERE x IS NOT NULL) as avg_y, + AVG(x) FILTER (WHERE x IS NOT NULL) as avg_x + FROM timeseries + GROUP BY name + ) SELECT + ts.name, + SUM((y - avg_y) * (x - avg_x)) / (REGR_COUNT(y, x) - 1) as expected, + COVAR_SAMP(y,x) AS calculated + FROM timeseries AS ts + JOIN temp_agg AS ta ON ts.name = ta.name + GROUP BY ts.name """ - ) - .compute() - .fillna(0) - ) - assert_series_equal( + ).fillna(0) + + assert_eq( covar_samp["expected"], covar_samp["calculated"], check_dtype=False, @@ -363,7 +373,8 @@ def test_stats_aggregation(c, timeseries_df): @pytest.mark.parametrize("split_out", [None, 2, 4]) def test_groupby_split_out(c, input_table, split_out, request): user_table = request.getfixturevalue(input_table) - df = c.sql( + + return_df = c.sql( f""" SELECT user_id, SUM(b) AS "S" @@ -373,12 +384,15 @@ def test_groupby_split_out(c, input_table, split_out, request): config_options={"sql.groupby.split_out": split_out}, ) expected_df = ( - user_table.groupby(by="user_id").agg({"b": 
"sum"}).reset_index(drop=False) + user_table.groupby(by="user_id") + .agg({"b": "sum"}) + .reset_index(drop=False) + .rename(columns={"b": "S"}) + .sort_values("user_id") ) - expected_df = expected_df.rename(columns={"b": "S"}) - expected_df = expected_df.sort_values("user_id") - assert df.npartitions == split_out if split_out else 1 - dd.assert_eq(df.compute().sort_values("user_id"), expected_df, check_index=False) + + assert return_df.npartitions == split_out if split_out else 1 + assert_eq(return_df.sort_values("user_id"), expected_df, check_index=False) @pytest.mark.parametrize( @@ -387,21 +401,20 @@ def test_groupby_split_out(c, input_table, split_out, request): (False, 2, 74), (False, 3, 68), (False, 4, 64), - pytest.param(True, 2, 91, marks=pytest.mark.gpu), - pytest.param(True, 3, 85, marks=pytest.mark.gpu), - pytest.param(True, 4, 81, marks=pytest.mark.gpu), + pytest.param(True, 2, 107, marks=pytest.mark.gpu), + pytest.param(True, 3, 101, marks=pytest.mark.gpu), + pytest.param(True, 4, 97, marks=pytest.mark.gpu), ], ) def test_groupby_split_every(c, gpu, split_every, expected_keys): - xd = pytest.importorskip("cudf") if gpu else pd input_ddf = dd.from_pandas( - xd.DataFrame({"user_id": [1, 2, 3, 4] * 16, "b": [5, 6, 7, 8] * 16}), + pd.DataFrame({"user_id": [1, 2, 3, 4] * 16, "b": [5, 6, 7, 8] * 16}), npartitions=16, ) # Need an input with multiple partitions to demonstrate split_every - c.create_table("split_every_input", input_ddf) + c.create_table("split_every_input", input_ddf, gpu=gpu) - df = c.sql( + return_df = c.sql( """ SELECT user_id, SUM(b) AS "S" @@ -418,7 +431,7 @@ def test_groupby_split_every(c, gpu, split_every, expected_keys): .sort_values("user_id") ) - assert len(df.dask.keys()) == expected_keys - dd.assert_eq(df, expected_df, check_index=False) + assert len(return_df.dask.keys()) == expected_keys + assert_eq(return_df, expected_df, check_index=False) c.drop_table("split_every_input") diff --git a/tests/integration/test_hive.py b/tests/integration/test_hive.py index 330888618..04e411f8d 100644 --- a/tests/integration/test_hive.py +++ b/tests/integration/test_hive.py @@ -5,9 +5,9 @@ import pandas as pd import pytest -from pandas.testing import assert_frame_equal from dask_sql.context import Context +from tests.utils import assert_eq pytestmark = pytest.mark.skipif( sys.platform == "win32", reason="hive testing not supported on Windows" @@ -181,18 +181,18 @@ def test_select(hive_cursor): c = Context() c.create_table("df", hive_cursor) - result_df = c.sql("SELECT * FROM df").compute().reset_index(drop=True) - df = pd.DataFrame({"i": [1, 2], "j": [2, 4]}).astype("int32") + result_df = c.sql("SELECT * FROM df") + expected_df = pd.DataFrame({"i": [1, 2], "j": [2, 4]}).astype("int32") - assert_frame_equal(df, result_df) + assert_eq(result_df, expected_df, check_index=False) def test_select_partitions(hive_cursor): c = Context() c.create_table("df_part", hive_cursor) - result_df = c.sql("SELECT * FROM df_part").compute().reset_index(drop=True) - df = pd.DataFrame({"i": [1, 2], "j": [2, 4]}).astype("int32") - df["j"] = df["j"].astype("int64") + result_df = c.sql("SELECT * FROM df_part") + expected_df = pd.DataFrame({"i": [1, 2], "j": [2, 4]}).astype("int32") + expected_df["j"] = expected_df["j"].astype("int64") - assert_frame_equal(df, result_df) + assert_eq(result_df, expected_df, check_index=False) diff --git a/tests/integration/test_intake.py b/tests/integration/test_intake.py index 4796b44d5..ea7937029 100644 --- a/tests/integration/test_intake.py +++ 
b/tests/integration/test_intake.py @@ -4,9 +4,9 @@ import pandas as pd import pytest -from pandas.testing import assert_frame_equal from dask_sql.context import Context +from tests.utils import assert_eq # skip the test if intake is not installed intake = pytest.importorskip("intake") @@ -40,10 +40,10 @@ def intake_catalog_location(): def check_read_table(c): - result_df = c.sql("SELECT * FROM df").compute().reset_index(drop=True) - df = pd.DataFrame({"a": [1], "b": [1.5]}) + result_df = c.sql("SELECT * FROM df").reset_index(drop=True) + expected_df = pd.DataFrame({"a": [1], "b": [1.5]}) - assert_frame_equal(df, result_df) + assert_eq(result_df, expected_df) def test_intake_catalog(intake_catalog_location): diff --git a/tests/integration/test_jdbc.py b/tests/integration/test_jdbc.py index f8426ae46..ce216da84 100644 --- a/tests/integration/test_jdbc.py +++ b/tests/integration/test_jdbc.py @@ -1,3 +1,4 @@ +import os from time import sleep import pandas as pd @@ -37,6 +38,10 @@ def app_client(c): yield TestClient(app) + # don't disconnect the client if using an independent cluster + if os.getenv("DASK_SQL_TEST_SCHEDULER", None) is None: + app.client.close() + def test_jdbc_has_schema(app_client, c): create_meta_data(c) diff --git a/tests/integration/test_join.py b/tests/integration/test_join.py index 44cd1e070..97c35e166 100644 --- a/tests/integration/test_join.py +++ b/tests/integration/test_join.py @@ -1,48 +1,52 @@ import dask.dataframe as dd import numpy as np import pandas as pd -from dask.dataframe.utils import assert_eq -from pandas.testing import assert_frame_equal from dask_sql import Context +from tests.utils import assert_eq def test_join(c): - df = c.sql( - "SELECT lhs.user_id, lhs.b, rhs.c FROM user_table_1 AS lhs JOIN user_table_2 AS rhs ON lhs.user_id = rhs.user_id" + return_df = c.sql( + """ + SELECT lhs.user_id, lhs.b, rhs.c + FROM user_table_1 AS lhs + JOIN user_table_2 AS rhs + ON lhs.user_id = rhs.user_id + """ ) - df = df.compute() - expected_df = pd.DataFrame( {"user_id": [1, 1, 2, 2], "b": [3, 3, 1, 3], "c": [1, 2, 3, 3]} ) - assert_frame_equal( - df.sort_values(["user_id", "b", "c"]).reset_index(drop=True), - expected_df, - ) + + assert_eq(return_df, expected_df, check_index=False) def test_join_inner(c): - df = c.sql( - "SELECT lhs.user_id, lhs.b, rhs.c FROM user_table_1 AS lhs INNER JOIN user_table_2 AS rhs ON lhs.user_id = rhs.user_id" + return_df = c.sql( + """ + SELECT lhs.user_id, lhs.b, rhs.c + FROM user_table_1 AS lhs + INNER JOIN user_table_2 AS rhs + ON lhs.user_id = rhs.user_id + """ ) - df = df.compute() - expected_df = pd.DataFrame( {"user_id": [1, 1, 2, 2], "b": [3, 3, 1, 3], "c": [1, 2, 3, 3]} ) - assert_frame_equal( - df.sort_values(["user_id", "b", "c"]).reset_index(drop=True), - expected_df, - ) + + assert_eq(return_df, expected_df, check_index=False) def test_join_outer(c): - df = c.sql( - "SELECT lhs.user_id, lhs.b, rhs.c FROM user_table_1 AS lhs FULL JOIN user_table_2 AS rhs ON lhs.user_id = rhs.user_id" + return_df = c.sql( + """ + SELECT lhs.user_id, lhs.b, rhs.c + FROM user_table_1 AS lhs + FULL JOIN user_table_2 AS rhs + ON lhs.user_id = rhs.user_id + """ ) - df = df.compute() - expected_df = pd.DataFrame( { # That is strange. 
Unfortunately, it seems dask fills in the @@ -52,17 +56,19 @@ def test_join_outer(c): "c": [1, 2, 3, 3, np.NaN, 4], } ) - assert_frame_equal( - df.sort_values(["user_id", "b", "c"]).reset_index(drop=True), expected_df - ) + + assert_eq(return_df, expected_df, check_index=False) def test_join_left(c): - df = c.sql( - "SELECT lhs.user_id, lhs.b, rhs.c FROM user_table_1 AS lhs LEFT JOIN user_table_2 AS rhs ON lhs.user_id = rhs.user_id" + return_df = c.sql( + """ + SELECT lhs.user_id, lhs.b, rhs.c + FROM user_table_1 AS lhs + LEFT JOIN user_table_2 AS rhs + ON lhs.user_id = rhs.user_id + """ ) - df = df.compute() - expected_df = pd.DataFrame( { # That is strange. Unfortunately, it seems dask fills in the @@ -72,18 +78,19 @@ def test_join_left(c): "c": [1, 2, 3, 3, np.NaN], } ) - assert_frame_equal( - df.sort_values(["user_id", "b", "c"]).reset_index(drop=True), - expected_df, - ) + + assert_eq(return_df, expected_df, check_index=False) def test_join_right(c): - df = c.sql( - "SELECT lhs.user_id, lhs.b, rhs.c FROM user_table_1 AS lhs RIGHT JOIN user_table_2 AS rhs ON lhs.user_id = rhs.user_id" + return_df = c.sql( + """ + SELECT lhs.user_id, lhs.b, rhs.c + FROM user_table_1 AS lhs + RIGHT JOIN user_table_2 AS rhs + ON lhs.user_id = rhs.user_id + """ ) - df = df.compute() - expected_df = pd.DataFrame( { # That is strange. Unfortunately, it seems dask fills in the @@ -93,36 +100,34 @@ def test_join_right(c): "c": [1, 2, 3, 3, 4], } ) - assert_frame_equal( - df.sort_values(["user_id", "b", "c"]).reset_index(drop=True), - expected_df, - ) + + assert_eq(return_df, expected_df, check_index=False) def test_join_complex(c): - df = c.sql( - "SELECT lhs.a, rhs.b FROM df_simple AS lhs JOIN df_simple AS rhs ON lhs.a < rhs.b", + return_df = c.sql( + """ + SELECT lhs.a, rhs.b + FROM df_simple AS lhs + JOIN df_simple AS rhs + ON lhs.a < rhs.b + """ ) - df = df.compute() - - df_expected = pd.DataFrame( + expected_df = pd.DataFrame( {"a": [1, 1, 1, 2, 2, 3], "b": [1.1, 2.2, 3.3, 2.2, 3.3, 3.3]} ) - assert_frame_equal(df.sort_values(["a", "b"]).reset_index(drop=True), df_expected) + assert_eq(return_df, expected_df, check_index=False) - df = c.sql( - """ - SELECT lhs.a, lhs.b, rhs.a, rhs.b - FROM - df_simple AS lhs - JOIN df_simple AS rhs - ON lhs.a < rhs.b AND lhs.b < rhs.a + return_df = c.sql( """ + SELECT lhs.a, lhs.b, rhs.a, rhs.b + FROM df_simple AS lhs + JOIN df_simple AS rhs + ON lhs.a < rhs.b AND lhs.b < rhs.a + """ ) - df = df.compute() - - df_expected = pd.DataFrame( + expected_df = pd.DataFrame( { "a": [1, 1, 2], "b": [1.1, 1.1, 2.2], @@ -131,43 +136,33 @@ def test_join_complex(c): } ) - assert_frame_equal(df.sort_values(["a", "b0"]).reset_index(drop=True), df_expected) - + assert_eq(return_df, expected_df, check_index=False) -def test_join_complex_2(c): - df = c.sql( + return_df = c.sql( """ - SELECT - lhs.user_id, lhs.b, rhs.user_id, rhs.c + SELECT lhs.user_id, lhs.b, rhs.user_id, rhs.c FROM user_table_1 AS lhs JOIN user_table_2 AS rhs - ON rhs.user_id = lhs.user_id AND rhs.c - lhs.b >= 0 + ON rhs.user_id = lhs.user_id AND rhs.c - lhs.b >= 0 """ ) - - df = df.compute() - - df_expected = pd.DataFrame( + expected_df = pd.DataFrame( {"user_id": [2, 2], "b": [1, 3], "user_id0": [2, 2], "c": [3, 3]} ) - assert_frame_equal(df.sort_values("b").reset_index(drop=True), df_expected) + assert_eq(return_df, expected_df, check_index=False) def test_join_literal(c): - df = c.sql( + return_df = c.sql( """ - SELECT - lhs.user_id, lhs.b, rhs.user_id, rhs.c + SELECT lhs.user_id, lhs.b, rhs.user_id, rhs.c FROM 
user_table_1 AS lhs JOIN user_table_2 AS rhs - ON True + ON True """ ) - - df = df.compute() - - df_expected = pd.DataFrame( + expected_df = pd.DataFrame( { "user_id": [2, 2, 2, 2, 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3], "b": [1, 1, 1, 1, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3], @@ -176,26 +171,19 @@ def test_join_literal(c): } ) - assert_frame_equal( - df.sort_values(["b", "user_id", "user_id0"]).reset_index(drop=True), - df_expected, - ) + assert_eq(return_df, expected_df, check_index=False) - df = c.sql( + return_df = c.sql( """ - SELECT - lhs.user_id, lhs.b, rhs.user_id, rhs.c + SELECT lhs.user_id, lhs.b, rhs.user_id, rhs.c FROM user_table_1 AS lhs JOIN user_table_2 AS rhs - ON False + ON False """ ) + expected_df = pd.DataFrame({"user_id": [], "b": [], "user_id0": [], "c": []}) - df = df.compute() - - df_expected = pd.DataFrame({"user_id": [], "b": [], "user_id0": [], "c": []}) - - assert_frame_equal(df.reset_index(), df_expected.reset_index(), check_dtype=False) + assert_eq(return_df, expected_df, check_dtype=False, check_index=False) def test_conditional_join(c): @@ -256,7 +244,7 @@ def test_join_case_projection_subquery(): c.create_table("sales", sales, persist=False) c.create_table("t_dim", t_dim, persist=False) - actual_df = c.sql( + c.sql( """ SELECT CASE WHEN pmc > 0.0 THEN CAST (amc AS DOUBLE) / CAST (pmc AS DOUBLE) ELSE -1.0 END AS am_pm_ratio FROM diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py index 791ac0722..396d80573 100644 --- a/tests/integration/test_model.py +++ b/tests/integration/test_model.py @@ -8,6 +8,7 @@ from dask.datasets import timeseries from tests.integration.fixtures import skip_if_external_scheduler +from tests.utils import assert_eq try: import cuml @@ -231,10 +232,10 @@ def test_show_models(c, training_df): ) """ ) + result = c.sql("SHOW MODELS") expected = pd.DataFrame(["my_model1", "my_model2", "my_model3"], columns=["Models"]) - result: pd.DataFrame = c.sql("SHOW MODELS").compute() - # test - pd.testing.assert_frame_equal(expected, result) + + assert_eq(result, expected) def test_wrong_training_or_prediction(c, training_df): @@ -440,12 +441,9 @@ def test_describe_model(c, training_df): .sort_index() ) # test - result = ( - c.sql("DESCRIBE MODEL ex_describe_model") - .compute()["Params"] - .apply(lambda x: str(x)) - ) - pd.testing.assert_series_equal(expected_series, result) + result = c.sql("DESCRIBE MODEL ex_describe_model")["Params"].apply(lambda x: str(x)) + + assert_eq(expected_series, result) with pytest.raises(RuntimeError): c.sql("DESCRIBE MODEL undefined_model") diff --git a/tests/integration/test_over.py b/tests/integration/test_over.py index 20fa5b72c..f2ec4b9df 100644 --- a/tests/integration/test_over.py +++ b/tests/integration/test_over.py @@ -1,17 +1,10 @@ import pandas as pd -from pandas.testing import assert_frame_equal - -def assert_frame_equal_after_sorting(df, expected_df, columns=None, **kwargs): - columns = columns or ["user_id", "b"] - - df = df.sort_values(columns).reset_index(drop=True) - expected_df = expected_df.sort_values(columns).reset_index(drop=True) - assert_frame_equal(df, expected_df, **kwargs) +from tests.utils import assert_eq def test_over_with_sorting(c, user_table_1): - df = c.sql( + return_df = c.sql( """ SELECT user_id, @@ -20,36 +13,31 @@ def test_over_with_sorting(c, user_table_1): FROM user_table_1 """ ) - df = df.compute() + expected_df = user_table_1.sort_values(["user_id", "b"]) + expected_df["R"] = [1, 2, 3, 4] - expected_df = pd.DataFrame( - {"user_id": user_table_1.user_id, "b": 
user_table_1.b, "R": [3, 1, 2, 4]} - ) - expected_df["R"] = expected_df["R"].astype("Int64") - assert_frame_equal_after_sorting(df, expected_df, columns=["user_id", "b"]) + assert_eq(return_df, expected_df, check_dtype=False, check_index=False) def test_over_with_partitioning(c, user_table_2): - df = c.sql( + return_df = c.sql( """ SELECT user_id, c, ROW_NUMBER() OVER (PARTITION BY c) AS R FROM user_table_2 + ORDER BY user_id, c """ ) - df = df.compute() + expected_df = user_table_2.sort_values(["user_id", "c"]) + expected_df["R"] = [1, 1, 1, 1] - expected_df = pd.DataFrame( - {"user_id": user_table_2.user_id, "c": user_table_2.c, "R": [1, 1, 1, 1]} - ) - expected_df["R"] = expected_df["R"].astype("Int64") - assert_frame_equal_after_sorting(df, expected_df, columns=["user_id", "c"]) + assert_eq(return_df, expected_df, check_dtype=False, check_index=False) def test_over_with_grouping_and_sort(c, user_table_1): - df = c.sql( + return_df = c.sql( """ SELECT user_id, @@ -58,17 +46,14 @@ def test_over_with_grouping_and_sort(c, user_table_1): FROM user_table_1 """ ) - df = df.compute() + expected_df = user_table_1.sort_values(["user_id", "b"]) + expected_df["R"] = [1, 1, 2, 1] - expected_df = pd.DataFrame( - {"user_id": user_table_1.user_id, "b": user_table_1.b, "R": [2, 1, 1, 1]} - ) - expected_df["R"] = expected_df["R"].astype("Int64") - assert_frame_equal_after_sorting(df, expected_df, columns=["user_id", "b"]) + assert_eq(return_df, expected_df, check_dtype=False, check_index=False) def test_over_with_different(c, user_table_1): - df = c.sql( + return_df = c.sql( """ SELECT user_id, @@ -78,8 +63,6 @@ def test_over_with_different(c, user_table_1): FROM user_table_1 """ ) - df = df.compute() - expected_df = pd.DataFrame( { "user_id": user_table_1.user_id, @@ -88,14 +71,12 @@ def test_over_with_different(c, user_table_1): "R2": [3, 1, 2, 4], } ) - for col in ["R1", "R2"]: - expected_df[col] = expected_df[col].astype("Int64") - assert_frame_equal_after_sorting(df, expected_df, columns=["user_id", "b"]) + assert_eq(return_df, expected_df, check_dtype=False, check_index=False) def test_over_calls(c, user_table_1): - df = c.sql( + return_df = c.sql( """ SELECT user_id, @@ -113,8 +94,6 @@ def test_over_calls(c, user_table_1): FROM user_table_1 """ ) - df = df.compute() - expected_df = pd.DataFrame( { "user_id": user_table_1.user_id, @@ -131,20 +110,15 @@ def test_over_calls(c, user_table_1): "O9": [1, 3, 1, 3], } ) - for col in expected_df.columns: - if col in ["06", "user_id", "b"]: - continue - expected_df[col] = expected_df[col].astype("Int64") - expected_df["O6"] = expected_df["O6"].astype("Float64") - assert_frame_equal_after_sorting(df, expected_df, columns=["user_id", "b"]) + assert_eq(return_df, expected_df, check_dtype=False, check_index=False) def test_over_with_windows(c): - df = pd.DataFrame({"a": range(5)}) - c.create_table("tmp", df) + tmp_df = pd.DataFrame({"a": range(5)}) + c.create_table("tmp", tmp_df) - df = c.sql( + return_df = c.sql( """ SELECT a, @@ -161,11 +135,9 @@ def test_over_with_windows(c): FROM tmp """ ) - df = df.compute() - expected_df = pd.DataFrame( { - "a": df.a, + "a": return_df.a, "O1": [0, 1, 3, 6, 9], "O2": [6, 10, 10, 10, 9], "O3": [10, 10, 10, 10, 9], @@ -178,9 +150,5 @@ def test_over_with_windows(c): "O10": [None, 0, 1, 3, 6], } ) - for col in expected_df.columns: - if col in ["a"]: - continue - expected_df[col] = expected_df[col].astype("Int64") - assert_frame_equal_after_sorting(df, expected_df, columns=["a"]) + assert_eq(return_df, expected_df, 
check_dtype=False, check_index=False) diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py index aa6096b9e..8e38c94ac 100644 --- a/tests/integration/test_rex.py +++ b/tests/integration/test_rex.py @@ -4,7 +4,8 @@ import numpy as np import pandas as pd import pytest -from pandas.testing import assert_frame_equal + +from tests.utils import assert_eq @pytest.mark.xfail( @@ -28,8 +29,6 @@ def test_case(c, df): FROM df """ ) - result_df = result_df.compute() - expected_df = pd.DataFrame(index=df.index) expected_df["S1"] = df.a.apply(lambda a: 1 if a == 3 else pd.NA) expected_df["S2"] = df.a.apply(lambda a: a if a > 0 else 1) @@ -40,8 +39,9 @@ def test_case(c, df): ) expected_df["S6"] = df.a.apply(lambda a: 42 if ((a < 2) or (3 < a < 4)) else 47) expected_df["S7"] = df.a.apply(lambda a: 1 if (1 < a <= 4) else 0) + # Do not check dtypes, as pandas versions are inconsistent here - assert_frame_equal(result_df, expected_df, check_dtype=False) + assert_eq(result_df, expected_df, check_dtype=False) def test_literals(c): @@ -55,7 +55,6 @@ def test_literals(c): INTERVAL '1' DAY AS "IN" """ ) - df = df.compute() expected_df = pd.DataFrame( { @@ -68,7 +67,7 @@ def test_literals(c): "IN": [pd.to_timedelta("1d")], } ) - assert_frame_equal(df, expected_df) + assert_eq(df, expected_df) def test_literal_null(c): @@ -77,49 +76,28 @@ def test_literal_null(c): SELECT NULL AS "N", 1 + NULL AS "I" """ ) - df = df.compute() expected_df = pd.DataFrame({"N": [pd.NA], "I": [pd.NA]}) expected_df["I"] = expected_df["I"].astype("Int32") - assert_frame_equal(df, expected_df) + assert_eq(df, expected_df) -def test_random(c, df): - result_df = c.sql( - """ - SELECT RAND(0) AS "0", RAND_INTEGER(1, 10) AS "1" - """ - ) - result_df = result_df.compute() +def test_random(c): + query = 'SELECT RAND(0) AS "0", RAND_INTEGER(0, 10) AS "1"' - # As the seed is fixed, this should always give the same results - expected_df = pd.DataFrame({"0": [0.26183678695392976], "1": [8]}) - expected_df["1"] = expected_df["1"].astype("Int32") - assert_frame_equal(result_df, expected_df) + result_df = c.sql(query) - result_df = c.sql( - """ - SELECT RAND(42) AS "R" FROM df WHERE RAND(0) < b - """ - ) + # assert that repeated queries give the same result + assert_eq(result_df, c.sql(query)) + + # assert output result_df = result_df.compute() - assert len(result_df) == 659 - assert list(result_df["R"].iloc[:5]) == [ - 0.5276488824980542, - 0.17861463145673728, - 0.33764733440490524, - 0.6590485298464198, - 0.08554137165307785, - ] + assert result_df["0"].dtype == "float64" + assert result_df["1"].dtype == "Int32" - # If we do not fix the seed, we can just test if it works at all - result_df = c.sql( - """ - SELECT RAND() AS "0", RAND_INTEGER(10) AS "1" - """ - ) - result_df = result_df.compute() + assert 0 <= result_df["0"][0] < 1 + assert 0 <= result_df["1"][0] < 10 @pytest.mark.parametrize( @@ -139,10 +117,9 @@ def test_not(c, input_table, request): WHERE NOT a LIKE '%normal%' """ ) - df = df.compute() expected_df = string_table[~string_table.a.str.contains("normal")] - dd.assert_eq(df, expected_df) + assert_eq(df, expected_df) def test_operators(c, df): @@ -163,7 +140,6 @@ def test_operators(c, df): FROM df """ ) - result_df = result_df.compute() expected_df = pd.DataFrame(index=df.index) expected_df["m"] = df["a"] * df["b"] @@ -177,7 +153,7 @@ def test_operators(c, df): expected_df["l"] = df["a"] < df["b"] expected_df["le"] = df["a"] <= df["b"] expected_df["n"] = df["a"] != df["b"] - assert_frame_equal(result_df, expected_df) 
+ assert_eq(result_df, expected_df) @pytest.mark.parametrize( @@ -198,26 +174,22 @@ def test_operators(c, df): ) def test_like(c, input_table, gpu, request): string_table = request.getfixturevalue(input_table) - if gpu: - xd = pytest.importorskip("cudf") - else: - xd = pd df = c.sql( f""" SELECT * FROM {input_table} WHERE a SIMILAR TO '%n[a-z]rmal st_i%' """ - ).compute() + ) - dd.assert_eq(df, string_table.iloc[[0]]) + assert_eq(df, string_table.iloc[[0]]) df = c.sql( f""" SELECT * FROM {input_table} WHERE a LIKE '%n[a-z]rmal st_i%' """ - ).compute() + ) assert len(df) == 0 @@ -226,47 +198,47 @@ def test_like(c, input_table, gpu, request): SELECT * FROM {input_table} WHERE a LIKE 'Ä%Ä_Ä%' ESCAPE 'Ä' """ - ).compute() + ) - dd.assert_eq(df, string_table.iloc[[1]]) + assert_eq(df, string_table.iloc[[1]]) df = c.sql( f""" SELECT * FROM {input_table} WHERE a SIMILAR TO '^|()-*r[r]$' ESCAPE 'r' """ - ).compute() + ) - dd.assert_eq(df, string_table.iloc[[2]]) + assert_eq(df, string_table.iloc[[2]]) df = c.sql( f""" SELECT * FROM {input_table} WHERE a LIKE '^|()-*r[r]$' ESCAPE 'r' """ - ).compute() + ) - dd.assert_eq(df, string_table.iloc[[2]]) + assert_eq(df, string_table.iloc[[2]]) df = c.sql( f""" SELECT * FROM {input_table} WHERE a LIKE '%_' ESCAPE 'r' """ - ).compute() + ) - dd.assert_eq(df, string_table) + assert_eq(df, string_table) - string_table2 = xd.DataFrame({"b": ["a", "b", None, pd.NA, float("nan")]}) - c.register_dask_table(dd.from_pandas(string_table2, npartitions=1), "string_table2") + string_table2 = pd.DataFrame({"b": ["a", "b", None, pd.NA, float("nan")]}) + c.create_table("string_table2", string_table2, gpu=gpu) df = c.sql( """ SELECT * FROM string_table2 WHERE b LIKE 'b' """ - ).compute() + ) - dd.assert_eq(df, string_table2.iloc[[1]]) + assert_eq(df, string_table2.iloc[[1]]) def test_null(c): @@ -277,13 +249,13 @@ def test_null(c): c IS NULL AS n FROM user_table_nan """ - ).compute() + ) expected_df = pd.DataFrame(index=[0, 1, 2]) expected_df["nn"] = [True, False, True] expected_df["nn"] = expected_df["nn"].astype("boolean") expected_df["n"] = [False, True, False] - assert_frame_equal(df, expected_df) + assert_eq(df, expected_df) df = c.sql( """ @@ -292,13 +264,13 @@ def test_null(c): a IS NULL AS n FROM string_table """ - ).compute() + ) expected_df = pd.DataFrame(index=[0, 1, 2]) expected_df["nn"] = [True, True, True] expected_df["nn"] = expected_df["nn"].astype("boolean") expected_df["n"] = [False, False, False] - assert_frame_equal(df, expected_df) + assert_eq(df, expected_df) def test_boolean_operations(c): @@ -318,7 +290,7 @@ def test_boolean_operations(c): b IS UNKNOWN AS u, b IS NOT UNKNOWN AS nu FROM df""" - ).compute() + ) expected_df = pd.DataFrame( { @@ -334,7 +306,7 @@ def test_boolean_operations(c): expected_df["nt"] = expected_df["nt"].astype("boolean") expected_df["nf"] = expected_df["nf"].astype("boolean") expected_df["nu"] = expected_df["nu"].astype("boolean") - assert_frame_equal(df, expected_df) + assert_eq(df, expected_df) def test_math_operations(c, df): @@ -367,7 +339,7 @@ def test_math_operations(c, df): , TRUNCATE(b) AS "truncate" FROM df """ - ).compute() + ) expected_df = pd.DataFrame(index=df.index) expected_df["abs"] = df.b.abs() @@ -394,7 +366,7 @@ def test_math_operations(c, df): expected_df["sin"] = np.sin(df.b) expected_df["tan"] = np.tan(df.b) expected_df["truncate"] = np.trunc(df.b) - assert_frame_equal(result_df, expected_df) + assert_eq(result_df, expected_df) def test_integer_div(c, df_simple): @@ -406,7 +378,7 @@ def 
test_integer_div(c, df_simple): 1.0 / a AS c FROM df_simple """ - ).compute() + ) expected_df = pd.DataFrame(index=df_simple.index) expected_df["a"] = [1, 0, 0] @@ -414,7 +386,7 @@ def test_integer_div(c, df_simple): expected_df["b"] = [0, 1, 1] expected_df["b"] = expected_df["b"].astype("Int64") expected_df["c"] = [1.0, 0.5, 0.333333] - assert_frame_equal(df, expected_df) + assert_eq(df, expected_df) def test_subqueries(c, user_table_1, user_table_2): @@ -431,13 +403,10 @@ def test_subqueries(c, user_table_1, user_table_2): user_table_1.b = user_table_2.c ) """ - ).compute() - - assert_frame_equal( - df.reset_index(drop=True), - user_table_2[user_table_2.c.isin(user_table_1.b)].reset_index(drop=True), ) + assert_eq(df, user_table_2[user_table_2.c.isin(user_table_1.b)], check_index=False) + @pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)]) def test_string_functions(c, gpu): @@ -474,10 +443,9 @@ def test_string_functions(c, gpu): FROM {input_table} """ - ).compute() + ) if gpu: - df = df.to_pandas() df = df.astype({"c": "int64", "f": "int64", "g": "int64"}) expected_df = pd.DataFrame( @@ -507,7 +475,7 @@ def test_string_functions(c, gpu): } ) - assert_frame_equal( + assert_eq( df.head(1), expected_df, ) @@ -565,7 +533,7 @@ def test_date_functions(c): FROM df """ - ).compute() + ) expected_df = pd.DataFrame( { @@ -608,7 +576,7 @@ def test_date_functions(c): } ) - assert_frame_equal(df, expected_df, check_dtype=False) + assert_eq(df, expected_df, check_dtype=False) # test exception handling with pytest.raises(NotImplementedError): @@ -618,4 +586,4 @@ def test_date_functions(c): FLOOR(d TO YEAR) as floor_to_year FROM df """ - ).compute() + ) diff --git a/tests/integration/test_sample.py b/tests/integration/test_sample.py index 889aa420b..7e7635139 100644 --- a/tests/integration/test_sample.py +++ b/tests/integration/test_sample.py @@ -1,57 +1,46 @@ def test_sample(c, df): # Fixed sample, check absolute numbers return_df = c.sql("SELECT * FROM df TABLESAMPLE SYSTEM (20) REPEATABLE (10)") - return_df = return_df.compute() assert len(return_df) == 234 return_df = c.sql("SELECT * FROM df TABLESAMPLE SYSTEM (20) REPEATABLE (11)") - return_df = return_df.compute() assert len(return_df) == 468 # Yes, that is horrible, but at least fast... 
return_df = c.sql("SELECT * FROM df TABLESAMPLE SYSTEM (50) REPEATABLE (10)") - return_df = return_df.compute() assert len(return_df) == 234 return_df = c.sql("SELECT * FROM df TABLESAMPLE SYSTEM (0.001) REPEATABLE (10)") - return_df = return_df.compute() assert len(return_df) == 0 return_df = c.sql("SELECT * FROM df TABLESAMPLE SYSTEM (99.999) REPEATABLE (10)") - return_df = return_df.compute() assert len(return_df) == len(df) return_df = c.sql("SELECT * FROM df TABLESAMPLE BERNOULLI (50) REPEATABLE (10)") - return_df = return_df.compute() assert len(return_df) == 350 return_df = c.sql("SELECT * FROM df TABLESAMPLE BERNOULLI (70) REPEATABLE (10)") - return_df = return_df.compute() assert len(return_df) == 490 return_df = c.sql("SELECT * FROM df TABLESAMPLE BERNOULLI (0.001) REPEATABLE (10)") - return_df = return_df.compute() assert len(return_df) == 0 return_df = c.sql("SELECT * FROM df TABLESAMPLE BERNOULLI (99.999) REPEATABLE (10)") - return_df = return_df.compute() assert len(return_df) == len(df) # Not fixed sample, can only check boundaries return_df = c.sql("SELECT * FROM df TABLESAMPLE BERNOULLI (50)") - return_df = return_df.compute() assert len(return_df) >= 0 and len(return_df) <= len(df) return_df = c.sql("SELECT * FROM df TABLESAMPLE SYSTEM (50)") - return_df = return_df.compute() assert len(return_df) >= 0 and len(return_df) <= len(df) diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 50cc90c3d..33b0ddc4d 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -1,30 +1,30 @@ import dask.dataframe as dd import numpy as np import pytest -from pandas.testing import assert_frame_equal from dask_sql.utils import ParsingException +from tests.utils import assert_eq def test_table_schema(c, df): - original_df = c.sql("SELECT * FROM df").compute() + original_df = c.sql("SELECT * FROM df") - assert_frame_equal(original_df, c.sql("SELECT * FROM root.df").compute()) + assert_eq(original_df, c.sql("SELECT * FROM root.df")) c.sql("CREATE SCHEMA foo") - assert_frame_equal(original_df, c.sql("SELECT * FROM df").compute()) + assert_eq(original_df, c.sql("SELECT * FROM df")) c.sql('USE SCHEMA "foo"') - assert_frame_equal(original_df, c.sql("SELECT * FROM root.df").compute()) + assert_eq(original_df, c.sql("SELECT * FROM root.df")) c.sql("CREATE TABLE bar AS TABLE root.df") - assert_frame_equal(original_df, c.sql("SELECT * FROM bar").compute()) + assert_eq(original_df, c.sql("SELECT * FROM bar")) with pytest.raises(KeyError): c.sql("CREATE TABLE other.bar AS TABLE df") c.sql('USE SCHEMA "root"') - assert_frame_equal(original_df, c.sql("SELECT * FROM foo.bar").compute()) + assert_eq(original_df, c.sql("SELECT * FROM foo.bar")) with pytest.raises(ParsingException): c.sql("SELECT * FROM bar") diff --git a/tests/integration/test_select.py b/tests/integration/test_select.py index c6ce6a5a7..6f93692f9 100644 --- a/tests/integration/test_select.py +++ b/tests/integration/test_select.py @@ -1,35 +1,31 @@ -import dask.dataframe as dd import numpy as np import pandas as pd import pytest -from pandas.testing import assert_frame_equal from dask_sql.utils import ParsingException +from tests.utils import assert_eq def test_select(c, df): result_df = c.sql("SELECT * FROM df") - result_df = result_df.compute() - assert_frame_equal(result_df, df) + assert_eq(result_df, df) def test_select_alias(c, df): result_df = c.sql("SELECT a as b, b as a FROM df") - result_df = result_df.compute() expected_df = pd.DataFrame(index=df.index) 
expected_df["b"] = df.a expected_df["a"] = df.b - assert_frame_equal(result_df[["a", "b"]], expected_df[["a", "b"]]) + assert_eq(result_df[["a", "b"]], expected_df[["a", "b"]]) def test_select_column(c, df): result_df = c.sql("SELECT a FROM df") - result_df = result_df.compute() - assert_frame_equal(result_df, df[["a"]]) + assert_eq(result_df, df[["a"]]) def test_select_different_types(c): @@ -42,20 +38,19 @@ def test_select_different_types(c): } ) c.create_table("df", expected_df) - df = c.sql( + result_df = c.sql( """ SELECT * FROM df """ ) - df = df.compute() - assert_frame_equal(df, expected_df) + assert_eq(result_df, expected_df) def test_select_expr(c, df): result_df = c.sql("SELECT a + 1 AS a, b AS bla, a - 1 FROM df") - result_df = result_df.compute() + result_df = result_df expected_df = pd.DataFrame( { @@ -64,7 +59,7 @@ def test_select_expr(c, df): '"df"."a" - 1': df["a"] - 1, } ) - assert_frame_equal(result_df, expected_df) + assert_eq(result_df, expected_df) def test_select_of_select(c, df): @@ -78,10 +73,9 @@ def test_select_of_select(c, df): ) AS "inner" """ ) - result_df = result_df.compute() expected_df = pd.DataFrame({"e": 2 * (df["a"] - 1), "f": 2 * df["b"] - 1}) - assert_frame_equal(result_df, expected_df) + assert_eq(result_df, expected_df) def test_select_of_select_with_casing(c, df): @@ -95,13 +89,12 @@ def test_select_of_select_with_casing(c, df): ) AS "inner" """ ) - result_df = result_df.compute() expected_df = pd.DataFrame( {"AAA": df["a"] + df["b"], "aaa": 2 * df["b"], "aAa": df["a"] - 1} ) - assert_frame_equal(result_df, expected_df) + assert_eq(result_df, expected_df) def test_wrong_input(c): @@ -118,9 +111,30 @@ def test_timezones(c, datetime_table): SELECT * FROM datetime_table """ ) - result_df = result_df.compute() - assert_frame_equal(result_df, datetime_table) + assert_eq(result_df, datetime_table) + + +@pytest.mark.parametrize( + "input_table", + [ + "long_table", + pytest.param("gpu_long_table", marks=pytest.mark.gpu), + ], +) +@pytest.mark.parametrize( + "limit,offset", + [(101, 0), (200, 0), (100, 0), (100, 99), (100, 100), (101, 101), (0, 101)], +) +def test_limit(c, input_table, limit, offset, request): + long_table = request.getfixturevalue(input_table) + + if not limit: + query = f"SELECT * FROM long_table OFFSET {offset}" + else: + query = f"SELECT * FROM long_table LIMIT {limit} OFFSET {offset}" + + assert_eq(c.sql(query), long_table.iloc[offset : offset + limit if limit else None]) @pytest.mark.parametrize( @@ -153,7 +167,7 @@ def test_date_casting(c, input_table, request): expected_df["utc_timezone"].astype(" 2, df1.a, df2.a).compute(), pd.Series([3, 2, 3]), check_names=False - ) + assert_eq(op(df1.a > 2, df1.a, df2.a), pd.Series([3, 2, 3]), check_names=False) - assert_series_equal( - op(df1.a > 2, 99, df2.a).compute(), pd.Series([3, 2, 99]), check_names=False - ) + assert_eq(op(df1.a > 2, 99, df2.a), pd.Series([3, 2, 99]), check_names=False) - assert_series_equal( - op(df1.a > 2, 99, -1).compute(), pd.Series([-1, -1, 99]), check_names=False - ) + assert_eq(op(df1.a > 2, 99, -1), pd.Series([-1, -1, 99]), check_names=False) - assert_series_equal( - op(df1.a > 2, df1.a, -1).compute(), pd.Series([-1, -1, 3]), check_names=False - ) + assert_eq(op(df1.a > 2, df1.a, -1), pd.Series([-1, -1, 3]), check_names=False) assert op(True, 1, 2) == 1 assert op(False, 1, 2) == 2 @@ -59,11 +51,9 @@ def test_case(): def test_is_true(): op = call.IsTrueOperation() - assert_series_equal( - op(df1.a > 2).compute(), pd.Series([False, False, True]), check_names=False 
- ) - assert_series_equal( - op(df3.a).compute(), + assert_eq(op(df1.a > 2), pd.Series([False, False, True]), check_names=False) + assert_eq( + op(df3.a), pd.Series([True, False, False], dtype="boolean"), check_names=False, ) @@ -78,11 +68,9 @@ def test_is_true(): def test_is_false(): op = call.IsFalseOperation() - assert_series_equal( - op(df1.a > 2).compute(), pd.Series([True, True, False]), check_names=False - ) - assert_series_equal( - op(df3.a).compute(), + assert_eq(op(df1.a > 2), pd.Series([True, True, False]), check_names=False) + assert_eq( + op(df3.a), pd.Series([False, False, True], dtype="boolean"), check_names=False, ) @@ -121,53 +109,51 @@ def test_nan(): assert op(None) assert op(np.NaN) assert op(pd.NA) - assert_series_equal( - op(pd.Series(["a", None, "c"])), pd.Series([False, True, False]) - ) - assert_series_equal( + assert_eq(op(pd.Series(["a", None, "c"])), pd.Series([False, True, False])) + assert_eq( op(pd.Series([3, 2, np.NaN, pd.NA])), pd.Series([False, False, True, True]) ) def test_simple_ops(): - assert_series_equal( - ops_mapping["and"](df1.a >= 2, df2.a >= 2).compute(), + assert_eq( + ops_mapping["and"](df1.a >= 2, df2.a >= 2), pd.Series([False, True, False]), check_names=False, ) - assert_series_equal( - ops_mapping["or"](df1.a >= 2, df2.a >= 2).compute(), + assert_eq( + ops_mapping["or"](df1.a >= 2, df2.a >= 2), pd.Series([True, True, True]), check_names=False, ) - assert_series_equal( - ops_mapping[">="](df1.a, df2.a).compute(), + assert_eq( + ops_mapping[">="](df1.a, df2.a), pd.Series([False, True, True]), check_names=False, ) - assert_series_equal( - ops_mapping["+"](df1.a, df2.a, df1.a).compute(), + assert_eq( + ops_mapping["+"](df1.a, df2.a, df1.a), pd.Series([5, 6, 7]), check_names=False, ) def test_math_operations(): - assert_series_equal( - ops_mapping["abs"](-df1.a).compute(), + assert_eq( + ops_mapping["abs"](-df1.a), pd.Series([1, 2, 3]), check_names=False, ) - assert_series_equal( - ops_mapping["round"](df1.a).compute(), + assert_eq( + ops_mapping["round"](df1.a), pd.Series([1, 2, 3]), check_names=False, ) - assert_series_equal( - ops_mapping["floor"](df1.a).compute(), + assert_eq( + ops_mapping["floor"](df1.a), pd.Series([1.0, 2.0, 3.0]), check_names=False, ) diff --git a/tests/unit/test_context.py b/tests/unit/test_context.py index 697c0aee5..b8cfa6504 100644 --- a/tests/unit/test_context.py +++ b/tests/unit/test_context.py @@ -6,6 +6,7 @@ from dask_sql import Context from dask_sql.datacontainer import Statistics +from tests.utils import assert_eq try: import cudf @@ -66,7 +67,7 @@ def test_explain(gpu): "DaskTableScan(table=[[root, df]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = " ) - c.create_table("df", data_frame, statistics=Statistics(row_count=1337)) + c.create_table("df", data_frame, statistics=Statistics(row_count=1337), gpu=gpu) sql_string = c.explain("SELECT * FROM df") @@ -104,18 +105,18 @@ def test_sql(gpu): c.create_table("df", data_frame, gpu=gpu) result = c.sql("SELECT * FROM df") - assert isinstance(result, dd.DataFrame if not gpu else dask_cudf.DataFrame) - dd.assert_eq(result, data_frame) + assert isinstance(result, dd.DataFrame) + assert_eq(result, data_frame) result = c.sql("SELECT * FROM df", return_futures=False) - assert isinstance(result, pd.DataFrame if not gpu else cudf.DataFrame) - dd.assert_eq(result, data_frame) + assert not isinstance(result, dd.DataFrame) + assert_eq(result, data_frame) result = c.sql( "SELECT * FROM other_df", dataframes={"other_df": data_frame}, gpu=gpu ) - 
assert isinstance(result, dd.DataFrame if not gpu else dask_cudf.DataFrame) - dd.assert_eq(result, data_frame) + assert isinstance(result, dd.DataFrame) + assert_eq(result, data_frame) @pytest.mark.parametrize( @@ -135,7 +136,7 @@ def test_input_types(temporary_data_file, gpu): def assert_correct_output(gpu): result = c.sql("SELECT * FROM df") assert isinstance(result, dd.DataFrame if not gpu else dask_cudf.DataFrame) - dd.assert_eq(result, df) + assert_eq(result, df) c.create_table("df", df, gpu=gpu) assert_correct_output(gpu=gpu) diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 000000000..680365d19 --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,13 @@ +import os + +from dask.dataframe.utils import assert_eq as _assert_eq + +# use independent cluster for testing if it's available +address = os.getenv("DASK_SQL_TEST_SCHEDULER", None) +scheduler = "sync" if address is None else "distributed" + + +def assert_eq(*args, **kwargs): + kwargs.setdefault("scheduler", scheduler) + + return _assert_eq(*args, **kwargs)
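For context, this is roughly how a test reads once it goes through the new helper. The sketch below is illustrative only: the `sample` table, the data, and the test name are made up here and are not part of the patch. `Context.sql` keeps returning a lazy Dask DataFrame, and `tests.utils.assert_eq` evaluates both sides on the scheduler chosen above (the synchronous scheduler by default, or the distributed one when `DASK_SQL_TEST_SCHEDULER` is set) before delegating to `dask.dataframe.utils.assert_eq`.

    import pandas as pd

    from dask_sql import Context
    from tests.utils import assert_eq


    def test_select_column_sketch():
        # hypothetical data and table name, purely for illustration
        input_df = pd.DataFrame({"a": [1, 2, 3], "b": [1.1, 2.2, 3.3]})

        c = Context()
        c.create_table("sample", input_df)

        # no .compute() here: the lazy result is handed straight to assert_eq,
        # which computes it on the configured scheduler and compares the frames
        result_df = c.sql("SELECT a FROM sample")
        assert_eq(result_df, input_df[["a"]])

Routing every comparison through this single wrapper is what lets the suite switch between the synchronous scheduler and an external cluster via one environment variable, instead of sprinkling `.compute()` calls throughout the tests.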