FIX-modin-project#1284: try to combine partitions
Signed-off-by: Alexander Myskov <[email protected]>
amyskov committed Sep 3, 2020
1 parent cfec73d commit 3db76ea
Showing 4 changed files with 21 additions and 25 deletions.
13 changes: 5 additions & 8 deletions modin/backends/pandas/query_compiler.py
@@ -1276,14 +1276,11 @@ def searchsorted(self, **kwargs):
         int or array of int
             A scalar or array of insertion points with the same shape as value.
         """
-        new_modin_frame = self._modin_frame._apply_full_axis(
-            0,
-            lambda x: x.squeeze().searchsorted(**kwargs),
-            new_columns=self.columns,
-            new_index=pandas.RangeIndex(start=0, stop=len(kwargs["value"]), step=1),
-            clear_call_queue=True,
-        )
-        return self.__constructor__(new_modin_frame)
+        combined = self._modin_frame._frame_mgr_cls.combine_partitions(
+            self._modin_frame._partitions
+        ).squeeze()
+
+        return combined.searchsorted(**kwargs)
 
     # Dt map partitions operations
 
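
The new code path above no longer routes searchsorted through _apply_full_axis: it stitches the frame's partitions back into a single pandas Series and evaluates searchsorted on that. Below is a minimal sketch of the idea using plain pandas objects in place of Modin partitions (the partition classes themselves are not reproduced here), not the actual Modin code.

# Sketch only: plain pandas Series stand in for the materialized blocks
# of a partitioned Modin Series.
import numpy as np
import pandas

partition_blocks = [pandas.Series([1, 3, 5]), pandas.Series([7, 9, 11])]

# combine_partitions() essentially concatenates the blocks into one Series ...
combined = pandas.concat(partition_blocks).squeeze()

# ... and searchsorted is then evaluated on the combined pandas Series
# instead of being pushed through _apply_full_axis.
print(combined.searchsorted(np.array([4, 10])))  # -> [2 5]
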
9 changes: 1 addition & 8 deletions modin/engines/base/frame/data.py
@@ -1179,13 +1179,7 @@ def filter_full_axis(self, axis, func):
         )
 
     def _apply_full_axis(
-        self,
-        axis,
-        func,
-        new_index=None,
-        new_columns=None,
-        dtypes=None,
-        clear_call_queue=False,
+        self, axis, func, new_index=None, new_columns=None, dtypes=None,
     ):
         """
         Perform a function across an entire axis.
@@ -1221,7 +1215,6 @@ def _apply_full_axis(
             self._partitions,
             self._build_mapreduce_func(axis, func),
             keep_partitioning=True,
-            clear_call_queue=clear_call_queue,
         )
         # Index objects for new object creation. This is shorter than if..else
         if new_columns is None:
13 changes: 9 additions & 4 deletions modin/engines/base/frame/partition_manager.py
@@ -260,10 +260,10 @@ def map_axis_partitions(cls, axis, partitions, map_func, keep_partitioning=False
         # Since we are already splitting the DataFrame back up after an
         # operation, we will just use this time to compute the number of
         # partitions as best we can right now.
-        if clear_call_queue:
-            for col_partitions in partitions:
-                for row_partitions in col_partitions:
-                    row_partitions.call_queue = []
+        # if clear_call_queue:
+        #     for col_partitions in partitions:
+        #         for row_partitions in col_partitions:
+        #             row_partitions.call_queue = []
 
         if keep_partitioning:
             num_splits = len(partitions) if axis == 0 else len(partitions.T)
@@ -339,6 +339,11 @@ def concatenate(cls, dfs):
 
         return pandas.concat(dfs)
 
+    @classmethod
+    def combine_partitions(cls, partitions):
+        # Works for Series only
+        return pandas.concat([part[0].get() for part in partitions], copy=False)
+
     @classmethod
     def to_pandas(cls, partitions):
         """Convert this object into a Pandas DataFrame from the partitions.
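
The combine_partitions helper added above assumes a two-dimensional array of partition objects with a single column, where each partition's .get() returns a pandas object; it concatenates the materialized blocks without copying. A rough, self-contained sketch of that contract, with a hypothetical StubPartition class standing in for Modin's real partition classes:

# Sketch only: StubPartition is hypothetical and mimics the .get() interface
# that combine_partitions relies on.
import numpy as np
import pandas


class StubPartition:
    def __init__(self, data):
        self._data = data

    def get(self):
        # A real Modin partition would materialize remote data here.
        return self._data


partitions = np.array(
    [
        [StubPartition(pandas.Series([1, 2]))],
        [StubPartition(pandas.Series([3, 4]))],
    ]
)

# Mirrors the body of combine_partitions: take the first (only) column of
# each partition row and concatenate the materialized blocks.
combined = pandas.concat([part[0].get() for part in partitions], copy=False)
print(combined.tolist())  # -> [1, 2, 3, 4]
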
11 changes: 6 additions & 5 deletions modin/pandas/series.py
@@ -1419,11 +1419,12 @@ def searchsorted(self, value, side="left", sorter=None):
             A scalar or array of insertion points with the same shape as value.
         """
         value = np.array([value]) if is_scalar(value) else value
-        return (
-            self._query_compiler.searchsorted(value=value, side=side, sorter=sorter)
-            .to_numpy()
-            .squeeze()
-        )
+        # return (
+        #     self._query_compiler.searchsorted(value=value, side=side, sorter=sorter)
+        #     .to_numpy()
+        #     .squeeze()
+        # )
+        return self._query_compiler.searchsorted(value=value, side=side, sorter=sorter)
 
     def sort_values(
         self,
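
With this change Series.searchsorted returns the query compiler's result directly instead of converting to NumPy and squeezing. As a reference for the behaviour being targeted, here is the equivalent call in plain pandas for array-valued input (scalar handling is still being reworked in this commit):

# Reference behaviour in plain pandas, not Modin code.
import numpy as np
import pandas

s = pandas.Series([1, 2, 3, 5, 8])
# Insertion points that keep the Series sorted.
print(s.searchsorted(np.array([0, 4, 6])))  # -> [0 3 4]
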
