Skip to content

Commit

Permalink
FEAT-modin-project#7337: Using dynamic partitioning in broadcast_apply
Browse files Browse the repository at this point in the history
Signed-off-by: Kirill Suvorov <[email protected]>
  • Loading branch information
Retribution98 committed Jul 8, 2024
1 parent 4e7afa7 commit ec46e3c
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 2 deletions.
1 change: 0 additions & 1 deletion modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3264,7 +3264,6 @@ def broadcast_apply(
partition_sizes_along_axis, joined_index = self._get_axis_lengths_cache(
axis
), self.copy_axis_cache(axis)

new_frame = self._partition_mgr_cls.broadcast_apply(
axis, func, left_parts, right_parts
)
Expand Down
93 changes: 92 additions & 1 deletion modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ def get_partitions(index):

@classmethod
@wait_computations_if_benchmark_mode
def broadcast_apply(cls, axis, apply_func, left, right):
def base_broadcast_apply(cls, axis, apply_func, left, right):
"""
Broadcast the `right` partitions to `left` and apply `apply_func` function.
Expand Down Expand Up @@ -492,6 +492,40 @@ def map_func(df, *others):
]
)

@classmethod
@wait_computations_if_benchmark_mode
def broadcast_axis(
    cls,
    axis,
    apply_func,
    left,
    right,
    keep_partitioning=False,
):
    """
    Broadcast the `right` partitions to `left` and apply `apply_func` along full axis partitions.

    Parameters
    ----------
    axis : {0, 1}
        Axis to apply and broadcast over.
    apply_func : callable
        Function to apply.
    left : np.ndarray
        NumPy array of left partitions.
    right : np.ndarray or None
        NumPy array of right partitions. If ``None``, `apply_func` is applied
        without a broadcasted counterpart.
    keep_partitioning : bool, default: False
        Whether to keep the original partitioning (``maintain_partitioning``
        flag forwarded to the axis-partition ``apply``).

    Returns
    -------
    np.ndarray
        NumPy array of result partition objects.
    """
    # The number of output splits follows the left frame's partitioning
    # along `axis`.
    num_splits = len(left) if axis == 0 else len(left.T)
    preprocessed_map_func = cls.preprocess_func(apply_func)
    left_partitions = cls.axis_partition(left, axis)
    right_partitions = None if right is None else cls.axis_partition(right, axis)
    kw = {
        "num_splits": num_splits,
        "maintain_partitioning": keep_partitioning,
    }

    result_blocks = np.array(
        [
            left_partitions[i].apply(
                preprocessed_map_func,
                # Fix: the original indexed `right_partitions[i]`
                # unconditionally, raising TypeError when `right` is None
                # even though the assignment above explicitly allows it.
                other_axis_partition=(
                    None if right_partitions is None else right_partitions[i]
                ),
                **kw,
            )
            for i in range(len(left_partitions))
        ]
    )
    # If we are mapping over columns, they are returned to use the same as
    # rows, so we need to transpose the returned 2D NumPy array to return
    # the structure to the correct order.
    return result_blocks.T if not axis else result_blocks

@classmethod
@wait_computations_if_benchmark_mode
def broadcast_axis_partitions(
Expand Down Expand Up @@ -647,6 +681,63 @@ def base_map_partitions(
]
)

@classmethod
@wait_computations_if_benchmark_mode
def broadcast_apply(
    cls,
    axis,
    apply_func,
    left,
    right,
):
    """
    Broadcast the `right` partitions to `left` and apply `apply_func`.

    Dispatches between a block-wise and an axis-wise broadcast depending on
    the shape of the left partitioning, choosing whichever approach is
    expected to perform best.

    Parameters
    ----------
    axis : {0, 1}
        Axis to apply and broadcast over.
    apply_func : callable
        Function to apply.
    left : np.ndarray
        NumPy array of left partitions.
    right : np.ndarray
        NumPy array of right partitions.

    Returns
    -------
    np.ndarray
        NumPy array of result partition objects.
    """
    # The condition for the execution of `base_broadcast_apply` is different from
    # the same condition in the `map_partitions`, since the columnar partitioning approach
    # cannot be implemented for the `broadcast_apply`. This is due to the fact that different
    # partitions of the left and right dataframes are possible for the `broadcast_apply`,
    # as a result of which it is necessary to merge partitions on both axes at once,
    # which leads to large slowdowns.
    # NOTE(review): the `1.5 *` and `// 5` factors look like empirically tuned
    # thresholds — their rationale is not visible here; confirm against benchmarks.
    if (
        np.prod(left.shape) <= 1.5 * CpuCount.get()
        or left.shape[axis] < CpuCount.get() // 5
    ):
        # block-wise broadcast: few enough partitions that per-block dispatch wins
        new_partitions = cls.base_broadcast_apply(
            axis,
            apply_func,
            left,
            right,
        )
    else:
        # axis-wise broadcast; `axis ^ 1` flips 0<->1 because `broadcast_axis`
        # partitions along the opposite axis from the one applied over
        new_partitions = cls.broadcast_axis(
            axis=axis ^ 1,
            left=left,
            right=right,
            apply_func=apply_func,
            keep_partitioning=True,
        )
    return new_partitions

@classmethod
@wait_computations_if_benchmark_mode
def map_partitions(
Expand Down

0 comments on commit ec46e3c

Please sign in to comment.