Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REFACTOR-#3780: Remove code duplication for PandasOnDaskDataframe #3781

Merged
merged 5 commits into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/release_notes/release_notes-0.16.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ Key Features and Updates
* REFACTOR-#4832: unify `split_result_of_axis_func_pandas` (#4831)
* REFACTOR-#4796: Introduce constant for __reduced__ column name (#4799)
* REFACTOR-#4000: Remove code duplication for `PandasOnRayDataframePartitionManager` (#4895)
* REFACTOR-#3780: Remove code duplication for `PandasOnDaskDataframe` (#3781)
* REFACTOR-#4530: Unify access to physical data for any partition type (#4829)
* Pandas API implementations and improvements
* FEAT-#4670: Implement convert_dtypes by mapping across partitions (#4671)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe
from ..partitioning.partition_manager import PandasOnDaskDataframePartitionManager
from modin.core.execution.dask.common.engine_wrapper import DaskWrapper


class PandasOnDaskDataframe(PandasDataframe):
Expand All @@ -41,86 +40,3 @@ class PandasOnDaskDataframe(PandasDataframe):
"""

_partition_mgr_cls = PandasOnDaskDataframePartitionManager

def _get_partition_size_along_axis(self, partition, axis=0):
"""
Compute the length along the specified axis of the specified partition.

Parameters
----------
partition : ``PandasOnDaskDataframeVirtualPartition`` or ``PandasOnDaskDataframePartition``
The partition whose size to compute.
axis : int, default: 0
The axis along which to compute size.

Returns
-------
list
A list of lengths along the specified axis that sum to the overall length of the partition
along the specified axis.

Notes
-----
This utility function is used to ensure that computation occurs asynchronously across all partitions
whether the partitions are virtual or physical partitions.
"""
if isinstance(partition, self._partition_mgr_cls._partition_class):
return [
partition.apply(
lambda df: len(df) if not axis else len(df.columns)
).list_of_blocks[0]
]
elif partition.axis == axis:
return [
ptn.apply(
lambda df: len(df) if not axis else len(df.columns)
).list_of_blocks[0]
for ptn in partition.list_of_block_partitions
]
return [
partition.list_of_block_partitions[0]
.apply(lambda df: len(df) if not axis else (len(df.columns)))
.list_of_blocks[0]
]

@property
def _row_lengths(self):
"""
Compute ther row partitions lengths if they are not cached.

Returns
-------
list
A list of row partitions lengths.
"""
if self._row_lengths_cache is None:
row_lengths_list = DaskWrapper.materialize(
[
self._get_partition_size_along_axis(obj, axis=0)
for obj in self._partitions.T[0]
]
)
self._row_lengths_cache = [sum(len_list) for len_list in row_lengths_list]
return self._row_lengths_cache

@property
def _column_widths(self):
"""
Compute the column partitions widths if they are not cached.

Returns
-------
list
A list of column partitions widths.
"""
if self._column_widths_cache is None:
col_widths_list = DaskWrapper.materialize(
[
self._get_partition_size_along_axis(obj, axis=1)
for obj in self._partitions[0]
]
)
self._column_widths_cache = [
sum(width_list) for width_list in col_widths_list
]
return self._column_widths_cache