From 3db76eae4da6b696aa66d908ecd6fec6499ac9f6 Mon Sep 17 00:00:00 2001 From: Alexander Myskov Date: Tue, 4 Aug 2020 04:51:13 -0500 Subject: [PATCH] FIX-#1284: try to combine partitions Signed-off-by: Alexander Myskov --- modin/backends/pandas/query_compiler.py | 13 +++++-------- modin/engines/base/frame/data.py | 9 +-------- modin/engines/base/frame/partition_manager.py | 13 +++++++++---- modin/pandas/series.py | 11 ++++++----- 4 files changed, 21 insertions(+), 25 deletions(-) diff --git a/modin/backends/pandas/query_compiler.py b/modin/backends/pandas/query_compiler.py index fdff70653d9..1a97d36cd77 100644 --- a/modin/backends/pandas/query_compiler.py +++ b/modin/backends/pandas/query_compiler.py @@ -1276,14 +1276,11 @@ def searchsorted(self, **kwargs): int or array of int A scalar or array of insertion points with the same shape as value. """ - new_modin_frame = self._modin_frame._apply_full_axis( - 0, - lambda x: x.squeeze().searchsorted(**kwargs), - new_columns=self.columns, - new_index=pandas.RangeIndex(start=0, stop=len(kwargs["value"]), step=1), - clear_call_queue=True, - ) - return self.__constructor__(new_modin_frame) + combined = self._modin_frame._frame_mgr_cls.combine_partitions( + self._modin_frame._partitions + ).squeeze() + + return combined.searchsorted(**kwargs) # Dt map partitions operations diff --git a/modin/engines/base/frame/data.py b/modin/engines/base/frame/data.py index 1587e251faf..8452433e25c 100644 --- a/modin/engines/base/frame/data.py +++ b/modin/engines/base/frame/data.py @@ -1179,13 +1179,7 @@ def filter_full_axis(self, axis, func): ) def _apply_full_axis( - self, - axis, - func, - new_index=None, - new_columns=None, - dtypes=None, - clear_call_queue=False, + self, axis, func, new_index=None, new_columns=None, dtypes=None, ): """ Perform a function across an entire axis. @@ -1221,7 +1215,6 @@ def _apply_full_axis( self._partitions, self._build_mapreduce_func(axis, func), keep_partitioning=True, - clear_call_queue=clear_call_queue, ) # Index objects for new object creation. This is shorter than if..else if new_columns is None: diff --git a/modin/engines/base/frame/partition_manager.py b/modin/engines/base/frame/partition_manager.py index 5b5dc1a3ba5..3dc1172190d 100644 --- a/modin/engines/base/frame/partition_manager.py +++ b/modin/engines/base/frame/partition_manager.py @@ -260,10 +260,10 @@ def map_axis_partitions(cls, axis, partitions, map_func, keep_partitioning=False # Since we are already splitting the DataFrame back up after an # operation, we will just use this time to compute the number of # partitions as best we can right now. - if clear_call_queue: - for col_partitions in partitions: - for row_partitions in col_partitions: - row_partitions.call_queue = [] + # if clear_call_queue: + # for col_partitions in partitions: + # for row_partitions in col_partitions: + # row_partitions.call_queue = [] if keep_partitioning: num_splits = len(partitions) if axis == 0 else len(partitions.T) @@ -339,6 +339,11 @@ def concatenate(cls, dfs): return pandas.concat(dfs) + @classmethod + def combine_partitions(cls, partitions): + # Works for Series only + return pandas.concat([part[0].get() for part in partitions], copy=False) + @classmethod def to_pandas(cls, partitions): """Convert this object into a Pandas DataFrame from the partitions. diff --git a/modin/pandas/series.py b/modin/pandas/series.py index 79d7e555d9f..defd0385294 100644 --- a/modin/pandas/series.py +++ b/modin/pandas/series.py @@ -1419,11 +1419,12 @@ def searchsorted(self, value, side="left", sorter=None): A scalar or array of insertion points with the same shape as value. """ value = np.array([value]) if is_scalar(value) else value - return ( - self._query_compiler.searchsorted(value=value, side=side, sorter=sorter) - .to_numpy() - .squeeze() - ) + # return ( + # self._query_compiler.searchsorted(value=value, side=side, sorter=sorter) + # .to_numpy() + # .squeeze() + # ) + return self._query_compiler.searchsorted(value=value, side=side, sorter=sorter) def sort_values( self,