Skip to content

Commit

Permalink
BUG/REF: ArrowExtensionArray non-nanosecond units (#53171)
Browse files Browse the repository at this point in the history
* BUG/REF: ArrowExtensionArray non-nanosecond units

* mypy

* gh refs

* fixes

* xfail min versions

* docstrings

* fix test

* fix test

* update imports

* move imports
  • Loading branch information
lukemanley authored May 16, 2023
1 parent 489f15e commit c96dbb7
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 79 deletions.
1 change: 1 addition & 0 deletions doc/source/whatsnew/v2.1.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ Sparse

ExtensionArray
^^^^^^^^^^^^^^
- Bug in :class:`~arrays.ArrowExtensionArray` converting pandas non-nanosecond temporal objects from non-zero values to zero values (:issue:`53171`)
- Bug in :meth:`Series.quantile` for pyarrow temporal types raising ArrowInvalid (:issue:`52678`)
- Bug in :meth:`Series.rank` returning wrong order for small values with ``Float64`` dtype (:issue:`52471`)
- Bug where the ``__from_arrow__`` method of masked ExtensionDtypes(e.g. :class:`Float64Dtype`, :class:`BooleanDtype`) would not accept pyarrow arrays of type ``pyarrow.null()`` (:issue:`52223`)
Expand Down
249 changes: 172 additions & 77 deletions pandas/core/arrays/arrow/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
import numpy as np

from pandas._libs import lib
from pandas._libs.tslibs import (
Timedelta,
Timestamp,
)
from pandas.compat import (
pa_version_under7p0,
pa_version_under8p0,
Expand Down Expand Up @@ -244,39 +248,9 @@ def _from_sequence(cls, scalars, *, dtype: Dtype | None = None, copy: bool = Fal
"""
Construct a new ExtensionArray from a sequence of scalars.
"""
pa_dtype = to_pyarrow_type(dtype)
if (
isinstance(scalars, np.ndarray)
and isinstance(dtype, ArrowDtype)
and (
pa.types.is_large_binary(pa_dtype) or pa.types.is_large_string(pa_dtype)
)
):
# See https://github.com/apache/arrow/issues/35289
scalars = scalars.tolist()

if isinstance(scalars, cls):
scalars = scalars._pa_array
elif not isinstance(scalars, (pa.Array, pa.ChunkedArray)):
if copy and is_array_like(scalars):
# pa array should not get updated when numpy array is updated
scalars = scalars.copy()
try:
scalars = pa.array(scalars, type=pa_dtype, from_pandas=True)
except pa.ArrowInvalid:
# GH50430: let pyarrow infer type, then cast
scalars = pa.array(scalars, from_pandas=True)
if pa_dtype and scalars.type != pa_dtype:
if pa.types.is_dictionary(pa_dtype):
scalars = scalars.dictionary_encode()
else:
scalars = scalars.cast(pa_dtype)
arr = cls(scalars)
if pa.types.is_duration(scalars.type) and scalars.null_count > 0:
# GH52843: upstream bug for duration types when originally
# constructed with data containing numpy NaT.
# https://github.com/apache/arrow/issues/35088
arr = arr.fillna(arr.dtype.na_value)
pa_type = to_pyarrow_type(dtype)
pa_array = cls._box_pa_array(scalars, pa_type=pa_type, copy=copy)
arr = cls(pa_array)
return arr

@classmethod
Expand Down Expand Up @@ -352,6 +326,150 @@ def _from_sequence_of_strings(
)
return cls._from_sequence(scalars, dtype=pa_type, copy=copy)

@classmethod
def _box_pa(
cls, value, pa_type: pa.DataType | None = None
) -> pa.Array | pa.ChunkedArray | pa.Scalar:
"""
Box value into a pyarrow Array, ChunkedArray or Scalar.
Parameters
----------
value : any
pa_type : pa.DataType | None
Returns
-------
pa.Array or pa.ChunkedArray or pa.Scalar
"""
if is_list_like(value):
return cls._box_pa_array(value, pa_type)
return cls._box_pa_scalar(value, pa_type)

@classmethod
def _box_pa_scalar(cls, value, pa_type: pa.DataType | None = None) -> pa.Scalar:
"""
Box value into a pyarrow Scalar.
Parameters
----------
value : any
pa_type : pa.DataType | None
Returns
-------
pa.Scalar
"""
if isinstance(value, pa.Scalar):
pa_scalar = value
elif isna(value):
pa_scalar = pa.scalar(None, type=pa_type)
else:
# GH 53171: pyarrow does not yet handle pandas non-nano correctly
# see https://github.com/apache/arrow/issues/33321
if isinstance(value, Timedelta):
if pa_type is None:
pa_type = pa.duration(value.unit)
elif value.unit != pa_type.unit:
value = value.as_unit(pa_type.unit)
value = value._value
elif isinstance(value, Timestamp):
if pa_type is None:
pa_type = pa.timestamp(value.unit, tz=value.tz)
elif value.unit != pa_type.unit:
value = value.as_unit(pa_type.unit)
value = value._value

pa_scalar = pa.scalar(value, type=pa_type, from_pandas=True)

if pa_type is not None and pa_scalar.type != pa_type:
pa_scalar = pa_scalar.cast(pa_type)

return pa_scalar

@classmethod
def _box_pa_array(
cls, value, pa_type: pa.DataType | None = None, copy: bool = False
) -> pa.Array | pa.ChunkedArray:
"""
Box value into a pyarrow Array or ChunkedArray.
Parameters
----------
value : Sequence
pa_type : pa.DataType | None
Returns
-------
pa.Array or pa.ChunkedArray
"""
if isinstance(value, cls):
pa_array = value._pa_array
elif isinstance(value, (pa.Array, pa.ChunkedArray)):
pa_array = value
elif isinstance(value, BaseMaskedArray):
# GH 52625
if copy:
value = value.copy()
pa_array = value.__arrow_array__()
else:
if (
isinstance(value, np.ndarray)
and pa_type is not None
and (
pa.types.is_large_binary(pa_type)
or pa.types.is_large_string(pa_type)
)
):
# See https://github.com/apache/arrow/issues/35289
value = value.tolist()
elif copy and is_array_like(value):
# pa array should not get updated when numpy array is updated
value = value.copy()

if (
pa_type is not None
and pa.types.is_duration(pa_type)
and (not isinstance(value, np.ndarray) or value.dtype.kind not in "mi")
):
# GH 53171: pyarrow does not yet handle pandas non-nano correctly
# see https://github.com/apache/arrow/issues/33321
from pandas.core.tools.timedeltas import to_timedelta

value = to_timedelta(value, unit=pa_type.unit).as_unit(pa_type.unit)
value = value.to_numpy()

try:
pa_array = pa.array(value, type=pa_type, from_pandas=True)
except pa.ArrowInvalid:
# GH50430: let pyarrow infer type, then cast
pa_array = pa.array(value, from_pandas=True)

if pa_type is None and pa.types.is_duration(pa_array.type):
# GH 53171: pyarrow does not yet handle pandas non-nano correctly
# see https://github.com/apache/arrow/issues/33321
from pandas.core.tools.timedeltas import to_timedelta

value = to_timedelta(value)
value = value.to_numpy()
pa_array = pa.array(value, type=pa_type, from_pandas=True)

if pa.types.is_duration(pa_array.type) and pa_array.null_count > 0:
# GH52843: upstream bug for duration types when originally
# constructed with data containing numpy NaT.
# https://github.com/apache/arrow/issues/35088
arr = cls(pa_array)
arr = arr.fillna(arr.dtype.na_value)
pa_array = arr._pa_array

if pa_type is not None and pa_array.type != pa_type:
if pa.types.is_dictionary(pa_type):
pa_array = pa_array.dictionary_encode()
else:
pa_array = pa_array.cast(pa_type)

return pa_array

def __getitem__(self, item: PositionalIndexer):
"""Select a subset of self.
Expand Down Expand Up @@ -470,65 +588,50 @@ def __setstate__(self, state) -> None:

def _cmp_method(self, other, op):
pc_func = ARROW_CMP_FUNCS[op.__name__]
if isinstance(other, ArrowExtensionArray):
result = pc_func(self._pa_array, other._pa_array)
elif isinstance(other, (np.ndarray, list)):
result = pc_func(self._pa_array, other)
elif isinstance(other, BaseMaskedArray):
# GH 52625
result = pc_func(self._pa_array, other.__arrow_array__())
elif is_scalar(other):
try:
result = pc_func(self._pa_array, pa.scalar(other))
except (pa.lib.ArrowNotImplementedError, pa.lib.ArrowInvalid):
try:
result = pc_func(self._pa_array, self._box_pa(other))
except (pa.lib.ArrowNotImplementedError, pa.lib.ArrowInvalid):
if is_scalar(other):
mask = isna(self) | isna(other)
valid = ~mask
result = np.zeros(len(self), dtype="bool")
result[valid] = op(np.array(self)[valid], other)
result = pa.array(result, type=pa.bool_())
result = pc.if_else(valid, result, None)
else:
raise NotImplementedError(
f"{op.__name__} not implemented for {type(other)}"
)
else:
raise NotImplementedError(
f"{op.__name__} not implemented for {type(other)}"
)
return ArrowExtensionArray(result)

def _evaluate_op_method(self, other, op, arrow_funcs):
pa_type = self._pa_array.type
other = self._box_pa(other)

if (pa.types.is_string(pa_type) or pa.types.is_binary(pa_type)) and op in [
operator.add,
roperator.radd,
]:
sep = pa.scalar("", type=pa_type)
if isinstance(other, type(self)):
other = other._pa_array
if op is operator.add:
result = pc.binary_join_element_wise(self._pa_array, other, sep)
else:
result = pc.binary_join_element_wise(other, self._pa_array, sep)
return type(self)(result)

if (
isinstance(other, pa.Scalar)
and pc.is_null(other).as_py()
and op.__name__ in ARROW_LOGICAL_FUNCS
):
# pyarrow kleene ops require null to be typed
other = other.cast(pa_type)

pc_func = arrow_funcs[op.__name__]
if pc_func is NotImplemented:
raise NotImplementedError(f"{op.__name__} not implemented.")
if isinstance(other, ArrowExtensionArray):
result = pc_func(self._pa_array, other._pa_array)
elif isinstance(other, (np.ndarray, list)):
result = pc_func(self._pa_array, pa.array(other, from_pandas=True))
elif isinstance(other, BaseMaskedArray):
# GH 52625
result = pc_func(self._pa_array, other.__arrow_array__())
elif is_scalar(other):
if isna(other) and op.__name__ in ARROW_LOGICAL_FUNCS:
# pyarrow kleene ops require null to be typed
pa_scalar = pa.scalar(None, type=self._pa_array.type)
else:
pa_scalar = pa.scalar(other)
result = pc_func(self._pa_array, pa_scalar)
else:
raise NotImplementedError(
f"{op.__name__} not implemented for {type(other)}"
)

result = pc_func(self._pa_array, other)
return type(self)(result)

def _logical_method(self, other, op):
Expand Down Expand Up @@ -1610,16 +1713,8 @@ def _mode(self, dropna: bool = True) -> Self:

def _maybe_convert_setitem_value(self, value):
"""Maybe convert value to be pyarrow compatible."""
if value is None:
return value
if isinstance(value, (pa.Scalar, pa.Array, pa.ChunkedArray)):
return value
if is_list_like(value):
pa_box = pa.array
else:
pa_box = pa.scalar
try:
value = pa_box(value, type=self._pa_array.type, from_pandas=True)
value = self._box_pa(value, self._pa_array.type)
except pa.ArrowTypeError as err:
msg = f"Invalid value '{str(value)}' for dtype {self.dtype}"
raise TypeError(msg) from err
Expand Down
Loading

0 comments on commit c96dbb7

Please sign in to comment.