Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#4725: Make index and columns lazy in Modin DataFrame #4726

Merged
merged 5 commits into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/release_notes/release_notes-0.16.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ Key Features and Updates
* FEAT-#4419: Extend virtual partitioning API to pandas on Dask (#4420)
* FEAT-#4147: Add partial compatibility with Python 3.6 and pandas 1.1 (#4301)
* FEAT-#4569: Add error message when `read_` function defaults to pandas (#4647)
* FEAT-#4725: Make index and columns lazy in Modin DataFrame (#4726)

Contributors
------------
Expand Down
187 changes: 111 additions & 76 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,12 @@ class PandasDataframe(ClassLogger):
----------
partitions : np.ndarray
A 2D NumPy array of partitions.
index : sequence
index : sequence, optional
The index for the dataframe. Converted to a ``pandas.Index``.
columns : sequence
Is computed from partitions on demand if not specified.
columns : sequence, optional
The columns object for the dataframe. Converted to a ``pandas.Index``.
Is computed from partitions on demand if not specified.
row_lengths : list, optional
The length of each partition in the rows. The "height" of
each of the block partitions. Is computed if not provided.
Expand Down Expand Up @@ -177,49 +179,52 @@ def __constructor__(self):
def __init__(
self,
partitions,
index,
columns,
index=None,
columns=None,
row_lengths=None,
column_widths=None,
dtypes=None,
):
self._partitions = partitions
self._index_cache = ensure_index(index)
self._columns_cache = ensure_index(columns)
if row_lengths is not None and len(self.index) > 0:
self._index_cache = ensure_index(index) if index is not None else None
self._columns_cache = ensure_index(columns) if columns is not None else None
self._row_lengths_cache = row_lengths
self._column_widths_cache = column_widths
YarShev marked this conversation as resolved.
Show resolved Hide resolved
self._dtypes = dtypes

self._validate_axes_splits()
self._filter_empties(compute_metadata=False)

def _validate_axes_splits(self):
vnlitvinov marked this conversation as resolved.
Show resolved Hide resolved
"""Validate that labels are split correctly if split is known."""
YarShev marked this conversation as resolved.
Show resolved Hide resolved
if self._row_lengths_cache is not None and len(self.index) > 0:
# An empty frame can have 0 rows but a nonempty index. If the frame
# does have rows, the number of rows must equal the size of the
# index.
num_rows = sum(row_lengths)
num_rows = sum(self._row_lengths_cache)
if num_rows > 0:
ErrorMessage.catch_bugs_and_request_email(
num_rows != len(self._index_cache),
"Row lengths: {} != {}".format(num_rows, len(self._index_cache)),
f"Row lengths: {num_rows} != {len(self._index_cache)}",
)
ErrorMessage.catch_bugs_and_request_email(
any(val < 0 for val in row_lengths),
"Row lengths cannot be negative: {}".format(row_lengths),
any(val < 0 for val in self._row_lengths_cache),
f"Row lengths cannot be negative: {self._row_lengths_cache}",
)
self._row_lengths_cache = row_lengths
if column_widths is not None and len(self.columns) > 0:
if self._column_widths_cache is not None and len(self.columns) > 0:
# An empty frame can have 0 column but a nonempty column index. If
# the frame does have columns, the number of columns must equal the
# size of the columns.
num_columns = sum(column_widths)
num_columns = sum(self._column_widths_cache)
if num_columns > 0:
ErrorMessage.catch_bugs_and_request_email(
num_columns != len(self._columns_cache),
"Column widths: {} != {}".format(
num_columns, len(self._columns_cache)
),
f"Column widths: {num_columns} != {len(self._columns_cache)}",
)
ErrorMessage.catch_bugs_and_request_email(
any(val < 0 for val in column_widths),
"Column widths cannot be negative: {}".format(column_widths),
any(val < 0 for val in self._column_widths_cache),
f"Column widths cannot be negative: {self._column_widths_cache}",
)
self._column_widths_cache = column_widths
self._dtypes = dtypes
self._filter_empties()

@property
def _row_lengths(self):
Expand All @@ -232,10 +237,12 @@ def _row_lengths(self):
A list of row partitions lengths.
"""
if self._row_lengths_cache is None:
if len(self._partitions.T) > 0:
self._row_lengths_cache = [
obj.length() for obj in self._partitions.T[0]
]
if len(self._partitions) > 0:
index, self._row_lengths_cache = self._compute_axis_labels_and_lengths(
0
)
vnlitvinov marked this conversation as resolved.
Show resolved Hide resolved
if self._index_cache is None:
self._index_cache = index
else:
self._row_lengths_cache = []
return self._row_lengths_cache
Expand All @@ -252,7 +259,12 @@ def _column_widths(self):
"""
if self._column_widths_cache is None:
if len(self._partitions) > 0:
self._column_widths_cache = [obj.width() for obj in self._partitions[0]]
(
columns,
self._column_widths_cache,
) = self._compute_axis_labels_and_lengths(1)
if self._columns_cache is None:
self._columns_cache = columns
else:
self._column_widths_cache = []
return self._column_widths_cache
Expand Down Expand Up @@ -345,6 +357,10 @@ def _get_index(self):
pandas.Index
An index object containing the row labels.
"""
if self._index_cache is None:
self._index_cache, row_lengths = self._compute_axis_labels_and_lengths(0)
if self._row_lengths_cache is None:
vnlitvinov marked this conversation as resolved.
Show resolved Hide resolved
self._row_lengths_cache = row_lengths
return self._index_cache

def _get_columns(self):
Expand All @@ -356,6 +372,12 @@ def _get_columns(self):
pandas.Index
An index object containing the column labels.
"""
if self._columns_cache is None:
self._columns_cache, column_widths = self._compute_axis_labels_and_lengths(
1
)
if self._column_widths_cache is None:
vnlitvinov marked this conversation as resolved.
Show resolved Hide resolved
self._column_widths_cache = column_widths
return self._columns_cache

def _set_index(self, new_index):
Expand Down Expand Up @@ -407,7 +429,7 @@ def axes(self):
"""
return [self.index, self.columns]

def _compute_axis_labels(self, axis: int, partitions=None):
def _compute_axis_labels_and_lengths(self, axis: int, partitions=None):
"""
Compute the labels for specific `axis`.

Expand All @@ -423,14 +445,32 @@ def _compute_axis_labels(self, axis: int, partitions=None):
-------
pandas.Index
Labels for the specified `axis`.
List of int
Size of partitions along the specified `axis`.
vnlitvinov marked this conversation as resolved.
Show resolved Hide resolved
"""
if partitions is None:
partitions = self._partitions
new_index, _ = self._partition_mgr_cls.get_indices(axis, partitions)
return new_index
new_index, internal_idx = self._partition_mgr_cls.get_indices(axis, partitions)
return new_index, list(map(len, internal_idx))
YarShev marked this conversation as resolved.
Show resolved Hide resolved

def _filter_empties(self, compute_metadata=True):
"""
Remove empty partitions from `self._partitions` to avoid triggering excess computation.

Parameters
----------
compute_metadata : bool, default: True
Trigger the computations for partition sizes and labels if they're not done already.
"""
if not compute_metadata and (
self._index_cache is None
or self._columns_cache is None
or self._row_lengths_cache is None
or self._column_widths_cache is None
YarShev marked this conversation as resolved.
Show resolved Hide resolved
):
# do not trigger the computations
return

def _filter_empties(self):
"""Remove empty partitions from `self._partitions` to avoid triggering excess computation."""
if len(self.axes[0]) == 0 or len(self.axes[1]) == 0:
# This is the case for an empty frame. We don't want to completely remove
# all metadata and partitions so for the moment, we won't prune if the frame
Expand Down Expand Up @@ -1768,17 +1808,15 @@ def filter(self, axis: Union[Axis, int], condition: Callable) -> "PandasDatafram
new_partitions = self._partition_mgr_cls.map_axis_partitions(
axis.value, self._partitions, condition, keep_partitioning=True
)

new_axes, new_lengths = [0, 0], [0, 0]

new_axes[axis.value] = self.axes[axis.value]
new_axes[axis.value ^ 1] = self._compute_axis_labels(
axis.value ^ 1, new_partitions
)

new_lengths[axis.value] = self._axes_lengths[axis.value]
new_lengths[
axis.value ^ 1
] = None # We do not know what the resulting widths will be
(
new_axes[axis.value ^ 1],
new_lengths[axis.value ^ 1],
) = self._compute_axis_labels_and_lengths(axis.value ^ 1, new_partitions)

return self.__constructor__(
new_partitions,
Expand Down Expand Up @@ -1828,12 +1866,18 @@ def explode(self, axis: Union[int, Axis], func: Callable) -> "PandasDataframe":
axis.value, self._partitions, func, keep_partitioning=True
)
if axis == Axis.COL_WISE:
new_index = self._compute_axis_labels(0, partitions)
new_columns = self.columns
new_index, row_lengths = self._compute_axis_labels_and_lengths(
0, partitions
)
new_columns, column_widths = self.columns, self._column_widths_cache
else:
new_index = self.index
new_columns = self._compute_axis_labels(1, partitions)
return self.__constructor__(partitions, new_index, new_columns)
new_index, row_lengths = self.index, self._row_lengths_cache
new_columns, column_widths = self._compute_axis_labels_and_lengths(
1, partitions
)
return self.__constructor__(
partitions, new_index, new_columns, row_lengths, column_widths
)

@lazy_metadata_decorator(apply_axis="both")
def apply_full_axis(
Expand Down Expand Up @@ -2154,6 +2198,20 @@ def _prepare_frame_to_broadcast(self, axis, indices, broadcast_all):
passed_len += len(internal)
return result_dict

def __make_constructor_args(self, partitions, index, columns) -> dict:
YarShev marked this conversation as resolved.
Show resolved Hide resolved
kw = {}
kw["index"], kw["row_lengths"] = (
self._compute_axis_labels_and_lengths(0, partitions)
if index is None
else (index, None)
)
kw["columns"], kw["column_widths"] = (
self._compute_axis_labels_and_lengths(1, partitions)
if columns is None
else (columns, None)
)
return kw

@lazy_metadata_decorator(apply_axis="both")
def broadcast_apply_select_indices(
self,
Expand Down Expand Up @@ -2233,14 +2291,8 @@ def broadcast_apply_select_indices(
keep_remaining,
)

new_axes = [
self._compute_axis_labels(i, new_partitions)
if new_axis is None
else new_axis
for i, new_axis in enumerate([new_index, new_columns])
]

return self.__constructor__(new_partitions, *new_axes)
kw = self.__make_constructor_args(new_partitions, new_index, new_columns)
return self.__constructor__(new_partitions, **kw)

@lazy_metadata_decorator(apply_axis="both")
def broadcast_apply_full_axis(
Expand Down Expand Up @@ -2307,25 +2359,14 @@ def broadcast_apply_full_axis(
keep_partitioning=True,
)
# Index objects for new object creation. This is shorter than if..else
new_axes = [
self._compute_axis_labels(i, new_partitions)
if new_axis is None
else new_axis
for i, new_axis in enumerate([new_index, new_columns])
]
kw = self.__make_constructor_args(new_partitions, new_index, new_columns)
if dtypes == "copy":
dtypes = self._dtypes
kw["dtypes"] = self._dtypes
elif dtypes is not None:
dtypes = pandas.Series(
[np.dtype(dtypes)] * len(new_axes[1]), index=new_axes[1]
kw["dtypes"] = pandas.Series(
[np.dtype(dtypes)] * len(kw["columns"]), index=kw["columns"]
)
result = self.__constructor__(
new_partitions,
*new_axes,
None,
None,
dtypes,
)
result = self.__constructor__(new_partitions, **kw)
if new_index is not None:
result.synchronize_labels(axis=0)
if new_columns is not None:
Expand Down Expand Up @@ -2710,14 +2751,8 @@ def groupby_reduce(
new_partitions = self._partition_mgr_cls.groupby_reduce(
axis, self._partitions, by_parts, map_func, reduce_func, apply_indices
)
new_axes = [
self._compute_axis_labels(i, new_partitions)
if new_axis is None
else new_axis
for i, new_axis in enumerate([new_index, new_columns])
]

return self.__constructor__(new_partitions, *new_axes)
kw = self.__make_constructor_args(new_partitions, new_index, new_columns)
return self.__constructor__(new_partitions, **kw)

@classmethod
def from_pandas(cls, df):
Expand Down
4 changes: 2 additions & 2 deletions modin/experimental/batch/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ def compute_batch(
partitions,
index,
columns,
row_lengths=tuple(map(len, internal_rows)),
column_widths=tuple(map(len, internal_cols)),
row_lengths=list(map(len, internal_rows)),
column_widths=list(map(len, internal_cols)),
)
query_compiler = PandasQueryCompiler(result_modin_frame)
result_df = pd.DataFrame(query_compiler=query_compiler)
Expand Down