Skip to content

Commit

Permalink
FIX-#7170: Don't use MinPartitionSize configuration variable in rem…
Browse files Browse the repository at this point in the history
…ote context (#7177)

Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev authored Apr 15, 2024
1 parent 456068a commit 7b233e4
Show file tree
Hide file tree
Showing 16 changed files with 134 additions and 45 deletions.
15 changes: 11 additions & 4 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from pandas.core.dtypes.common import is_dtype_equal, is_list_like, is_numeric_dtype
from pandas.core.indexes.api import Index, RangeIndex

from modin.config import Engine, IsRayCluster, NPartitions
from modin.config import Engine, IsRayCluster, MinPartitionSize, NPartitions
from modin.core.dataframe.base.dataframe.dataframe import ModinDataframe
from modin.core.dataframe.base.dataframe.utils import Axis, JoinType, is_trivial_index
from modin.core.dataframe.pandas.dataframe.utils import (
Expand Down Expand Up @@ -1549,7 +1549,9 @@ def _reorder_labels(self, row_positions=None, col_positions=None):
# the "standard" partitioning. Knowing the standard partitioning scheme
# we are able to compute new row lengths.
new_lengths = get_length_list(
axis_len=len(row_idx), num_splits=ordered_rows.shape[0]
axis_len=len(row_idx),
num_splits=ordered_rows.shape[0],
min_block_size=MinPartitionSize.get(),
)
else:
# If the frame's partitioning was preserved then
Expand Down Expand Up @@ -1585,7 +1587,9 @@ def _reorder_labels(self, row_positions=None, col_positions=None):
# the "standard" partitioning. Knowing the standard partitioning scheme
# we are able to compute new column widths.
new_widths = get_length_list(
axis_len=len(col_idx), num_splits=ordered_cols.shape[1]
axis_len=len(col_idx),
num_splits=ordered_cols.shape[1],
min_block_size=MinPartitionSize.get(),
)
else:
# If the frame's partitioning was preserved then
Expand Down Expand Up @@ -3500,7 +3504,9 @@ def broadcast_apply_full_axis(
if kw["row_lengths"] is None and is_index_materialized:
if axis == 0:
kw["row_lengths"] = get_length_list(
axis_len=len(new_index), num_splits=new_partitions.shape[0]
axis_len=len(new_index),
num_splits=new_partitions.shape[0],
min_block_size=MinPartitionSize.get(),
)
elif axis == 1:
if self._row_lengths_cache is not None and len(new_index) == sum(
Expand All @@ -3512,6 +3518,7 @@ def broadcast_apply_full_axis(
kw["column_widths"] = get_length_list(
axis_len=len(new_columns),
num_splits=new_partitions.shape[1],
min_block_size=MinPartitionSize.get(),
)
elif axis == 0:
if self._column_widths_cache is not None and len(
Expand Down
23 changes: 20 additions & 3 deletions modin/core/dataframe/pandas/partitioning/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import numpy as np
import pandas

from modin.config import MinPartitionSize
from modin.core.dataframe.base.partitioning.axis_partition import (
BaseDataframeAxisPartition,
)
Expand Down Expand Up @@ -276,6 +277,7 @@ def apply(
for part in axis_partition.list_of_blocks
]
),
min_block_size=MinPartitionSize.get(),
)
)
result = self._wrap_partitions(
Expand All @@ -287,8 +289,9 @@ def apply(
num_splits,
maintain_partitioning,
*self.list_of_blocks,
manual_partition=manual_partition,
min_block_size=MinPartitionSize.get(),
lengths=lengths,
manual_partition=manual_partition,
)
)
if self.full_axis:
Expand Down Expand Up @@ -391,6 +394,7 @@ def deploy_axis_func(
num_splits,
maintain_partitioning,
*partitions,
min_block_size,
lengths=None,
manual_partition=False,
return_generator=False,
Expand All @@ -415,6 +419,8 @@ def deploy_axis_func(
If False, create a new partition layout.
*partitions : iterable
All partitions that make up the full axis (row or column).
min_block_size : int
Minimum number of rows/columns in a single split.
lengths : list, optional
The list of lengths to shuffle the object.
manual_partition : bool, default: False
Expand Down Expand Up @@ -473,10 +479,16 @@ def deploy_axis_func(
lengths = None
if return_generator:
return generate_result_of_axis_func_pandas(
axis, num_splits, result, lengths
axis,
num_splits,
result,
min_block_size,
lengths,
)
else:
return split_result_of_axis_func_pandas(axis, num_splits, result, lengths)
return split_result_of_axis_func_pandas(
axis, num_splits, result, min_block_size, lengths
)

@classmethod
def deploy_func_between_two_axis_partitions(
Expand All @@ -489,6 +501,7 @@ def deploy_func_between_two_axis_partitions(
len_of_left,
other_shape,
*partitions,
min_block_size,
return_generator=False,
):
"""
Expand All @@ -513,6 +526,8 @@ def deploy_func_between_two_axis_partitions(
(other_shape[i-1], other_shape[i]) will indicate slice to restore i-1 axis partition.
*partitions : iterable
All partitions that make up the full axis (row or column) for both data sets.
min_block_size : int
Minimum number of rows/columns in a single split.
return_generator : bool, default: False
Return a generator from the function, set to `True` for Ray backend
as Ray remote functions can return Generators.
Expand Down Expand Up @@ -559,12 +574,14 @@ def deploy_func_between_two_axis_partitions(
axis,
num_splits,
result,
min_block_size,
)
else:
return split_result_of_axis_func_pandas(
axis,
num_splits,
result,
min_block_size,
)

@classmethod
Expand Down
6 changes: 4 additions & 2 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from modin.config import (
BenchmarkMode,
Engine,
MinPartitionSize,
NPartitions,
PersistentPickle,
ProgressBar,
Expand Down Expand Up @@ -890,8 +891,9 @@ def from_pandas(cls, df, return_dims=False):
A NumPy array with partitions (with dimensions or not).
"""
num_splits = NPartitions.get()
row_chunksize = compute_chunksize(df.shape[0], num_splits)
col_chunksize = compute_chunksize(df.shape[1], num_splits)
min_block_size = MinPartitionSize.get()
row_chunksize = compute_chunksize(df.shape[0], num_splits, min_block_size)
col_chunksize = compute_chunksize(df.shape[1], num_splits, min_block_size)

bar_format = (
"{l_bar}{bar}{r_bar}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def deploy_axis_func(
num_splits,
maintain_partitioning,
*partitions,
min_block_size,
lengths=None,
manual_partition=False,
):
Expand All @@ -135,6 +136,8 @@ def deploy_axis_func(
If False, create a new partition layout.
*partitions : iterable
All partitions that make up the full axis (row or column).
min_block_size : int
Minimum number of rows/columns in a single split.
lengths : iterable, default: None
The list of lengths to shuffle the partition into.
manual_partition : bool, default: False
Expand All @@ -159,6 +162,7 @@ def deploy_axis_func(
*partitions,
),
f_kwargs={
"min_block_size": min_block_size,
"lengths": lengths,
"manual_partition": manual_partition,
},
Expand All @@ -177,6 +181,7 @@ def deploy_func_between_two_axis_partitions(
len_of_left,
other_shape,
*partitions,
min_block_size,
):
"""
Deploy a function along a full axis between two data sets.
Expand All @@ -200,6 +205,8 @@ def deploy_func_between_two_axis_partitions(
(other_shape[i-1], other_shape[i]) will indicate slice to restore i-1 axis partition.
*partitions : iterable
All partitions that make up the full axis (row or column) for both data sets.
min_block_size : int
Minimum number of rows/columns in a single split.
Returns
-------
Expand All @@ -219,6 +226,9 @@ def deploy_func_between_two_axis_partitions(
other_shape,
*partitions,
),
f_kwargs={
"min_block_size": min_block_size,
},
num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN),
pure=False,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import numpy as np
import ray

from modin.config import GpuCount
from modin.config import GpuCount, MinPartitionSize
from modin.core.execution.ray.common import RayWrapper
from modin.core.execution.ray.generic.partitioning import (
GenericRayDataframePartitionManager,
Expand Down Expand Up @@ -125,7 +125,9 @@ def from_pandas(cls, df, return_dims=False):
num_splits = GpuCount.get()
put_func = cls._partition_class.put
# For now, we default to row partitioning
pandas_dfs = split_result_of_axis_func_pandas(0, num_splits, df)
pandas_dfs = split_result_of_axis_func_pandas(
0, num_splits, df, min_block_size=MinPartitionSize.get()
)
keys = [
put_func(cls._get_gpu_managers()[i], pandas_dfs[i])
for i in range(num_splits)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ def deploy_axis_func(
num_splits,
maintain_partitioning,
*partitions,
min_block_size,
lengths=None,
manual_partition=False,
max_retries=None,
Expand All @@ -161,6 +162,8 @@ def deploy_axis_func(
If False, create a new partition layout.
*partitions : iterable
All partitions that make up the full axis (row or column).
min_block_size : int
Minimum number of rows/columns in a single split.
lengths : list, optional
The list of lengths to shuffle the object.
manual_partition : bool, default: False
Expand Down Expand Up @@ -188,6 +191,7 @@ def deploy_axis_func(
f_len_args=len(f_args),
f_kwargs=f_kwargs,
manual_partition=manual_partition,
min_block_size=min_block_size,
lengths=lengths,
return_generator=True,
)
Expand All @@ -203,6 +207,7 @@ def deploy_func_between_two_axis_partitions(
len_of_left,
other_shape,
*partitions,
min_block_size,
):
"""
Deploy a function along a full axis between two data sets.
Expand All @@ -226,6 +231,8 @@ def deploy_func_between_two_axis_partitions(
(other_shape[i-1], other_shape[i]) will indicate slice to restore i-1 axis partition.
*partitions : iterable
All partitions that make up the full axis (row or column) for both data sets.
min_block_size : int
Minimum number of rows/columns in a single split.
Returns
-------
Expand All @@ -245,6 +252,7 @@ def deploy_func_between_two_axis_partitions(
f_to_deploy=func,
f_len_args=len(f_args),
f_kwargs=f_kwargs,
min_block_size=min_block_size,
return_generator=True,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def deploy_axis_func(
num_splits,
maintain_partitioning,
*partitions,
min_block_size,
lengths=None,
manual_partition=False,
max_retries=None,
Expand All @@ -162,6 +163,8 @@ def deploy_axis_func(
If False, create a new partition layout.
*partitions : iterable
All partitions that make up the full axis (row or column).
min_block_size : int
Minimum number of rows/columns in a single split.
lengths : list, optional
The list of lengths to shuffle the object.
manual_partition : bool, default: False
Expand All @@ -188,6 +191,7 @@ def deploy_axis_func(
maintain_partitioning,
*partitions,
manual_partition=manual_partition,
min_block_size=min_block_size,
lengths=lengths,
)

Expand All @@ -202,6 +206,7 @@ def deploy_func_between_two_axis_partitions(
len_of_left,
other_shape,
*partitions,
min_block_size,
):
"""
Deploy a function along a full axis between two data sets.
Expand All @@ -225,6 +230,8 @@ def deploy_func_between_two_axis_partitions(
(other_shape[i-1], other_shape[i]) will indicate slice to restore i-1 axis partition.
*partitions : iterable
All partitions that make up the full axis (row or column) for both data sets.
min_block_size : int
Minimum number of rows/columns in a single split.
Returns
-------
Expand All @@ -243,6 +250,7 @@ def deploy_func_between_two_axis_partitions(
len_of_left,
other_shape,
*partitions,
min_block_size=min_block_size,
)

def wait(self):
Expand Down
5 changes: 3 additions & 2 deletions modin/core/io/column_stores/column_store_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ def build_index(cls, partition_ids):
row_lengths : list
List with lengths of index chunks.
"""
num_partitions = NPartitions.get()
index_len = (
0 if len(partition_ids) == 0 else cls.materialize(partition_ids[-2][0])
)
Expand All @@ -130,7 +129,9 @@ def build_index(cls, partition_ids):
else:
index = index_len
index_len = len(index)
index_chunksize = compute_chunksize(index_len, num_partitions)
num_partitions = NPartitions.get()
min_block_size = MinPartitionSize.get()
index_chunksize = compute_chunksize(index_len, num_partitions, min_block_size)
if index_chunksize > index_len:
row_lengths = [index_len] + [0 for _ in range(num_partitions - 1)]
else:
Expand Down
8 changes: 6 additions & 2 deletions modin/core/io/column_stores/parquet_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,9 @@ def _normalize_partitioning(cls, remote_parts, row_lengths, column_widths):
for i in range(num_splits):
new_parts[offset + i].append(split[i])

new_row_lengths.extend(get_length_list(part_len, num_splits))
new_row_lengths.extend(
get_length_list(part_len, num_splits, MinPartitionSize.get())
)

remote_parts = np.array(new_parts)
row_lengths = new_row_lengths
Expand All @@ -746,7 +748,9 @@ def _normalize_partitioning(cls, remote_parts, row_lengths, column_widths):
for row_parts in remote_parts
]
)
column_widths = get_length_list(sum(column_widths), desired_col_nparts)
column_widths = get_length_list(
sum(column_widths), desired_col_nparts, MinPartitionSize.get()
)

return remote_parts, row_lengths, column_widths

Expand Down
5 changes: 3 additions & 2 deletions modin/core/io/text/text_file_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from pandas.core.dtypes.common import is_list_like
from pandas.io.common import stringify_path

from modin.config import NPartitions
from modin.config import MinPartitionSize, NPartitions
from modin.core.io.file_dispatcher import FileDispatcher, OpenFile
from modin.core.io.text.utils import CustomNewlineIterator
from modin.core.storage_formats.pandas.utils import compute_chunksize
Expand Down Expand Up @@ -571,7 +571,8 @@ def _define_metadata(
"""
# This is the number of splits for the columns
num_splits = min(len(column_names) or 1, NPartitions.get())
column_chunksize = compute_chunksize(df.shape[1], num_splits)
min_block_size = MinPartitionSize.get()
column_chunksize = compute_chunksize(df.shape[1], num_splits, min_block_size)
if column_chunksize > len(column_names):
column_widths = [len(column_names)]
# This prevents us from unnecessarily serializing a bunch of empty
Expand Down
Loading

0 comments on commit 7b233e4

Please sign in to comment.