FEAT-#6398: Improved performance of list-like objects insertion into HDK DataFrames #6412

Merged 6 commits on Jan 26, 2024
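For context, this is the user-facing operation the PR speeds up: inserting a list-like value as a new column of an HDK-backed frame. A minimal sketch, assuming the HDK storage format is selected through Modin's config; the data values and column names are illustrative:

import modin.config as cfg
import modin.pandas as pd

# Assumption: the HDK storage format is enabled before frames are created.
cfg.StorageFormat.put("Hdk")

df = pd.DataFrame({"a": [1, 2, 3]})

# With this change, a list-like value is either appended directly to the raw
# Arrow partition or materialized as a single-column frame and joined by
# rowid (see _insert_list below).
df["b"] = [10, 20, 30]
df.insert(loc=0, column="c", value=pd.Series([7, 8, 9]))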
modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py
@@ -78,6 +78,7 @@
build_categorical_from_at,
check_cols_to_join,
check_join_supported,
ensure_supported_dtype,
get_data_for_join_by_index,
maybe_range,
)
@@ -199,6 +200,9 @@
self.id = str(type(self)._next_id[0])
type(self)._next_id[0] += 1

if op is None and partitions is not None:
op = FrameNode(self)

self._op = op
self._index_cols = index_cols
self._partitions = partitions
@@ -252,6 +256,7 @@
op=no_default,
index_cols=no_default,
uses_rowid=no_default,
has_unsupported_data=no_default,
):
"""
Copy this DataFrame.
@@ -276,6 +281,8 @@
uses_rowid : bool, optional
True for frames which require access to the virtual 'rowid' column
for its execution.
has_unsupported_data : bool, optional
True for frames holding data not supported by Arrow or HDK storage format.

Returns
-------
@@ -296,6 +303,8 @@
index_cols = self._index_cols
if uses_rowid is no_default:
uses_rowid = self._uses_rowid
if has_unsupported_data is no_default:
has_unsupported_data = self._has_unsupported_data
return self.__constructor__(
partitions=partitions,
index=index,
@@ -307,7 +316,7 @@
index_cols=index_cols,
uses_rowid=uses_rowid,
force_execution_mode=self._force_execution_mode,
has_unsupported_data=self._has_unsupported_data,
has_unsupported_data=has_unsupported_data,
)

def id_str(self):
@@ -482,9 +491,7 @@
-------
bool
"""
return self._partitions is not None and isinstance(
self._partitions[0][0].get(), pyarrow.Table
)
return self._partitions is not None and self._partitions[0][0].raw

def _dtypes_for_exprs(self, exprs):
"""
@@ -1433,7 +1440,7 @@
lhs = lhs._reset_index_names()

if ignore_index:
new_columns = Index.__new__(RangeIndex, data=range(len(lhs.columns)))
new_columns = RangeIndex(range(len(lhs.columns)))
lhs = lhs._set_columns(new_columns)

return lhs
@@ -1461,14 +1468,7 @@
and isinstance(f._execute(), (DbTable, pyarrow.Table))
for f in frames
):
tables = [
(
t
if isinstance(t := f._partitions[0][0].get(), pyarrow.Table)
else t.to_arrow()
)
for f in frames
]
tables = [f._partitions[0][0].get(to_arrow=True) for f in frames]
column_names = [c for t in tables for c in t.column_names]
if len(column_names) != len(set(column_names)):
raise NotImplementedError("Duplicate column names")
@@ -1676,6 +1676,13 @@
assert column not in self._table_cols
assert 0 <= loc <= len(self.columns)

if is_list_like(value):
if isinstance(value, pd.Series) and not self.index.equals(value.index):
# Align by index
value = value.reindex(self.index)
value.reset_index(drop=True, inplace=True)
return self._insert_list(loc, column, value)

exprs = self._index_exprs()
for i in range(0, loc):
col = self.columns[i]
@@ -1696,6 +1703,171 @@
force_execution_mode=self._force_execution_mode,
)

def _insert_list(self, loc, name, value):
"""
Insert a list-like value.

Parameters
----------
loc : int
name : str
value : list

Returns
-------
HdkOnNativeDataframe
"""
ncols = len(self.columns)

if loc == -1:
loc = ncols

if ncols == 0:
assert loc == 0
return self._list_to_df(name, value, True)

if self._partitions and self._partitions[0][0].raw:
return self._insert_list_col(loc, name, value)

if loc == 0 or loc == ncols:
in_idx = 0 if loc == 0 else 1
if (
isinstance(self._op, JoinNode)
and self._op.by_rowid
and self._op.input[in_idx]._partitions
and self._op.input[in_idx]._partitions[0][0].raw
):
lhs = self._op.input[0]
rhs = self._op.input[1]
if loc == 0:
lhs = lhs._insert_list(0, name, value)
dtype = lhs.dtypes[0]
else:
rhs = rhs._insert_list(-1, name, value)
dtype = rhs.dtypes[-1]
elif loc == 0:
lhs = self._list_to_df(name, value, False)
rhs = self
dtype = lhs.dtypes[0]
else:
lhs = self
rhs = self._list_to_df(name, value, False)
dtype = rhs.dtypes[0]
elif isinstance(self._op, JoinNode) and self._op.by_rowid:
left_len = len(self._op.input[0].columns)
if loc < left_len:
lhs = self._op.input[0]._insert_list(loc, name, value)
rhs = self._op.input[1]
dtype = lhs.dtypes[loc]
else:
lhs = self._op.input[0]
rhs = self._op.input[1]._insert_list(loc - left_len, name, value)
dtype = rhs.dtypes[loc]
else:
lexprs = self._index_exprs()
rexprs = {}
for i, col in enumerate(self.columns):
(lexprs if i < loc else rexprs)[col] = self.ref(col)
lhs = self.__constructor__(
columns=self.columns[0:loc],
dtypes=self._dtypes_for_exprs(lexprs),
op=TransformNode(self, lexprs),
index=self._index_cache,
index_cols=self._index_cols,
force_execution_mode=self._force_execution_mode,
)._insert_list(loc, name, value)
rhs = self.__constructor__(
columns=self.columns[loc:],
dtypes=self._dtypes_for_exprs(rexprs),
op=TransformNode(self, rexprs),
force_execution_mode=self._force_execution_mode,
)
dtype = lhs.dtypes[loc]

op = self._join_by_rowid_op(lhs, rhs)
return self._insert_list_col(loc, name, value, dtype, op)

def _insert_list_col(self, idx, name, value, dtype=None, op=None):
"""
Insert a list-like column.

Parameters
----------
idx : int
name : str
value : list
dtype : dtype, default: None
op : DFAlgNode, default: None

Returns
-------
HdkOnNativeDataframe
"""
cols = self.columns.tolist()
cols.insert(idx, name)
has_unsupported_data = self._has_unsupported_data
if self._index_cols:
idx += len(self._index_cols)
if dtype is None:
part, dtype = self._partitions[0][0].insert(idx, name, value)
part = np.array([[part]])
if not has_unsupported_data:
try:
ensure_supported_dtype(dtype)
except NotImplementedError:
has_unsupported_data = True

else:
part = None
dtypes = self._dtypes.tolist()
dtypes.insert(idx, dtype)
return self.copy(
partitions=part,
columns=cols,
dtypes=dtypes,
op=op,
has_unsupported_data=has_unsupported_data,
)

def _list_to_df(self, name, value, add_index):
"""
Create a single-column frame from the list-like value.

Parameters
----------
name : str
value : list
add_index : bool

Returns
-------
HdkOnNativeDataframe
"""
df = pd.DataFrame({name: value}, index=self.index if add_index else None)
ensure_supported_dtype(df.dtypes[0])
return self.from_pandas(df)

@staticmethod
def _join_by_rowid_op(lhs, rhs):
"""
Create a JoinNode for join by rowid.

Parameters
----------
lhs : HdkOnNativeDataframe
rhs : HdkOnNativeDataframe

Returns
-------
JoinNode
"""
exprs = lhs._index_exprs() if lhs._index_cols else rhs._index_exprs()
exprs.update((c, lhs.ref(c)) for c in lhs.columns)
exprs.update((c, rhs.ref(c)) for c in rhs.columns)
condition = lhs._build_equi_join_condition(
rhs, [ROWID_COL_NAME], [ROWID_COL_NAME]
)
return JoinNode(lhs, rhs, exprs=exprs, condition=condition)

def cat_codes(self):
"""
Extract codes for a category column.
@@ -2193,7 +2365,7 @@
return (cols, [len(cols)])

if self._index_cols is None:
index = Index.__new__(RangeIndex, data=range(len(obj)))
index = RangeIndex(range(len(obj)))
return (index, [len(index)])
if isinstance(obj, DbTable):
# TODO: Get the index columns only
@@ -2217,8 +2389,12 @@

def _build_index_cache(self):
"""Materialize index and store it in the cache."""
index, _ = self._compute_axis_labels_and_lengths(axis=0)
self.set_index_cache(index)
if self._partitions and not self._index_cols:
nrows = self._partitions[0][0]._length_cache
self.set_index_cache(RangeIndex(range(nrows)))

else:
index, _ = self._compute_axis_labels_and_lengths(axis=0)
self.set_index_cache(index)


def _get_index(self):
"""
@@ -2666,8 +2842,8 @@
assert len(df.columns) == len(self.columns)
else:
assert self._index_cols is None
assert df.index.name is None or isinstance(
self._partitions[0][0].get(), pd.DataFrame
assert (
df.index.name is None or self._has_unsupported_data
), f"index name '{df.index.name}' is not None"
if self.has_materialized_index:
df.index = self._index_cache.get().copy()
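A note on the fast path above: when a partition already holds raw Arrow data, _insert_list_col delegates to the partition's insert(), which presumably appends the column to the cached Arrow table; for non-raw frames the value is materialized as a single-column frame and attached with an equi-join on the virtual rowid column (_join_by_rowid_op). A rough standalone illustration of the raw-partition idea using plain pyarrow; this is not the HDK partition API, and the table contents are made up:

import pyarrow as pa

table = pa.table({"a": [1, 2, 3]})
# Append a list-like value as a new column at position 1. The partition-level
# insert() used by _insert_list_col performs the analogous operation on its
# cached Arrow table.
table = table.add_column(1, "b", pa.array([10, 20, 30]))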
modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/utils.py
@@ -531,6 +531,52 @@
return pa.from_numpy_dtype(np.promote_types(t1, t2))


def is_supported_arrow_type(dtype: pa.lib.DataType) -> bool:
"""
Return True if the specified arrow type is supported by HDK.

Parameters
----------
dtype : pa.lib.DataType

Returns
-------
bool
"""
if (
pa.types.is_string(dtype)
or pa.types.is_time(dtype)
or pa.types.is_dictionary(dtype)
or pa.types.is_null(dtype)
):
return True
if isinstance(dtype, pa.ExtensionType) or pa.types.is_duration(dtype):
return False
try:
pandas_dtype = dtype.to_pandas_dtype()
return pandas_dtype != np.dtype("O")
except NotImplementedError:
return False


def ensure_supported_dtype(dtype):
"""
Check if the specified `dtype` is supported by HDK.

If `dtype` is not supported, `NotImplementedError` is raised.

Parameters
----------
dtype : dtype
"""
try:
dtype = pa.from_numpy_dtype(dtype)
except pa.ArrowNotImplementedError as err:
raise NotImplementedError(f"Type {dtype}") from err
if not is_supported_arrow_type(dtype):
raise NotImplementedError(f"Type {dtype}")


def arrow_to_pandas(at: pa.Table) -> pandas.DataFrame:
"""
Convert the specified arrow table to pandas.
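Finally, a quick sketch of how the two new helpers are expected to behave, assuming they are imported from this utils module; the dtype values are chosen purely for illustration:

import numpy as np
import pyarrow as pa

# Arrow types that map to a non-object pandas dtype are considered supported.
is_supported_arrow_type(pa.int64())         # True
# Durations and extension types are rejected explicitly.
is_supported_arrow_type(pa.duration("s"))   # False

# ensure_supported_dtype() converts the numpy dtype to an Arrow type first,
# so an unconvertible dtype such as object surfaces as NotImplementedError.
ensure_supported_dtype(np.dtype("int64"))   # no error
ensure_supported_dtype(np.dtype("object"))  # raises NotImplementedError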