Add compatibility for pandas 2 (#1184)
* First pass at unblocking pytest errors

* Fix some datetime related failures

* Copy datetime table fixture to avoid dropping timezones

* Unpin pandas dependency

* Resolve append deprecations in analyze table

* Resolve append deprecations in JDBC

* Fix deprecated drop call in test_join_cross

* Specify mixed datetime format for test_select_different_types

* Week number now accessible through isocalendar

* xfail test_filter_cast_timestamp

* Temp fix for cuDF timezone delocalization

* Add some compat code for pandas<2

* Use timezone-aware datetimes on GPU

* Don't hardcode timezone fixture

* Link to timezone scalar issue
charlesbluca authored Jun 30, 2023
1 parent 2a0fec3 commit 371315c
Showing 22 changed files with 88 additions and 66 deletions.
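
The recurring theme across these files is pandas 2's removal of DataFrame.append and of implicit timezone-dropping casts. As a minimal standalone sketch of the append migration (illustrative, not code from this commit):

    import pandas as pd

    df = pd.DataFrame({"a": [1, 2]})
    row = pd.DataFrame({"a": [3]}, index=["extra"])

    # pandas<2 allowed: combined = df.append(row)
    # pandas>=2 removes DataFrame.append entirely; concat is the replacement
    combined = pd.concat([df, row])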
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.10-dev.yaml
@@ -15,7 +15,7 @@ dependencies:
   - mlflow
   - mock
   - numpy>=1.21.6
-  - pandas>=1.4.0,<2.0.0
+  - pandas>=1.4.0
   - pre-commit
   - prompt_toolkit>=3.0.8
   - psycopg2

2 changes: 1 addition & 1 deletion continuous_integration/environment-3.9-dev.yaml
@@ -15,7 +15,7 @@ dependencies:
   - mlflow
   - mock
   - numpy>=1.21.6
-  - pandas>=1.4.0,<2.0.0
+  - pandas>=1.4.0
   - pre-commit
   - prompt_toolkit>=3.0.8
   - psycopg2

2 changes: 1 addition & 1 deletion continuous_integration/gpuci/environment-3.10.yaml
@@ -18,7 +18,7 @@ dependencies:
   - mlflow
   - mock
   - numpy>=1.21.6
-  - pandas>=1.4.0,<2.0.0
+  - pandas>=1.4.0
   - pre-commit
   - prompt_toolkit>=3.0.8
   - psycopg2

2 changes: 1 addition & 1 deletion continuous_integration/gpuci/environment-3.9.yaml
@@ -18,7 +18,7 @@ dependencies:
   - mlflow
   - mock
   - numpy>=1.21.6
-  - pandas>=1.4.0,<2.0.0
+  - pandas>=1.4.0
   - pre-commit
   - prompt_toolkit>=3.0.8
   - psycopg2

2 changes: 1 addition & 1 deletion continuous_integration/recipe/meta.yaml
@@ -35,7 +35,7 @@ requirements:
   run:
     - python
     - dask >=2022.3.0
-    - pandas >=1.4.0,<2.0.0
+    - pandas >=1.4.0
     # FIXME: handling is needed for httpx-based fastapi>=0.87.0
     - fastapi >=0.69.0,<0.87.0
     - uvicorn >=0.13.4

1 change: 1 addition & 0 deletions dask_sql/_compat.py
@@ -8,6 +8,7 @@
 _dask_version = parseVersion(dask.__version__)

 INDEXER_WINDOW_STEP_IMPLEMENTED = _pandas_version >= parseVersion("1.5.0")
+PANDAS_GT_200 = _pandas_version >= parseVersion("2.0.0")

 # TODO: remove if prompt-toolkit min version gets bumped
 PIPE_INPUT_CONTEXT_MANAGER = _prompt_toolkit_version >= parseVersion("3.0.29")

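A version flag like this is computed once at import time and branched on elsewhere in the codebase. A hedged sketch of the pattern, assuming packaging supplies the parser (the actual import in _compat.py is not shown in this diff):

    import pandas as pd
    from packaging.version import parse as parseVersion

    # computed once at import time; callers just branch on the boolean
    PANDAS_GT_200 = parseVersion(pd.__version__) >= parseVersion("2.0.0")

    if PANDAS_GT_200:
        print("running against pandas 2.x")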
14 changes: 12 additions & 2 deletions dask_sql/mappings.py
@@ -331,24 +331,34 @@ def cast_column_type(

 def cast_column_to_type(col: dd.Series, expected_type: str):
     """Cast the given column to the expected type"""
+    pdt = pd.api.types
+
+    is_dt_ns = pdt.is_datetime64_ns_dtype
+    is_dt_tz = lambda t: is_dt_ns(t) and pdt.is_datetime64tz_dtype(t)
+    is_dt_ntz = lambda t: is_dt_ns(t) and not pdt.is_datetime64tz_dtype(t)
+
     current_type = col.dtype

     if similar_type(current_type, expected_type):
         logger.debug("...not converting.")
         return None

-    if pd.api.types.is_integer_dtype(expected_type):
+    if pdt.is_integer_dtype(expected_type):
         if pd.api.types.is_float_dtype(current_type):
             logger.debug("...truncating...")
             # Currently "trunc" can not be applied to NA (the pandas missing value type),
             # because NA is a different type. It works with np.NaN though.
             # For our use case, that does not matter, as the conversion to integer later
             # will convert both NA and np.NaN to NA.
             col = da.trunc(col.fillna(value=np.NaN))
-        elif pd.api.types.is_timedelta64_dtype(current_type):
+        elif pdt.is_timedelta64_dtype(current_type):
             logger.debug(f"Explicitly casting from {current_type} to np.int64")
             return col.astype(np.int64)

+    if is_dt_tz(current_type) and is_dt_ntz(expected_type):
+        # casting from timezone-aware to timezone-naive datatypes with astype is deprecated in pandas 2
+        return col.dt.tz_localize(None)
+
     logger.debug(f"Need to cast from {current_type} to {expected_type}")
     return col.astype(expected_type)

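The new branch matters because casting from a timezone-aware to a timezone-naive dtype via astype was deprecated in pandas 1.x and removed in 2.0. A small standalone illustration of the replacement's semantics (not code from this commit):

    import pandas as pd

    s = pd.Series(pd.to_datetime(["2014-08-01 09:00"])).dt.tz_localize("Europe/Berlin")

    # tz_localize(None) strips the zone but keeps the wall-clock time (09:00);
    # converting to UTC first would instead yield 07:00
    naive = s.dt.tz_localize(None)
    print(naive.dtype)  # datetime64[ns]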
36 changes: 16 additions & 20 deletions dask_sql/physical/rel/custom/analyze_table.py
@@ -47,26 +47,22 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
         df = dc.df

         # Calculate statistics
-        statistics = dd.from_pandas(
-            pd.DataFrame({col: [] for col in columns}), npartitions=1
-        )
-        statistics = statistics.append(df[[mapping(col) for col in columns]].describe())
-
-        # Add additional information
-        statistics = statistics.append(
-            pd.Series(
-                {
-                    col: str(python_to_sql_type(df[mapping(col)].dtype)).lower()
-                    for col in columns
-                },
-                name="data_type",
-            )
-        )
-        statistics = statistics.append(
-            pd.Series(
-                {col: col for col in columns},
-                name="col_name",
-            )
+        statistics = dd.concat(
+            [
+                df[[mapping(col) for col in columns]].describe(),
+                pd.DataFrame(
+                    {
+                        mapping(col): str(
+                            python_to_sql_type(df[mapping(col)].dtype)
+                        ).lower()
+                        for col in columns
+                    },
+                    index=["data_type"],
+                ),
+                pd.DataFrame(
+                    {mapping(col): col for col in columns}, index=["col_name"]
+                ),
+            ]
         )

         cc = ColumnContainer(statistics.columns)

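Note that the list handed to dd.concat mixes a lazy Dask DataFrame (the describe() result) with plain pandas frames, which dd.concat accepts, as the change above relies on. A reduced sketch of the same construction with a hypothetical single column x:

    import dask.dataframe as dd
    import pandas as pd

    ddf = dd.from_pandas(pd.DataFrame({"x": [1.0, 2.0, 3.0]}), npartitions=1)

    stats = dd.concat(
        [
            ddf.describe(),  # lazy Dask DataFrame
            pd.DataFrame({"x": "double"}, index=["data_type"]),  # plain pandas
            pd.DataFrame({"x": "x"}, index=["col_name"]),
        ]
    )
    print(stats.compute())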
4 changes: 2 additions & 2 deletions dask_sql/physical/rex/core/call.py
@@ -15,7 +15,7 @@
 from dask.utils import random_state_data

 from dask_planner.rust import SqlTypeName
-from dask_sql._compat import DASK_CUDF_TODATETIME_SUPPORT
+from dask_sql._compat import DASK_CUDF_TODATETIME_SUPPORT, PANDAS_GT_200
 from dask_sql.datacontainer import DataContainer
 from dask_sql.mappings import (
     cast_column_to_type,
@@ -927,7 +927,7 @@ def date_part(self, what, df: SeriesOrScalar):
         elif what in {"SECOND", "SECONDS"}:
             return df.second
         elif what in {"WEEK", "WEEKS"}:
-            return df.week
+            return df.isocalendar().week if PANDAS_GT_200 else df.week
         elif what in {"YEAR", "YEARS"}:
             return df.year
         elif what == "DATE":

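Series.dt.week was deprecated in pandas 1.x and removed in 2.0; the ISO week number now lives in the frame returned by isocalendar(), hence the guarded branch above. A standalone illustration:

    import pandas as pd

    dates = pd.Series(pd.to_datetime(["2023-01-02", "2023-06-30"]))

    # isocalendar() returns a DataFrame with 'year', 'week' and 'day'
    # columns (nullable UInt32 dtype)
    weeks = dates.dt.isocalendar().week
    print(weeks.tolist())  # [1, 26]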
6 changes: 3 additions & 3 deletions dask_sql/server/presto_jdbc.py
@@ -37,15 +37,15 @@ def create_meta_data(c: Context):
     # catalogs = pd.DataFrame().append(create_catalog_row(catalog), ignore_index=True)
     # c.create_table("catalogs", catalogs, schema_name=system_schema)

-    schemas = pd.DataFrame().append(create_schema_row(), ignore_index=True)
+    schemas = pd.DataFrame(create_schema_row(), index=[0])
     c.create_table("schemas", schemas, schema_name=system_schema)
     schema_rows = []

-    tables = pd.DataFrame().append(create_table_row(), ignore_index=True)
+    tables = pd.DataFrame(create_table_row(), index=[0])
     c.create_table("tables", tables, schema_name=system_schema)
     table_rows = []

-    columns = pd.DataFrame().append(create_column_row(), ignore_index=True)
+    columns = pd.DataFrame(create_column_row(), index=[0])
     c.create_table("columns", columns, schema_name=system_schema)
     column_rows = []

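Since each row here is a plain dict of scalars, a one-row frame can be built directly instead of appending to an empty frame. A sketch with a hypothetical row (the real keys come from create_schema_row() and friends):

    import pandas as pd

    row = {"TABLE_SCHEM": "a_schema", "TABLE_CATALOG": ""}  # hypothetical keys

    # pandas<2: pd.DataFrame().append(row, ignore_index=True)
    # pandas>=2: scalar values broadcast over the supplied index
    schemas = pd.DataFrame(row, index=[0])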
2 changes: 1 addition & 1 deletion docker/conda.txt
@@ -1,6 +1,6 @@
 python>=3.8
 dask>=2022.3.0
-pandas>=1.4.0,<2.0.0
+pandas>=1.4.0
 jpype1>=1.0.2
 openjdk>=8
 maven>=3.6.0

2 changes: 1 addition & 1 deletion docker/main.dockerfile
@@ -17,7 +17,7 @@ RUN mamba install -y \
     "setuptools-rust>=1.5.2" \
     # core dependencies
     "dask>=2022.3.0" \
-    "pandas>=1.4.0,<2.0.0" \
+    "pandas>=1.4.0" \
     # FIXME: handling is needed for httpx-based fastapi>=0.87.0
     "fastapi>=0.69.0,<0.87.0" \
     "uvicorn>=0.13.4" \

2 changes: 1 addition & 1 deletion docs/environment.yml
@@ -8,7 +8,7 @@ dependencies:
   - sphinx-tabs
   - dask-sphinx-theme>=2.0.3
   - dask>=2022.3.0
-  - pandas>=1.4.0,<2.0.0
+  - pandas>=1.4.0
   - fugue>=0.7.3
   # FIXME: handling is needed for httpx-based fastapi>=0.87.0
   - fastapi>=0.69.0,<0.87.0

2 changes: 1 addition & 1 deletion docs/requirements-docs.txt
@@ -2,7 +2,7 @@ sphinx>=4.0.0
 sphinx-tabs
 dask-sphinx-theme>=3.0.0
 dask>=2022.3.0
-pandas>=1.4.0,<2.0.0
+pandas>=1.4.0
 fugue>=0.7.3
 # FIXME: handling is needed for httpx-based fastapi>=0.87.0
 fastapi>=0.69.0,<0.87.0

2 changes: 1 addition & 1 deletion setup.py
@@ -44,7 +44,7 @@
     install_requires=[
         "dask[dataframe]>=2022.3.0",
         "distributed>=2022.3.0",
-        "pandas>=1.4.0,<2.0.0",
+        "pandas>=1.4.0",
         # FIXME: handling is needed for httpx-based fastapi>=0.87.0
         "fastapi>=0.69.0,<0.87.0",
         "uvicorn>=0.13.4",

21 changes: 15 additions & 6 deletions tests/integration/fixtures.py
@@ -169,12 +169,21 @@ def gpu_string_table(string_table):

 @pytest.fixture()
 def gpu_datetime_table(datetime_table):
-    # cudf doesn't have support for timezoned datetime data
-    datetime_table["timezone"] = datetime_table["timezone"].astype("datetime64[ns]")
-    datetime_table["utc_timezone"] = datetime_table["utc_timezone"].astype(
-        "datetime64[ns]"
-    )
-    return cudf.from_pandas(datetime_table) if cudf else None
+    if cudf:
+        # TODO: remove once `from_pandas` has support for timezone-aware data
+        # https://github.com/rapidsai/cudf/issues/13611
+        df = datetime_table.copy()
+        df["timezone"] = df["timezone"].dt.tz_localize(None)
+        df["utc_timezone"] = df["utc_timezone"].dt.tz_localize(None)
+        gdf = cudf.from_pandas(df)
+        gdf["timezone"] = gdf["timezone"].dt.tz_localize(
+            str(datetime_table["timezone"].dt.tz)
+        )
+        gdf["utc_timezone"] = gdf["utc_timezone"].dt.tz_localize(
+            str(datetime_table["utc_timezone"].dt.tz)
+        )
+        return gdf
+    return None


 @pytest.fixture()

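The fixture works around cudf.from_pandas rejecting timezone-aware columns by stripping the zone on the host and re-attaching it on the GPU, preserving the original wall-clock values. A condensed sketch of that round trip (GPU-only; assumes a cudf version affected by rapidsai/cudf#13611):

    import cudf
    import pandas as pd

    pdf = pd.DataFrame(
        {"ts": pd.to_datetime(["2014-08-01 09:00"]).tz_localize("Europe/Berlin")}
    )

    tz = str(pdf["ts"].dt.tz)                   # remember the zone
    pdf["ts"] = pdf["ts"].dt.tz_localize(None)  # strip it for the transfer
    gdf = cudf.from_pandas(pdf)
    gdf["ts"] = gdf["ts"].dt.tz_localize(tz)    # re-attach on the GPU side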
24 changes: 11 additions & 13 deletions tests/integration/test_analyze.py
@@ -1,3 +1,4 @@
+import dask.dataframe as dd
 import pandas as pd

 from dask_sql.mappings import python_to_sql_type
@@ -8,24 +9,21 @@ def test_analyze(c, df):
     result_df = c.sql("ANALYZE TABLE df COMPUTE STATISTICS FOR ALL COLUMNS")

     # extract table and compute stats with Dask manually
-    expected_df = (
-        c.sql("SELECT * FROM df")
-        .describe()
-        .append(
-            pd.Series(
+    expected_df = dd.concat(
+        [
+            c.sql("SELECT * FROM df").describe(),
+            pd.DataFrame(
                 {
                     col: str(python_to_sql_type(df[col].dtype)).lower()
                     for col in df.columns
                 },
-                name="data_type",
-            )
-        )
-        .append(
-            pd.Series(
+                index=["data_type"],
+            ),
+            pd.DataFrame(
                 {col: col for col in df.columns},
-                name="col_name",
-            )
-        )
+                index=["col_name"],
+            ),
+        ]
     )

     assert_eq(result_df, expected_df)

6 changes: 4 additions & 2 deletions tests/integration/test_filter.py
@@ -92,9 +92,8 @@ def test_filter_cast_date(c, input_table, request):
         CAST(timezone AS DATE) > DATE '2014-08-01'
     """
     )
-
     expected_df = datetime_table[
-        datetime_table["timezone"].astype("<M8[ns]").dt.floor("D").astype("<M8[ns]")
+        datetime_table["timezone"].dt.tz_localize(None).dt.floor("D").astype("<M8[ns]")
         > pd.Timestamp("2014-08-01")
     ]
     assert_eq(return_df, expected_df)
@@ -110,6 +109,9 @@ def test_filter_cast_date(c, input_table, request):
         ),
     ],
 )
+@pytest.mark.xfail(
+    reason="Need support for non-UTC timezoned literals, see https://github.com/dask-contrib/dask-sql/issues/1193"
+)
 def test_filter_cast_timestamp(c, input_table, request):
     datetime_table = request.getfixturevalue(input_table)
     return_df = c.sql(

3 changes: 1 addition & 2 deletions tests/integration/test_jdbc.py
@@ -19,8 +19,7 @@
 def c():
     c = Context()
     c.create_schema(schema)
-    row = create_table_row()
-    tables = pd.DataFrame().append(row, ignore_index=True)
+    tables = pd.DataFrame(create_table_row(), index=[0])
     tables = tables.astype({"AN_INT": "int64"})
     c.create_table(table, tables, schema_name=schema)

2 changes: 1 addition & 1 deletion tests/integration/test_join.py
@@ -169,7 +169,7 @@ def test_join_cross(c, user_table_1, department_table):
     user_table_1["key"] = 1
     department_table["key"] = 1

-    expected_df = dd.merge(user_table_1, department_table, on="key").drop("key", 1)
+    expected_df = dd.merge(user_table_1, department_table, on="key").drop(columns="key")

     assert_eq(return_df, expected_df, check_index=False)

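The drop call changes because pandas 2 removed positional arguments other than the labels themselves, so drop("key", 1) now raises a TypeError. A quick standalone check:

    import pandas as pd

    df = pd.DataFrame({"a": [1], "key": [1]})

    # df.drop("key", 1)          # TypeError under pandas 2
    df = df.drop(columns="key")  # keyword form works on pandas 1.x and 2.x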
15 changes: 11 additions & 4 deletions tests/integration/test_select.py
@@ -4,6 +4,7 @@
 from dask.dataframe.optimize import optimize_dataframe_getitem
 from dask.utils_test import hlg_layer

+from dask_sql._compat import PANDAS_GT_200
 from dask_sql.utils import ParsingException
 from tests.utils import assert_eq

@@ -33,7 +34,10 @@ def test_select_column(c, df):
 def test_select_different_types(c):
     expected_df = pd.DataFrame(
         {
-            "date": pd.to_datetime(["2022-01-21 17:34", "2022-01-21", "17:34", pd.NaT]),
+            "date": pd.to_datetime(
+                ["2022-01-21 17:34", "2022-01-21", "17:34", pd.NaT],
+                format="mixed" if PANDAS_GT_200 else None,
+            ),
             "string": ["this is a test", "another test", "äölüć", ""],
             "integer": [1, 2, -4, 5],
             "float": [-1.1, np.NaN, pd.NA, np.sqrt(2)],
@@ -163,13 +167,13 @@ def test_date_casting(c, input_table, request):

     expected_df = datetime_table
     expected_df["timezone"] = (
-        expected_df["timezone"].astype("<M8[ns]").dt.floor("D").astype("<M8[ns]")
+        expected_df["timezone"].dt.tz_localize(None).dt.floor("D").astype("<M8[ns]")
     )
     expected_df["no_timezone"] = (
         expected_df["no_timezone"].astype("<M8[ns]").dt.floor("D").astype("<M8[ns]")
     )
     expected_df["utc_timezone"] = (
-        expected_df["utc_timezone"].astype("<M8[ns]").dt.floor("D").astype("<M8[ns]")
+        expected_df["utc_timezone"].dt.tz_localize(None).dt.floor("D").astype("<M8[ns]")
     )

     assert_eq(result_df, expected_df)
@@ -194,7 +198,10 @@ def test_timestamp_casting(c, input_table, request):
     """
     )

-    expected_df = datetime_table.astype("<M8[ns]")
+    expected_df = datetime_table
+    expected_df["timezone"] = expected_df["timezone"].dt.tz_localize(None)
+    expected_df["utc_timezone"] = expected_df["utc_timezone"].dt.tz_localize(None)
+
     assert_eq(result_df, expected_df)

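Under pandas 2, to_datetime infers a single format from the first parseable element and raises on strings that deviate from it; format="mixed" (new in 2.0) restores the per-element parsing that lists like the one in this test depend on. A standalone illustration:

    import pandas as pd

    values = ["2022-01-21 17:34", "2022-01-21", "17:34", pd.NaT]

    # without format="mixed", pandas 2 raises once "2022-01-21" fails to
    # match the format inferred from the first element
    dates = pd.to_datetime(values, format="mixed")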
2 changes: 1 addition & 1 deletion tests/unit/test_mapping.py
@@ -47,4 +47,4 @@ def test_similar_type():
     assert similar_type(pd.Int64Dtype(), np.int32)
     assert not similar_type(np.uint32, np.int32)
     assert similar_type(np.float32, np.float64)
-    assert similar_type(np.object_, str)
+    assert similar_type(object, str)