Switch tests from pd.testing.assert_frame_equal to dd.assert_eq (#365)

* Start moving tests to dd.assert_eq

* Use assert_eq in datetime filter test

* Resolve most resulting test failures

* Resolve remaining test failures

* Convert over tests

* Convert more tests

* Consolidate select limit cpu/gpu test

* Remove remaining assert_series_equal

* Remove explicit cudf imports from many tests

* Resolve rex test failures

* Remove some additional compute calls

* Consolidate sorting tests with getfixturevalue

* Fix failed join test

* Remove breakpoint

* Use custom assert_eq function for tests

* Resolve test failures / seg faults

* Remove unnecessary testing utils

* Resolve local test failures

* Generalize RAND test

* Avoid closing client if using independent cluster

* Fix failures on Windows

* Resolve black failures

* Make random test variables more clear
charlesbluca authored Apr 11, 2022
1 parent 1eb30c1 commit 2bd1d18
Showing 27 changed files with 651 additions and 1,028 deletions.
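All of the per-file changes below funnel comparisons through a single helper imported as `from tests.utils import assert_eq`. The helper's own source is not shown in this diff; what follows is a minimal sketch of such a wrapper around dask's comparison utility — the relaxed defaults are assumptions, not the repository's actual code:

```python
# tests/utils.py -- hypothetical sketch, not the actual file contents.
from dask.dataframe.utils import assert_eq as _assert_eq


def assert_eq(*args, **kwargs):
    # dask-sql queries rarely guarantee a meaningful index or partition
    # layout, so relax those checks by default; individual tests can
    # still override any of these via keyword arguments.
    kwargs.setdefault("check_divisions", False)
    kwargs.setdefault("check_index", False)
    return _assert_eq(*args, **kwargs)
```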
18 changes: 3 additions & 15 deletions tests/integration/fixtures.py
@@ -5,9 +5,9 @@
 import numpy as np
 import pandas as pd
 import pytest
-from dask.datasets import timeseries
 from dask.distributed import Client
-from pandas.testing import assert_frame_equal
+
+from tests.utils import assert_eq
 
 try:
     import cudf
@@ -23,18 +23,6 @@
 SCHEDULER_ADDR = os.getenv("DASK_SQL_TEST_SCHEDULER", None)
 
 
-@pytest.fixture()
-def timeseries_df(c):
-    pdf = timeseries(freq="1d").compute().reset_index(drop=True)
-    # impute nans in pandas dataframe
-    col1_index = np.random.randint(0, 30, size=int(pdf.shape[0] * 0.2))
-    col2_index = np.random.randint(0, 30, size=int(pdf.shape[0] * 0.3))
-    pdf.loc[col1_index, "x"] = np.nan
-    pdf.loc[col2_index, "y"] = np.nan
-    c.create_table("timeseries", pdf, persist=True)
-    return pdf
-
-
 @pytest.fixture()
 def df_simple():
     return pd.DataFrame({"a": [1, 2, 3], "b": [1.1, 2.2, 3.3]})
@@ -311,7 +299,7 @@ def _assert_query_gives_same_result(query, sort_columns=None, **kwargs):
         sql_result = sql_result.reset_index(drop=True)
         dask_result = dask_result.reset_index(drop=True)
 
-        assert_frame_equal(sql_result, dask_result, check_dtype=False, **kwargs)
+        assert_eq(sql_result, dask_result, check_dtype=False, **kwargs)
 
     return _assert_query_gives_same_result
 
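A pattern worth noting before the remaining files: dask's assert_eq computes lazy collections itself and accepts dask and pandas objects interchangeably, which is why the explicit .compute() calls disappear throughout this commit. A self-contained illustration:

```python
import dask.dataframe as dd
import pandas as pd
from dask.dataframe.utils import assert_eq

pdf = pd.DataFrame({"a": [1, 2, 3], "b": [1.1, 2.2, 3.3]})
ddf = dd.from_pandas(pdf, npartitions=2)

# One side lazy, one side concrete: assert_eq materializes the dask
# side internally before comparing, so no explicit .compute() is needed.
assert_eq(ddf, pdf)
```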
16 changes: 5 additions & 11 deletions tests/integration/test_analyze.py
@@ -1,11 +1,10 @@
 import numpy as np
 import pandas as pd
-from pandas.testing import assert_frame_equal
+
+from tests.utils import assert_eq
 
 
 def test_analyze(c, df):
     result_df = c.sql("ANALYZE TABLE df COMPUTE STATISTICS FOR ALL COLUMNS")
-    result_df = result_df.compute()
 
     expected_df = pd.DataFrame(
         {
@@ -15,8 +14,7 @@ def test_analyze(c, df):
                 df.a.std(),
                 1.0,
                 2.0,
-                # That is actually wrong. But the approximate quantile function in dask gives a different result than the actual computation
-                result_df["a"].iloc[5],
+                2.0,  # incorrect, but what Dask gives for approx quantile
                 3.0,
                 3.0,
                 "double",
@@ -50,12 +48,8 @@ def test_analyze(c, df):
     )
 
     # The percentiles are calculated only approximately, therefore we do not use exact matching
-    p = ["25%", "50%", "75%"]
-    result_df.loc[p, :] = result_df.loc[p, :].astype(float).apply(np.ceil)
-    expected_df.loc[p, :] = expected_df.loc[p, :].astype(float).apply(np.ceil)
-    assert_frame_equal(result_df, expected_df, check_exact=False)
+    assert_eq(result_df, expected_df, rtol=0.135)
 
     result_df = c.sql("ANALYZE TABLE df COMPUTE STATISTICS FOR COLUMNS a")
-    result_df = result_df.compute()
 
-    assert_frame_equal(result_df, expected_df[["a"]])
+    assert_eq(result_df, expected_df[["a"]], rtol=0.135)
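The rtol keyword works because dask's assert_eq forwards unrecognized keywords down to pandas' assert_frame_equal, which treats values as equal when abs(left - right) <= rtol * abs(right) (plus a small absolute tolerance). A toy example of the tolerance in isolation (values are illustrative):

```python
import pandas as pd
from dask.dataframe.utils import assert_eq

exact = pd.DataFrame({"a": [2.0]})
approx = pd.DataFrame({"a": [2.2]})  # ~10% relative error

# Passes: |2.2 - 2.0| = 0.2 <= 0.135 * 2.0 = 0.27
assert_eq(approx, exact, rtol=0.135)
```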
8 changes: 4 additions & 4 deletions tests/integration/test_compatibility.py
@@ -14,14 +14,14 @@
 
 import numpy as np
 import pandas as pd
-from pandas.testing import assert_frame_equal
 
 from dask_sql import Context
 from dask_sql.utils import ParsingException
+from tests.utils import assert_eq
 
 
 def cast_datetime_to_string(df):
-    cols = df.select_dtypes(include=["datetime64[ns]"]).columns
+    cols = df.select_dtypes(include=["datetime64[ns]"]).columns.tolist()
     # Casting to object first as
     # directly converting to string loses second precision
     df[cols] = df[cols].astype("object").astype("string")
@@ -36,7 +36,7 @@ def eq_sqlite(sql, **dfs):
         c.create_table(name, df)
         df.to_sql(name, engine, index=False)
 
-    dask_result = c.sql(sql).compute().reset_index(drop=True)
+    dask_result = c.sql(sql).reset_index(drop=True)
     sqlite_result = pd.read_sql(sql, engine).reset_index(drop=True)
 
     # casting to object to ensure equality with sqlite
@@ -47,7 +47,7 @@ def eq_sqlite(sql, **dfs):
     dask_result = dask_result.fillna(np.NaN)
     sqlite_result = sqlite_result.fillna(np.NaN)
 
-    assert_frame_equal(dask_result, sqlite_result, check_dtype=False)
+    assert_eq(dask_result, sqlite_result, check_dtype=False)
 
 
 def make_rand_df(size: int, **kwargs):
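For readers unfamiliar with this test module: eq_sqlite registers each keyword-argument DataFrame under its keyword name in both dask-sql and an in-memory sqlite database, runs the same query against both, and compares the normalized results. A hypothetical call — the query, column names, and make_rand_df arguments are illustrative, not taken from the diff:

```python
# Illustrative usage of the eq_sqlite helper shown above.
df = make_rand_df(100, a=int, b=float)
eq_sqlite("SELECT a, SUM(b) AS b_sum FROM df GROUP BY a ORDER BY a", df=df)
```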
7 changes: 2 additions & 5 deletions tests/integration/test_complex.py
@@ -28,9 +28,6 @@ def test_complex_query(c):
             lhs.name = rhs.max_name AND
             lhs.x = rhs.max_x
         """
-    )
-
-    # should not fail
-    df = result.compute()
+    ).compute()
 
-    assert len(df) > 0
+    assert len(result) > 0
83 changes: 31 additions & 52 deletions tests/integration/test_create.py
@@ -1,9 +1,9 @@
 import dask.dataframe as dd
 import pandas as pd
 import pytest
-from pandas.testing import assert_frame_equal
 
 import dask_sql
+from tests.utils import assert_eq
 
 
 @pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
@@ -26,12 +26,9 @@ def test_create_from_csv(c, df, temporary_data_file, gpu):
         """
         SELECT * FROM new_table
         """
-    ).compute()
-
-    if gpu:
-        result_df = result_df.to_pandas()
+    )
 
-    assert_frame_equal(result_df, df)
+    assert_eq(result_df, df)
 
 
 @pytest.mark.parametrize(
@@ -60,12 +57,9 @@ def test_cluster_memory(client, c, df, gpu):
         """
         SELECT * FROM new_table
         """
-    ).compute()
-
-    if gpu:
-        return_df = return_df.to_pandas()
+    )
 
-    assert_frame_equal(df, return_df)
+    assert_eq(df, return_df)
 
     client.unpublish_dataset("df")
 
@@ -91,12 +85,9 @@ def test_create_from_csv_persist(c, df, temporary_data_file, gpu):
         """
         SELECT * FROM new_table
         """
-    ).compute()
-
-    if gpu:
-        return_df = return_df.to_pandas()
+    )
 
-    assert_frame_equal(df, return_df)
+    assert_eq(df, return_df)
 
 
 def test_wrong_create(c):
@@ -139,9 +130,9 @@ def test_create_from_query(c, df):
         """
         SELECT * FROM new_table
         """
-    ).compute()
+    )
 
-    assert_frame_equal(df, return_df)
+    assert_eq(df, return_df)
 
     c.sql(
         """
@@ -157,9 +148,9 @@ def test_create_from_query(c, df):
         """
         SELECT * FROM new_table
         """
-    ).compute()
+    )
 
-    assert_frame_equal(df, return_df)
+    assert_eq(df, return_df)
 
 
 @pytest.mark.parametrize(
@@ -210,27 +201,19 @@ def test_view_table_persist(c, temporary_data_file, df, gpu):
         """
     )
 
-    from_view = c.sql("SELECT c FROM count_view").compute()
-    from_table = c.sql("SELECT c FROM count_table").compute()
-
-    if gpu:
-        from_view = from_view.to_pandas()
-        from_table = from_table.to_pandas()
+    from_view = c.sql("SELECT c FROM count_view")
+    from_table = c.sql("SELECT c FROM count_table")
 
-    assert_frame_equal(from_view, pd.DataFrame({"c": [700]}))
-    assert_frame_equal(from_table, pd.DataFrame({"c": [700]}))
+    assert_eq(from_view, pd.DataFrame({"c": [700]}))
+    assert_eq(from_table, pd.DataFrame({"c": [700]}))
 
     df.iloc[:10].to_csv(temporary_data_file, index=False)
 
-    from_view = c.sql("SELECT c FROM count_view").compute()
-    from_table = c.sql("SELECT c FROM count_table").compute()
+    from_view = c.sql("SELECT c FROM count_view")
+    from_table = c.sql("SELECT c FROM count_table")
 
-    if gpu:
-        from_view = from_view.to_pandas()
-        from_table = from_table.to_pandas()
-
-    assert_frame_equal(from_view, pd.DataFrame({"c": [10]}))
-    assert_frame_equal(from_table, pd.DataFrame({"c": [700]}))
+    assert_eq(from_view, pd.DataFrame({"c": [10]}))
+    assert_eq(from_table, pd.DataFrame({"c": [700]}))
 
 
 def test_replace_and_error(c, temporary_data_file, df):
@@ -244,8 +227,8 @@ def test_replace_and_error(c, temporary_data_file, df):
         """
     )
 
-    assert_frame_equal(
-        c.sql("SELECT a FROM new_table").compute(),
+    assert_eq(
+        c.sql("SELECT a FROM new_table"),
         pd.DataFrame({"a": [1]}),
         check_dtype=False,
     )
@@ -271,8 +254,8 @@ def test_replace_and_error(c, temporary_data_file, df):
         """
     )
 
-    assert_frame_equal(
-        c.sql("SELECT a FROM new_table").compute(),
+    assert_eq(
+        c.sql("SELECT a FROM new_table"),
         pd.DataFrame({"a": [1]}),
         check_dtype=False,
     )
@@ -287,8 +270,8 @@ def test_replace_and_error(c, temporary_data_file, df):
         """
     )
 
-    assert_frame_equal(
-        c.sql("SELECT a FROM new_table").compute(),
+    assert_eq(
+        c.sql("SELECT a FROM new_table"),
         pd.DataFrame({"a": [2]}),
         check_dtype=False,
     )
@@ -308,8 +291,8 @@ def test_replace_and_error(c, temporary_data_file, df):
         """
     )
 
-    assert_frame_equal(
-        c.sql("SELECT a FROM new_table").compute(),
+    assert_eq(
+        c.sql("SELECT a FROM new_table"),
         pd.DataFrame({"a": [3]}),
         check_dtype=False,
    )
@@ -338,8 +321,8 @@ def test_replace_and_error(c, temporary_data_file, df):
         """
     )
 
-    assert_frame_equal(
-        c.sql("SELECT a FROM new_table").compute(),
+    assert_eq(
+        c.sql("SELECT a FROM new_table"),
         pd.DataFrame({"a": [3]}),
         check_dtype=False,
     )
@@ -355,13 +338,9 @@ def test_replace_and_error(c, temporary_data_file, df):
         """
     )
 
-    result_df = c.sql(
-        """
-        SELECT * FROM new_table
-        """
-    ).compute()
+    result_df = c.sql("SELECT * FROM new_table")
 
-    assert_frame_equal(result_df, df)
+    assert_eq(result_df, df)
 
 
 def test_drop(c):
15 changes: 2 additions & 13 deletions tests/integration/test_distributeby.py
@@ -2,24 +2,13 @@
 import pandas as pd
 import pytest
 
-try:
-    import cudf
-except ImportError:
-    cudf = None
-
 
 @pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
 def test_distribute_by(c, gpu):
-
-    if gpu:
-        xd = cudf
-    else:
-        xd = pd
-
-    df = xd.DataFrame({"id": [0, 1, 2, 1, 2, 3], "val": [0, 1, 2, 1, 2, 3]})
+    df = pd.DataFrame({"id": [0, 1, 2, 1, 2, 3], "val": [0, 1, 2, 1, 2, 3]})
     ddf = dd.from_pandas(df, npartitions=2)
 
-    c.create_table("test", ddf)
+    c.create_table("test", ddf, gpu=gpu)
     partitioned_ddf = c.sql(
         """
         SELECT
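The gpu=gpu argument to create_table is what lets this test drop its explicit cudf handling: when the flag is true, dask-sql converts the registered input to a GPU-backed table itself. A sketch of the same idea outside the fixture machinery (assumes a working GPU setup with cudf installed; table and column names are illustrative):

```python
import dask.dataframe as dd
import pandas as pd
from dask_sql import Context

c = Context()
df = pd.DataFrame({"id": [0, 1, 2, 1], "val": [0, 1, 2, 1]})
ddf = dd.from_pandas(df, npartitions=2)

# gpu=True asks dask-sql to move the input to GPU memory at registration
# time, keeping the test body itself backend-agnostic.
c.create_table("test", ddf, gpu=True)
result = c.sql("SELECT id, SUM(val) AS s FROM test GROUP BY id")
```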