[SPARK-37903][PYTHON] Replace string_typehints with get_type_hints
### What changes were proposed in this pull request?

Replaces `string_typehints` with `get_type_hints`.

### Why are the changes needed?

Currently, type hints written as strings are resolved with a hacky custom parser (`string_typehints`), but the standard `typing.get_type_hints` can do this instead.
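
For illustration only (a minimal sketch, not code from this PR): `typing.get_type_hints` evaluates string annotations against the function's module globals, so no hand-rolled string parser is needed.

```python
from typing import get_type_hints

import numpy as np


def pandas_microsecond(s) -> "np.int64":
    # Return hint written as a string, like the hints this PR migrates.
    return s


# get_type_hints evaluates the string in the module's namespace, where `np`
# is defined, and returns the real type object.
print(get_type_hints(pandas_microsecond))  # {'return': <class 'numpy.int64'>}
```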

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #35200 from ueshin/issues/SPARK-37903/string_typehints.

Authored-by: Takuya UESHIN <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
ueshin authored and HyukjinKwon committed Jan 14, 2022
1 parent db98074 commit 31d8489
Showing 10 changed files with 77 additions and 205 deletions.
64 changes: 22 additions & 42 deletions python/pyspark/pandas/datetimes.py
@@ -18,17 +18,16 @@
"""
Date/Time related functions on pandas-on-Spark Series
"""
- from typing import Any, Optional, Union, TYPE_CHECKING, no_type_check
+ from typing import Any, Optional, Union, no_type_check

import numpy as np
import pandas as pd # noqa: F401
from pandas.tseries.offsets import DateOffset

+ import pyspark.pandas as ps
import pyspark.sql.functions as F
from pyspark.sql.types import DateType, TimestampType, TimestampNTZType, LongType

- if TYPE_CHECKING:
-     import pyspark.pandas as ps


class DatetimeMethods:
"""Date/Time methods for pandas-on-Spark Series"""
@@ -107,8 +106,7 @@ def microsecond(self) -> "ps.Series":
The microseconds of the datetime.
"""

- @no_type_check
- def pandas_microsecond(s) -> "ps.Series[np.int64]":
+ def pandas_microsecond(s) -> ps.Series[np.int64]:  # type: ignore[no-untyped-def]
return s.dt.microsecond

return self._data.pandas_on_spark.transform_batch(pandas_microsecond)
@@ -167,8 +165,7 @@ def dayofweek(self) -> "ps.Series":
dtype: int64
"""

- @no_type_check
- def pandas_dayofweek(s) -> "ps.Series[np.int64]":
+ def pandas_dayofweek(s) -> ps.Series[np.int64]:  # type: ignore[no-untyped-def]
return s.dt.dayofweek

return self._data.pandas_on_spark.transform_batch(pandas_dayofweek)
@@ -185,8 +182,7 @@ def dayofyear(self) -> "ps.Series":
The ordinal day of the year.
"""

- @no_type_check
- def pandas_dayofyear(s) -> "ps.Series[np.int64]":
+ def pandas_dayofyear(s) -> ps.Series[np.int64]:  # type: ignore[no-untyped-def]
return s.dt.dayofyear

return self._data.pandas_on_spark.transform_batch(pandas_dayofyear)
@@ -197,8 +193,7 @@ def quarter(self) -> "ps.Series":
The quarter of the date.
"""

- @no_type_check
- def pandas_quarter(s) -> "ps.Series[np.int64]":
+ def pandas_quarter(s) -> ps.Series[np.int64]:  # type: ignore[no-untyped-def]
return s.dt.quarter

return self._data.pandas_on_spark.transform_batch(pandas_quarter)
@@ -237,8 +232,7 @@ def is_month_start(self) -> "ps.Series":
dtype: bool
"""

- @no_type_check
- def pandas_is_month_start(s) -> "ps.Series[bool]":
+ def pandas_is_month_start(s) -> ps.Series[bool]:  # type: ignore[no-untyped-def]
return s.dt.is_month_start

return self._data.pandas_on_spark.transform_batch(pandas_is_month_start)
@@ -277,8 +271,7 @@ def is_month_end(self) -> "ps.Series":
dtype: bool
"""

- @no_type_check
- def pandas_is_month_end(s) -> "ps.Series[bool]":
+ def pandas_is_month_end(s) -> ps.Series[bool]:  # type: ignore[no-untyped-def]
return s.dt.is_month_end

return self._data.pandas_on_spark.transform_batch(pandas_is_month_end)
@@ -328,8 +321,7 @@ def is_quarter_start(self) -> "ps.Series":
Name: dates, dtype: bool
"""

- @no_type_check
- def pandas_is_quarter_start(s) -> "ps.Series[bool]":
+ def pandas_is_quarter_start(s) -> ps.Series[bool]:  # type: ignore[no-untyped-def]
return s.dt.is_quarter_start

return self._data.pandas_on_spark.transform_batch(pandas_is_quarter_start)
@@ -379,8 +371,7 @@ def is_quarter_end(self) -> "ps.Series":
Name: dates, dtype: bool
"""

- @no_type_check
- def pandas_is_quarter_end(s) -> "ps.Series[bool]":
+ def pandas_is_quarter_end(s) -> ps.Series[bool]:  # type: ignore[no-untyped-def]
return s.dt.is_quarter_end

return self._data.pandas_on_spark.transform_batch(pandas_is_quarter_end)
@@ -419,8 +410,7 @@ def is_year_start(self) -> "ps.Series":
dtype: bool
"""

- @no_type_check
- def pandas_is_year_start(s) -> "ps.Series[bool]":
+ def pandas_is_year_start(s) -> ps.Series[bool]:  # type: ignore[no-untyped-def]
return s.dt.is_year_start

return self._data.pandas_on_spark.transform_batch(pandas_is_year_start)
@@ -459,8 +449,7 @@ def is_year_end(self) -> "ps.Series":
dtype: bool
"""

- @no_type_check
- def pandas_is_year_end(s) -> "ps.Series[bool]":
+ def pandas_is_year_end(s) -> ps.Series[bool]:  # type: ignore[no-untyped-def]
return s.dt.is_year_end

return self._data.pandas_on_spark.transform_batch(pandas_is_year_end)
@@ -499,8 +488,7 @@ def is_leap_year(self) -> "ps.Series":
dtype: bool
"""

- @no_type_check
- def pandas_is_leap_year(s) -> "ps.Series[bool]":
+ def pandas_is_leap_year(s) -> ps.Series[bool]:  # type: ignore[no-untyped-def]
return s.dt.is_leap_year

return self._data.pandas_on_spark.transform_batch(pandas_is_leap_year)
@@ -511,8 +499,7 @@ def daysinmonth(self) -> "ps.Series":
The number of days in the month.
"""

- @no_type_check
- def pandas_daysinmonth(s) -> "ps.Series[np.int64]":
+ def pandas_daysinmonth(s) -> ps.Series[np.int64]:  # type: ignore[no-untyped-def]
return s.dt.daysinmonth

return self._data.pandas_on_spark.transform_batch(pandas_daysinmonth)
@@ -574,8 +561,7 @@ def normalize(self) -> "ps.Series":
dtype: datetime64[ns]
"""

- @no_type_check
- def pandas_normalize(s) -> "ps.Series[np.datetime64]":
+ def pandas_normalize(s) -> ps.Series[np.datetime64]:  # type: ignore[no-untyped-def]
return s.dt.normalize()

return self._data.pandas_on_spark.transform_batch(pandas_normalize)
@@ -623,8 +609,7 @@ def strftime(self, date_format: str) -> "ps.Series":
dtype: object
"""

- @no_type_check
- def pandas_strftime(s) -> "ps.Series[str]":
+ def pandas_strftime(s) -> ps.Series[str]:  # type: ignore[no-untyped-def]
return s.dt.strftime(date_format)

return self._data.pandas_on_spark.transform_batch(pandas_strftime)
@@ -679,8 +664,7 @@ def round(self, freq: Union[str, DateOffset], *args: Any, **kwargs: Any) -> "ps.
dtype: datetime64[ns]
"""

- @no_type_check
- def pandas_round(s) -> "ps.Series[np.datetime64]":
+ def pandas_round(s) -> ps.Series[np.datetime64]:  # type: ignore[no-untyped-def]
return s.dt.round(freq, *args, **kwargs)

return self._data.pandas_on_spark.transform_batch(pandas_round)
@@ -735,8 +719,7 @@ def floor(self, freq: Union[str, DateOffset], *args: Any, **kwargs: Any) -> "ps.
dtype: datetime64[ns]
"""

- @no_type_check
- def pandas_floor(s) -> "ps.Series[np.datetime64]":
+ def pandas_floor(s) -> ps.Series[np.datetime64]:  # type: ignore[no-untyped-def]
return s.dt.floor(freq, *args, **kwargs)

return self._data.pandas_on_spark.transform_batch(pandas_floor)
@@ -791,8 +774,7 @@ def ceil(self, freq: Union[str, DateOffset], *args: Any, **kwargs: Any) -> "ps.S
dtype: datetime64[ns]
"""

- @no_type_check
- def pandas_ceil(s) -> "ps.Series[np.datetime64]":
+ def pandas_ceil(s) -> ps.Series[np.datetime64]:  # type: ignore[no-untyped-def]
return s.dt.ceil(freq, *args, **kwargs)

return self._data.pandas_on_spark.transform_batch(pandas_ceil)
@@ -828,8 +810,7 @@ def month_name(self, locale: Optional[str] = None) -> "ps.Series":
dtype: object
"""

- @no_type_check
- def pandas_month_name(s) -> "ps.Series[str]":
+ def pandas_month_name(s) -> ps.Series[str]:  # type: ignore[no-untyped-def]
return s.dt.month_name(locale=locale)

return self._data.pandas_on_spark.transform_batch(pandas_month_name)
@@ -865,8 +846,7 @@ def day_name(self, locale: Optional[str] = None) -> "ps.Series":
dtype: object
"""

- @no_type_check
- def pandas_day_name(s) -> "ps.Series[str]":
+ def pandas_day_name(s) -> ps.Series[str]:  # type: ignore[no-untyped-def]
return s.dt.day_name(locale=locale)

return self._data.pandas_on_spark.transform_batch(pandas_day_name)
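
As context for the datetimes.py hunks above (an illustrative sketch, not part of this commit): pandas-on-Spark reads these return annotations at runtime; `transform_batch` builds the result schema from the `ps.Series[np.int64]` hint, which is why string-form hints must be resolvable at all, now via `get_type_hints`. The snippet below assumes pyspark is installed and a local Spark session can be started.

```python
import numpy as np
import pandas as pd

import pyspark.pandas as ps


def pandas_microsecond(s) -> ps.Series[np.int64]:  # type: ignore[no-untyped-def]
    return s.dt.microsecond


# The return hint, not a sampling pass over the data, determines the schema.
psser = ps.from_pandas(pd.Series(pd.to_datetime(["2022-01-14 12:34:56.789012"])))
print(psser.pandas_on_spark.transform_batch(pandas_microsecond))
```
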
13 changes: 7 additions & 6 deletions python/pyspark/pandas/frame.py
@@ -3028,8 +3028,9 @@ def between_time(
psdf.index.name = verify_temp_column_name(psdf, "__index_name__")
return_types = [psdf.index.dtype] + list(psdf.dtypes)

- @no_type_check
- def pandas_between_time(pdf) -> ps.DataFrame[return_types]:
+ def pandas_between_time(  # type: ignore[no-untyped-def]
+     pdf,
+ ) -> ps.DataFrame[return_types]:  # type: ignore[valid-type]
return pdf.between_time(start_time, end_time, include_start, include_end).reset_index()

# apply_batch will remove the index of the pandas-on-Spark DataFrame and attach a
@@ -3106,8 +3107,9 @@ def at_time(
psdf.index.name = verify_temp_column_name(psdf, "__index_name__")
return_types = [psdf.index.dtype] + list(psdf.dtypes)

- @no_type_check
- def pandas_at_time(pdf) -> ps.DataFrame[return_types]:
+ def pandas_at_time(  # type: ignore[no-untyped-def]
+     pdf,
+ ) -> ps.DataFrame[return_types]:  # type: ignore[valid-type]
return pdf.at_time(time, asof, axis).reset_index()

# apply_batch will remove the index of the pandas-on-Spark DataFrame and attach
@@ -11645,8 +11647,7 @@ def eval(self, expr: str, inplace: bool = False) -> Optional[DataFrameOrSeries]:
# Since `eval_func` doesn't have a type hint, inferring the schema is always performed
# in the `apply_batch`. Hence, the variables `should_return_series`, `series_name`,
# and `should_return_scalar` can be updated.
- @no_type_check
- def eval_func(pdf):
+ def eval_func(pdf):  # type: ignore[no-untyped-def]
nonlocal should_return_series
nonlocal series_name
nonlocal should_return_scalar
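
A note on the frame.py hunks above (my sketch, not from the commit): `return_types` is built at runtime from the actual index and column dtypes (`[psdf.index.dtype] + list(psdf.dtypes)`), so `ps.DataFrame[return_types]` is a value expression rather than a static type; mypy flags it with `[valid-type]`, hence the added ignore. Roughly, with a hypothetical `return_types`:

```python
import numpy as np

import pyspark.pandas as ps

# Hypothetical dtypes standing in for [psdf.index.dtype] + list(psdf.dtypes).
return_types = [np.dtype("int64"), np.dtype("float64")]


def pandas_between_time(  # type: ignore[no-untyped-def]
    pdf,
) -> ps.DataFrame[return_types]:  # type: ignore[valid-type]
    # The annotation is evaluated when this def executes, using the runtime list.
    return pdf
```
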
6 changes: 2 additions & 4 deletions python/pyspark/pandas/indexes/datetimes.py
@@ -682,8 +682,7 @@ def indexer_between_time(
Int64Index([2], dtype='int64')
"""

- @no_type_check
- def pandas_between_time(pdf) -> ps.DataFrame[int]:
+ def pandas_between_time(pdf) -> ps.DataFrame[int]:  # type: ignore[no-untyped-def]
return pdf.between_time(start_time, end_time, include_start, include_end)

psdf = self.to_frame()[[]]
@@ -728,8 +727,7 @@ def indexer_at_time(self, time: Union[datetime.time, str], asof: bool = False) -
if asof:
raise NotImplementedError("'asof' argument is not supported")

- @no_type_check
- def pandas_at_time(pdf) -> ps.DataFrame[int]:
+ def pandas_at_time(pdf) -> ps.DataFrame[int]:  # type: ignore[no-untyped-def]
return pdf.at_time(time, asof)

psdf = self.to_frame()[[]]
3 changes: 1 addition & 2 deletions python/pyspark/pandas/indexes/timedelta.py
@@ -137,8 +137,7 @@ def days(self) -> Index:
Number of days for each element.
"""

- @no_type_check
- def pandas_days(x) -> int:
+ def pandas_days(x) -> int:  # type: ignore[no-untyped-def]
return x.days

return Index(self.to_series().transform(pandas_days))
5 changes: 3 additions & 2 deletions python/pyspark/pandas/namespace.py
@@ -816,8 +816,9 @@ def read_parquet(
if index_col is None and pandas_metadata:
# Try to read pandas metadata

- @no_type_check
- @pandas_udf("index_col array<string>, index_names array<string>")
+ @pandas_udf(  # type: ignore[call-overload]
+     "index_col array<string>, index_names array<string>"
+ )
def read_index_metadata(pser: pd.Series) -> pd.DataFrame:
binary = pser.iloc[0]
metadata = pq.ParquetFile(pa.BufferReader(binary)).metadata.metadata
(Diffs for the remaining 5 changed files are not shown.)
