From 81f3c5e65e61c648288344e452dd7709353184be Mon Sep 17 00:00:00 2001
From: mvashishtha
Date: Tue, 2 Aug 2022 17:37:04 -0500
Subject: [PATCH] REFACTOR-#4696: Merge dask and ray virtual partition code.

Signed-off-by: mvashishtha
---
 .../pandas/partitioning/axis_partition.py     | 294 ++++++++++++++-
 .../partitioning/virtual_partition.py         | 338 +-----------------
 .../partitioning/virtual_partition.py         |  85 -----
 .../partitioning/virtual_partition.py         | 314 +---------------
 4 files changed, 301 insertions(+), 730 deletions(-)
 delete mode 100644 modin/core/execution/python/implementations/pandas_on_python/partitioning/virtual_partition.py

diff --git a/modin/core/dataframe/pandas/partitioning/axis_partition.py b/modin/core/dataframe/pandas/partitioning/axis_partition.py
index 75e864cdf51..db28552b8d9 100644
--- a/modin/core/dataframe/pandas/partitioning/axis_partition.py
+++ b/modin/core/dataframe/pandas/partitioning/axis_partition.py
@@ -22,15 +22,143 @@

 class PandasDataframeAxisPartition(BaseDataframeAxisPartition):
     """
     An abstract class that simplifies and consolidates the code for axis partitions that run pandas.

     Because much of the code is similar, this allows us to reuse it.
+
+    Parameters
+    ----------
+    list_of_blocks : Union[list, block_partition_type]
+        List of ``block_partition_type`` and
+        ``PandasDataframeAxisPartition`` objects, or a single
+        ``block_partition_type``.
+    get_ip : bool, default: False
+        Whether to get node IP addresses of conforming partitions or not.
+    full_axis : bool, default: True
+        Whether or not the virtual partition encompasses the whole axis.
+    call_queue : list, optional
+        A list of tuples (callable, args, kwargs) that contains deferred calls.
     """

+    block_partition_type = None
+    wait = None
+
+    def __init__(self, list_of_blocks, get_ip=False, full_axis=True, call_queue=None):
+        if isinstance(list_of_blocks, self.block_partition_type):
+            list_of_blocks = [list_of_blocks]
+        self.call_queue = call_queue or []
+        self.full_axis = full_axis
+        # In the simple case, none of the partitions that will compose this
+        # partition are themselves virtual partitions. The partitions that will
+        # be combined are just the partitions as given to the constructor.
+        if not any(
+            isinstance(obj, PandasDataframeAxisPartition) for obj in list_of_blocks
+        ):
+            self.list_of_partitions_to_combine = list_of_blocks
+            return
+        # Check that all virtual partitions in `list_of_blocks` have the same axis.
+        # We should never have mismatching axes in the current implementation, so we
+        # add this defensive assertion to ensure that undefined behavior does not happen.
+        assert (
+            len(
+                set(
+                    obj.axis
+                    for obj in list_of_blocks
+                    if isinstance(obj, PandasDataframeAxisPartition)
+                )
+            )
+            == 1
+        )
+        # When the axis of all incoming virtual partitions matches this partition's
+        # axis, extend and combine the lists of physical partitions.
+        if (
+            next(
+                obj
+                for obj in list_of_blocks
+                if isinstance(obj, PandasDataframeAxisPartition)
+            ).axis
+            == self.axis
+        ):
+            new_list_of_blocks = []
+            for obj in list_of_blocks:
+                new_list_of_blocks.extend(
+                    obj.list_of_partitions_to_combine
+                ) if isinstance(
+                    obj, PandasDataframeAxisPartition
+                ) else new_list_of_blocks.append(
+                    obj
+                )
+            self.list_of_partitions_to_combine = new_list_of_blocks
+        # Materialize the incoming virtual partitions if their axis does not
+        # match the axis of this virtual partition.
+        else:
+            self.list_of_partitions_to_combine = [
+                obj.force_materialization().list_of_partitions_to_combine[0]
+                if isinstance(obj, PandasDataframeAxisPartition)
+                else obj
+                for obj in list_of_blocks
+            ]
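The constructor above folds one level of nesting: incoming virtual partitions whose axis matches this partition's axis contribute their physical blocks directly, while everything else is kept (or, in Modin, first materialized) as a single block. A minimal, self-contained sketch of that rule, using stand-in classes rather than Modin's real types:

```python
# Illustrative only, not Modin code: how nested virtual partitions along the
# same axis dissolve into a flat list of physical blocks.
class Block:
    """Stand-in for a physical (block) partition."""
    def __init__(self, name):
        self.name = name

class Virtual:
    """Stand-in for a virtual (axis) partition."""
    axis = 0
    def __init__(self, parts):
        flat = []
        for obj in parts:
            # Same-axis virtual partitions contribute their blocks directly.
            if isinstance(obj, Virtual) and obj.axis == self.axis:
                flat.extend(obj.parts)
            else:
                flat.append(obj)
        self.parts = flat

inner = Virtual([Block("a"), Block("b")])
outer = Virtual([inner, Block("c")])
print([p.name for p in outer.parts])  # ['a', 'b', 'c']
```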
+
+    @property
+    def list_of_blocks(self):
+        """
+        Get the list of physical partition objects that compose this partition.
+
+        Returns
+        -------
+        list
+            A list of distributed data objects (e.g. ``ray.ObjectRef`` or ``distributed.Future``).
+        """
+        # Defer draining the call queue until we get the partitions.
+        # TODO: look into draining the call queue at the same time as the task.
+        result = [None] * len(self.list_of_partitions_to_combine)
+        for idx, partition in enumerate(self.list_of_partitions_to_combine):
+            partition.drain_call_queue()
+            result[idx] = partition._data
+        return result
+
+    @property
+    def list_of_ips(self):
+        """
+        Get the IPs holding the physical objects composing this partition.
+
+        Returns
+        -------
+        list
+            A list of IPs as distributed data objects or str.
+        """
+        # Defer draining the call queue until we get the IP address.
+        result = [None] * len(self.list_of_partitions_to_combine)
+        for idx, partition in enumerate(self.list_of_partitions_to_combine):
+            partition.drain_call_queue()
+            result[idx] = partition._ip_cache
+        return result
+
+    def _wrap_partitions(self, partitions):
+        """
+        Wrap partitions passed as a list of distributed data objects with the ``block_partition_type`` class.
+
+        Parameters
+        ----------
+        partitions : list
+            List of distributed data objects.
+
+        Returns
+        -------
+        list
+            List of ``block_partition_type`` objects.
+        """
+        return [
+            self.block_partition_type(future, length, width, ip)
+            for (future, length, width, ip) in zip(*[iter(partitions)] * 4)
+        ]
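`_wrap_partitions` leans on the `zip(*[iter(partitions)] * 4)` idiom: all four arguments to `zip` are the *same* iterator, so each output tuple consumes four consecutive items from the flat `(data, length, width, ip, ...)` stream that the deploy functions return. A quick standalone demonstration:

```python
# Illustrative only: grouping a flat result stream into 4-tuples.
flat = ["df0", 3, 2, "ip0", "df1", 5, 2, "ip1"]
records = list(zip(*[iter(flat)] * 4))
print(records)
# [('df0', 3, 2, 'ip0'), ('df1', 5, 2, 'ip1')]
```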
+
     def apply(
         self,
         func,
+        *args,
         num_splits=None,
         other_axis_partition=None,
         maintain_partitioning=True,
         **kwargs,
     ):
@@ -43,6 +171,8 @@ def apply(
         ----------
         func : callable
             The function to apply.
+        *args : iterable
+            Additional positional arguments to be passed in `func`.
         num_splits : int, default: None
             The number of times to split the result object.
         other_axis_partition : PandasDataframeAxisPartition, default: None
@@ -63,8 +193,15 @@ def apply(
         list
             A list of `PandasDataframePartition` objects.
         """
-        if num_splits is None:
+        if not self.full_axis:
+            # If this is not a full axis partition, it already contains a subset of
+            # the full axis, so we should not split the result further.
+            num_splits = 1
+        elif num_splits is None:
             num_splits = len(self.list_of_blocks)
+        if len(self.call_queue) > 0:
+            self.drain_call_queue()
+        kwargs["args"] = args

         if other_axis_partition is not None:
             if not isinstance(other_axis_partition, list):
                 other_axis_partition = [other_axis_partition]
             other_shape = np.cumsum(
@@ -76,7 +213,7 @@ def apply(
                 [0] + [len(o.list_of_blocks) for o in other_axis_partition]
             )

-            return self._wrap_partitions(
+            result_partitions = self._wrap_partitions(
                 self.deploy_func_between_two_axis_partitions(
                     self.axis,
                     func,
@@ -96,7 +233,158 @@ def apply(
             )
-        args = [self.axis, func, num_splits, maintain_partitioning]
-        args.extend(self.list_of_blocks)
-        return self._wrap_partitions(self.deploy_axis_func(*args, **kwargs))
+        else:
+            args = [self.axis, func, num_splits, maintain_partitioning]
+            args.extend(self.list_of_blocks)
+            result_partitions = self._wrap_partitions(
+                self.deploy_axis_func(*args, **kwargs)
+            )
+        if self.full_axis:
+            return result_partitions
+        else:
+            # If this is not a full axis partition, there is only one split in
+            # the result, so take it out of the list.
+            return result_partitions[0]
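Conceptually, a full-axis apply glues the blocks together along the axis, runs the function once over the whole axis, and then splits the result back into `num_splits` pieces. A plain-pandas sketch of that flow (illustrative only, not Modin code):

```python
# Illustrative only: full-axis apply = concat blocks, apply once, re-split.
import numpy as np
import pandas as pd

blocks = [pd.DataFrame({"a": [1, 2]}), pd.DataFrame({"a": [3, 4]})]
num_splits = len(blocks)  # maintain the existing partitioning

joined = pd.concat(blocks, axis=0, ignore_index=True)
result = joined.cumsum()  # a function that needs the whole axis at once
splits = np.array_split(result, num_splits)  # split back along axis 0
print([len(s) for s in splits])  # [2, 2]
```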
+
+    def force_materialization(self, get_ip=False):
+        """
+        Materialize partitions into a single partition.
+
+        Parameters
+        ----------
+        get_ip : bool, default: False
+            Whether to get the node IP address of the single resulting partition or not.
+
+        Returns
+        -------
+        PandasDataframeAxisPartition
+            An axis partition containing only a single materialized partition.
+        """
+        materialized = super().force_materialization(get_ip=get_ip)
+        self.list_of_partitions_to_combine = materialized.list_of_partitions_to_combine
+        return materialized
+
+    def mask(self, row_indices, col_indices):
+        """
+        Create (synchronously) a mask that extracts the indices provided.
+
+        Parameters
+        ----------
+        row_indices : list-like, slice or label
+            The row labels for the rows to extract.
+        col_indices : list-like, slice or label
+            The column labels for the columns to extract.
+
+        Returns
+        -------
+        PandasDataframeAxisPartition
+            A new ``PandasDataframeAxisPartition`` object,
+            materialized.
+        """
+        return (
+            self.force_materialization()
+            .list_of_partitions_to_combine[0]
+            .mask(row_indices, col_indices)
+        )
+
+    def to_pandas(self):
+        """
+        Convert the data in this partition to a ``pandas.DataFrame``.
+
+        Returns
+        -------
+        pandas.DataFrame
+        """
+        return self.force_materialization().list_of_partitions_to_combine[0].to_pandas()
+
+    _length_cache = None
+
+    def length(self):
+        """
+        Get the length of this partition.
+
+        Returns
+        -------
+        int
+            The length of the partition.
+        """
+        if self._length_cache is None:
+            if self.axis == 0:
+                self._length_cache = sum(
+                    obj.length() for obj in self.list_of_partitions_to_combine
+                )
+            else:
+                self._length_cache = self.list_of_partitions_to_combine[0].length()
+        return self._length_cache
+
+    _width_cache = None
+
+    def width(self):
+        """
+        Get the width of this partition.
+
+        Returns
+        -------
+        int
+            The width of the partition.
+        """
+        if self._width_cache is None:
+            if self.axis == 1:
+                self._width_cache = sum(
+                    obj.width() for obj in self.list_of_partitions_to_combine
+                )
+            else:
+                self._width_cache = self.list_of_partitions_to_combine[0].width()
+        return self._width_cache
+
+    def drain_call_queue(self, num_splits=None):
+        """
+        Execute all operations stored in this partition's call queue.
+
+        Parameters
+        ----------
+        num_splits : int, default: None
+            The number of times to split the result object.
+        """
+        # Copy the original call queue and set it to empty so we don't try to
+        # drain it again when we apply().
+        call_queue = self.call_queue
+        self.call_queue = []
+
+        def drain(df):
+            for func, args, kwargs in call_queue:
+                df = func(df, *args, **kwargs)
+            return df
+
+        drained = self.apply(drain, num_splits=num_splits)
+        if not self.full_axis:
+            drained = [drained]
+        self.list_of_partitions_to_combine = drained
+
+    def add_to_apply_calls(self, func, *args, **kwargs):
+        """
+        Add a function to the call queue.
+
+        Parameters
+        ----------
+        func : callable
+            Function to be added to the call queue.
+        *args : iterable
+            Additional positional arguments to be passed in `func`.
+        **kwargs : dict
+            Additional keyword arguments to be passed in `func`.
+
+        Returns
+        -------
+        PandasDataframeAxisPartition
+            A new ``PandasDataframeAxisPartition`` object.
+
+        Notes
+        -----
+        The keyword arguments are sent as a dictionary.
+        """
+        return type(self)(
+            self.list_of_partitions_to_combine,
+            full_axis=self.full_axis,
+            call_queue=self.call_queue + [(func, args, kwargs)],
+        )

     @classmethod
     def deploy_axis_func(
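`add_to_apply_calls` never runs `func`; it returns a new virtual partition whose call queue carries one more `(func, args, kwargs)` tuple, and `drain_call_queue` later folds the queued calls over the data in FIFO order. A toy version of the pattern (illustrative only):

```python
# Illustrative only: record deferred calls, then drain them in order.
import pandas as pd

call_queue = []

def add_call(func, *args, **kwargs):
    call_queue.append((func, args, kwargs))

add_call(lambda df, n: df + n, 10)
add_call(lambda df, **kw: df.rename(columns=kw["mapper"]), mapper={"a": "b"})

df = pd.DataFrame({"a": [1, 2]})
for func, args, kwargs in call_queue:  # drain in FIFO order
    df = func(df, *args, **kwargs)
print(df)  # column renamed to 'b', values 11 and 12
```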

diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py
index aa20066bedf..70ae0ff9ab5 100644
--- a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py
+++ b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py
@@ -15,7 +15,7 @@

 from distributed import Future
 from distributed.utils import get_ip
-from dask.distributed import wait
+from dask.distributed import wait as dask_wait

 import pandas

@@ -27,118 +27,16 @@


 class PandasOnDaskDataframeVirtualPartition(PandasDataframeAxisPartition):
-    """
-    The class implements the interface in ``PandasDataframeAxisPartition``.
-
-    Parameters
-    ----------
-    list_of_blocks : Union[list, PandasOnDaskDataframePartition]
-        List of ``PandasOnDaskDataframePartition`` and
-        ``PandasOnDaskDataframeVirtualPartition`` objects, or a single
-        ``PandasOnDaskDataframePartition``.
-    get_ip : bool, default: False
-        Whether to get node IP addresses of conforming partitions or not.
-    full_axis : bool, default: True
-        Whether or not the virtual partition encompasses the whole axis.
-    call_queue : list, optional
-        A list of tuples (callable, args, kwargs) that contains deferred calls.
-    """
-
-    axis = None
-
-    def __init__(self, list_of_blocks, get_ip=False, full_axis=True, call_queue=None):
-        if isinstance(list_of_blocks, PandasOnDaskDataframePartition):
-            list_of_blocks = [list_of_blocks]
-        self.call_queue = call_queue or []
-        self.full_axis = full_axis
-        # In the simple case, none of the partitions that will compose this
-        # partition are themselves virtual partition. The partitions that will
-        # be combined are just the partitions as given to the constructor.
-        if not any(
-            isinstance(obj, PandasOnDaskDataframeVirtualPartition)
-            for obj in list_of_blocks
-        ):
-            self.list_of_partitions_to_combine = list_of_blocks
-            return
-        # Check that all axis are the same in `list_of_blocks`
-        # We should never have mismatching axis in the current implementation. We add this
-        # defensive assertion to ensure that undefined behavior does not happen.
-        assert (
-            len(
-                set(
-                    obj.axis
-                    for obj in list_of_blocks
-                    if isinstance(obj, PandasOnDaskDataframeVirtualPartition)
-                )
-            )
-            == 1
-        )
-        # When the axis of all virtual partitions matches this axis,
-        # extend and combine the lists of physical partitions.
-        if (
-            next(
-                obj
-                for obj in list_of_blocks
-                if isinstance(obj, PandasOnDaskDataframeVirtualPartition)
-            ).axis
-            == self.axis
-        ):
-            new_list_of_blocks = []
-            for obj in list_of_blocks:
-                new_list_of_blocks.extend(
-                    obj.list_of_partitions_to_combine
-                ) if isinstance(
-                    obj, PandasOnDaskDataframeVirtualPartition
-                ) else new_list_of_blocks.append(
-                    obj
-                )
-            self.list_of_partitions_to_combine = new_list_of_blocks
-        # Materialize partitions if the axis of this virtual does not match the virtual partitions
-        else:
-            self.list_of_partitions_to_combine = [
-                obj.force_materialization().list_of_partitions_to_combine[0]
-                if isinstance(obj, PandasOnDaskDataframeVirtualPartition)
-                else obj
-                for obj in list_of_blocks
-            ]
-
-    partition_type = PandasOnDaskDataframePartition
+    block_partition_type = PandasOnDaskDataframePartition
     instance_type = Future
+    wait = dask_wait
+    axis = None

-    @property
-    def list_of_blocks(self):
-        """
-        Get the list of physical partition objects that compose this partition.
-
-        Returns
-        -------
-        List
-            A list of ``distributed.Future``.
-        """
-        # Defer draining call queue until we get the partitions
-        # TODO Look into draining call queue at the same time as the task
-        result = [None] * len(self.list_of_partitions_to_combine)
-        for idx, partition in enumerate(self.list_of_partitions_to_combine):
-            partition.drain_call_queue()
-            result[idx] = partition._data
-        return result
-
-    @property
-    def list_of_ips(self):
-        """
-        Get the IPs holding the physical objects composing this partition.
-
-        Returns
-        -------
-        List
-            A list of IPs as ``distributed.Future`` or str.
-        """
-        # Defer draining call queue until we get the ip address
-        result = [None] * len(self.list_of_partitions_to_combine)
-        for idx, partition in enumerate(self.list_of_partitions_to_combine):
-            partition.drain_call_queue()
-            result[idx] = partition._ip_cache
-        return result
+    def wait(self):
+        """Wait for computations to complete on the object wrapped by the partition."""
+        self.drain_call_queue()
+        dask_wait(self.list_of_blocks)

     @classmethod
     def deploy_axis_func(
@@ -233,226 +131,6 @@ def deploy_func_between_two_axis_partitions(
             **kwargs,
         )

-    def _wrap_partitions(self, partitions):
-        """
-        Wrap partitions passed as a list of distributed.Future with ``PandasOnDaskDataframePartition`` class.
-
-        Parameters
-        ----------
-        partitions : list
-            List of distributed.Future.
-
-        Returns
-        -------
-        list
-            List of ``PandasOnDaskDataframePartition`` objects.
-        """
-        return [
-            self.partition_type(future, length, width, ip)
-            for (future, length, width, ip) in zip(*[iter(partitions)] * 4)
-        ]
-
-    def apply(
-        self,
-        func,
-        *args,
-        num_splits=None,
-        other_axis_partition=None,
-        maintain_partitioning=True,
-        **kwargs,
-    ):
-        """
-        Apply a function to this axis partition along full axis.
-
-        Parameters
-        ----------
-        func : callable
-            The function to apply.
-        *args : iterable
-            Additional positional arguments to be passed in `func`.
-        num_splits : int, default: None
-            The number of times to split the result object.
-        other_axis_partition : PandasDataframeAxisPartition, default: None
-            Another `PandasDataframeAxisPartition` object to be applied
-            to func. This is for operations that are between two data sets.
-        maintain_partitioning : bool, default: True
-            Whether to keep the partitioning in the same
-            orientation as it was previously or not. This is important because we may be
-            operating on an individual AxisPartition and not touching the rest.
-            In this case, we have to return the partitioning to its previous
-            orientation (the lengths will remain the same). This is ignored between
-            two axis partitions.
-        **kwargs : dict
-            Additional keywords arguments to be passed in `func`.
-
-        Returns
-        -------
-        list
-            A list of `PandasOnDaskDataframeVirtualPartition` objects.
-        """
-        if not self.full_axis:
-            # If this is not a full axis partition, it already contains a subset of
-            # the full axis, so we shouldn't split the result further.
-            num_splits = 1
-        if len(self.call_queue) > 0:
-            self.drain_call_queue()
-        kwargs["args"] = args
-        result = super(PandasOnDaskDataframeVirtualPartition, self).apply(
-            func, num_splits, other_axis_partition, maintain_partitioning, **kwargs
-        )
-        if self.full_axis:
-            return result
-        else:
-            # If this is a full axis partition, just take out the single split in the result.
-            return result[0]
-
-    def force_materialization(self, get_ip=False):
-        """
-        Materialize partitions into a single partition.
-
-        Parameters
-        ----------
-        get_ip : bool, default: False
-            Whether to get node ip address to a single partition or not.
-
-        Returns
-        -------
-        PandasOnDaskDataframeVirtualPartition
-            An axis partition containing only a single materialized partition.
-        """
-        materialized = super(
-            PandasOnDaskDataframeVirtualPartition, self
-        ).force_materialization(get_ip=get_ip)
-        self.list_of_partitions_to_combine = materialized.list_of_partitions_to_combine
-        return materialized
-
-    def mask(self, row_indices, col_indices):
-        """
-        Create (synchronously) a mask that extracts the indices provided.
-
-        Parameters
-        ----------
-        row_indices : list-like, slice or label
-            The row labels for the rows to extract.
-        col_indices : list-like, slice or label
-            The column labels for the columns to extract.
-
-        Returns
-        -------
-        PandasOnDaskDataframeVirtualPartition
-            A new ``PandasOnDaskDataframeVirtualPartition`` object,
-            materialized.
-        """
-        return (
-            self.force_materialization()
-            .list_of_partitions_to_combine[0]
-            .mask(row_indices, col_indices)
-        )
-
-    def to_pandas(self):
-        """
-        Convert the data in this partition to a ``pandas.DataFrame``.
-
-        Returns
-        -------
-        pandas DataFrame.
-        """
-        return self.force_materialization().list_of_partitions_to_combine[0].to_pandas()
-
-    _length_cache = None
-
-    def length(self):
-        """
-        Get the length of this partition.
-
-        Returns
-        -------
-        int
-            The length of the partition.
-        """
-        if self._length_cache is None:
-            if self.axis == 0:
-                self._length_cache = sum(
-                    obj.length() for obj in self.list_of_partitions_to_combine
-                )
-            else:
-                self._length_cache = self.list_of_partitions_to_combine[0].length()
-        return self._length_cache
-
-    _width_cache = None
-
-    def width(self):
-        """
-        Get the width of this partition.
-
-        Returns
-        -------
-        int
-            The width of the partition.
-        """
-        if self._width_cache is None:
-            if self.axis == 1:
-                self._width_cache = sum(
-                    obj.width() for obj in self.list_of_partitions_to_combine
-                )
-            else:
-                self._width_cache = self.list_of_partitions_to_combine[0].width()
-        return self._width_cache
-
-    def drain_call_queue(self, num_splits=None):
-        """
-        Execute all operations stored in this partition's call queue.
-
-        Parameters
-        ----------
-        num_splits : int, default: None
-            The number of times to split the result object.
-        """
-
-        def drain(df):
-            for func, args, kwargs in self.call_queue:
-                df = func(df, *args, **kwargs)
-            return df
-
-        drained = super(PandasOnDaskDataframeVirtualPartition, self).apply(
-            drain, num_splits=num_splits
-        )
-        self.list_of_partitions_to_combine = drained
-        self.call_queue = []
-
-    def wait(self):
-        """Wait completing computations on the object wrapped by the partition."""
-        self.drain_call_queue()
-        wait(self.list_of_blocks)
-
-    def add_to_apply_calls(self, func, *args, **kwargs):
-        """
-        Add a function to the call queue.
-
-        Parameters
-        ----------
-        func : callable
-            Function to be added to the call queue.
-        *args : iterable
-            Additional positional arguments to be passed in `func`.
-        **kwargs : dict
-            Additional keyword arguments to be passed in `func`.
-
-        Returns
-        -------
-        PandasOnDaskDataframeVirtualPartition
-            A new ``PandasOnDaskDataframeVirtualPartition`` object.
-
-        Notes
-        -----
-        The keyword arguments are sent as a dictionary.
-        """
-        return type(self)(
-            self.list_of_partitions_to_combine,
-            full_axis=self.full_axis,
-            call_queue=self.call_queue + [(func, args, kwargs)],
-        )
-

 class PandasOnDaskDataframeColumnPartition(PandasOnDaskDataframeVirtualPartition):
     """
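The column/row subclasses at the end of each engine module contain no logic of their own: the parent class writes everything in terms of `self.axis`, and each subclass just pins the axis. A stand-in sketch of that pattern (names here are not Modin's):

```python
# Illustrative only: axis-driven specialization via a class attribute.
import pandas as pd

class AxisCombiner:
    axis = None
    def combine(self, blocks):
        # Shared logic: concatenate physical blocks along the pinned axis.
        return pd.concat(blocks, axis=self.axis)

class ColumnCombiner(AxisCombiner):
    axis = 0  # column partitions stack blocks vertically

class RowCombiner(AxisCombiner):
    axis = 1  # row partitions stack blocks horizontally

blocks = [pd.DataFrame({"a": [1]}), pd.DataFrame({"a": [2]})]
print(ColumnCombiner().combine(blocks).shape)  # (2, 1)
print(RowCombiner().combine(blocks).shape)     # (1, 2)
```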

diff --git a/modin/core/execution/python/implementations/pandas_on_python/partitioning/virtual_partition.py b/modin/core/execution/python/implementations/pandas_on_python/partitioning/virtual_partition.py
deleted file mode 100644
index ee7631054f6..00000000000
--- a/modin/core/execution/python/implementations/pandas_on_python/partitioning/virtual_partition.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# Licensed to Modin Development Team under one or more contributor license agreements.
-# See the NOTICE file distributed with this work for additional information regarding
-# copyright ownership. The Modin Development Team licenses this file to you under the
-# Apache License, Version 2.0 (the "License"); you may not use this file except in
-# compliance with the License. You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software distributed under
-# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
-# ANY KIND, either express or implied. See the License for the specific language
-# governing permissions and limitations under the License.
-
-"""The module defines interface for a virtual partition with pandas storage format and python engine."""
-
-import pandas
-
-from modin.core.dataframe.pandas.partitioning.axis_partition import (
-    PandasDataframeAxisPartition,
-)
-from .partition import PandasOnPythonDataframePartition
-
-
-class PandasOnPythonDataframeAxisPartition(PandasDataframeAxisPartition):
-    """
-    Class defines axis partition interface with pandas storage format and Python engine.
-
-    Inherits functionality from ``PandasDataframeAxisPartition`` class.
-
-    Parameters
-    ----------
-    list_of_blocks : list
-        List with partition objects to create common axis partition from.
-    full_axis : bool, default: True
-        Whether or not the virtual partition encompasses the whole axis.
-    """
-
-    def __init__(self, list_of_blocks, full_axis: bool = True):
-        if not full_axis:
-            raise NotImplementedError(
-                "Pandas on Python execution requires full-axis partitions."
-            )
-        for obj in list_of_blocks:
-            obj.drain_call_queue()
-        # Unwrap from PandasDataframePartition object for ease of use
-        self.list_of_blocks = [obj._data for obj in list_of_blocks]
-
-    partition_type = PandasOnPythonDataframePartition
-    instance_type = pandas.DataFrame
-
-
-class PandasOnPythonDataframeColumnPartition(PandasOnPythonDataframeAxisPartition):
-    """
-    The column partition implementation for pandas storage format and Python engine.
-
-    All of the implementation for this class is in the ``PandasOnPythonDataframeAxisPartition``
-    parent class, and this class defines the axis to perform the computation over.
-
-    Parameters
-    ----------
-    list_of_blocks : list
-        List with partition objects to create common axis partition from.
-    full_axis : bool, default: True
-        Whether or not the virtual partition encompasses the whole axis.
-    """
-
-    axis = 0
-
-
-class PandasOnPythonDataframeRowPartition(PandasOnPythonDataframeAxisPartition):
-    """
-    The row partition implementation for pandas storage format and Python engine.
-
-    All of the implementation for this class is in the ``PandasOnPythonDataframeAxisPartition``
-    parent class, and this class defines the axis to perform the computation over.
-
-    Parameters
-    ----------
-    list_of_blocks : list
-        List with partition objects to create common axis partition from.
-    full_axis : bool, default: True
-        Whether or not the virtual partition encompasses the whole axis.
-    """
-
-    axis = 1
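The deleted Python-engine module also illustrates the hook pattern the consolidated base class relies on: each engine layer overrides a few class attributes (the block partition wrapper, the handle `instance_type`, the engine's `wait`) while the shared logic stays engine-agnostic. A minimal sketch of that idea, with stand-in names rather than Modin's real hierarchy:

```python
# Illustrative only: engine specifics live in class attributes; the shared
# logic never mentions a concrete engine.
import pandas

class EngineAgnosticPartition:
    instance_type = None  # the engine's handle type (Future/ObjectRef/DataFrame)

    @classmethod
    def accepts(cls, obj):
        return isinstance(obj, cls.instance_type)

class PandasOnPythonPartition(EngineAgnosticPartition):
    # The Python engine's "handle" is just an in-memory DataFrame.
    instance_type = pandas.DataFrame

print(PandasOnPythonPartition.accepts(pandas.DataFrame({"a": [1]})))  # True
```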

diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py
index 421559133ce..6b161282afc 100644
--- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py
+++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py
@@ -42,101 +42,11 @@ class PandasOnRayDataframeVirtualPartition(PandasDataframeAxisPartition):
         A list of tuples (callable, args, kwargs) that contains deferred calls.
     """

-    partition_type = PandasOnRayDataframePartition
+    block_partition_type = PandasOnRayDataframePartition
     instance_type = ray.ObjectRef
+    wait = ray.wait
     axis = None

-    def __init__(self, list_of_blocks, get_ip=False, full_axis=True, call_queue=None):
-        if isinstance(list_of_blocks, PandasOnRayDataframePartition):
-            list_of_blocks = [list_of_blocks]
-        self.full_axis = full_axis
-        self.call_queue = call_queue or []
-        # In the simple case, none of the partitions that will compose this
-        # partition are themselves virtual partition. The partitions that will
-        # be combined are just the partitions as given to the constructor.
-        if not any(
-            isinstance(obj, PandasOnRayDataframeVirtualPartition)
-            for obj in list_of_blocks
-        ):
-            self.list_of_partitions_to_combine = list_of_blocks
-            return
-        # Check that all axis are the same in `list_of_blocks`
-        # We should never have mismatching axis in the current implementation. We add this
-        # defensive assertion to ensure that undefined behavior does not happen.
-        assert (
-            len(
-                set(
-                    obj.axis
-                    for obj in list_of_blocks
-                    if isinstance(obj, PandasOnRayDataframeVirtualPartition)
-                )
-            )
-            == 1
-        )
-        # When the axis of all virtual partitions matches this axis,
-        # extend and combine the lists of physical partitions.
-        if (
-            next(
-                obj
-                for obj in list_of_blocks
-                if isinstance(obj, PandasOnRayDataframeVirtualPartition)
-            ).axis
-            == self.axis
-        ):
-            new_list_of_blocks = []
-            for obj in list_of_blocks:
-                new_list_of_blocks.extend(
-                    obj.list_of_partitions_to_combine
-                ) if isinstance(
-                    obj, PandasOnRayDataframeVirtualPartition
-                ) else new_list_of_blocks.append(
-                    obj
-                )
-            self.list_of_partitions_to_combine = new_list_of_blocks
-        # Materialize partitions if the axis of this virtual does not match the virtual partitions
-        else:
-            self.list_of_partitions_to_combine = [
-                obj.force_materialization().list_of_partitions_to_combine[0]
-                if isinstance(obj, PandasOnRayDataframeVirtualPartition)
-                else obj
-                for obj in list_of_blocks
-            ]
-
-    @property
-    def list_of_blocks(self):
-        """
-        Get the list of physical partition objects that compose this partition.
-
-        Returns
-        -------
-        List
-            A list of ``ray.ObjectRef``.
-        """
-        # Defer draining call queue until we get the partitions
-        # TODO Look into draining call queue at the same time as the task
-        result = [None] * len(self.list_of_partitions_to_combine)
-        for idx, partition in enumerate(self.list_of_partitions_to_combine):
-            partition.drain_call_queue()
-            result[idx] = partition._data
-        return result
-
-    @property
-    def list_of_ips(self):
-        """
-        Get the IPs holding the physical objects composing this partition.
-
-        Returns
-        -------
-        List
-            A list of IPs as ``ray.ObjectRef`` or str.
-        """
-        # Defer draining call queue until we get the ip address
-        result = [None] * len(self.list_of_partitions_to_combine)
-        for idx, partition in enumerate(self.list_of_partitions_to_combine):
-            partition.drain_call_queue()
-            result[idx] = partition._ip_cache
-        return result
-
     @classmethod
     def deploy_axis_func(
         cls,
@@ -234,232 +144,12 @@ def deploy_func_between_two_axis_partitions(
             **kwargs,
         )

-    def _wrap_partitions(self, partitions):
-        """
-        Wrap partitions passed as a list of ``ray.ObjectRef`` with ``PandasOnRayDataframePartition`` class.
-
-        Parameters
-        ----------
-        partitions : list
-            List of ``ray.ObjectRef``.
-
-        Returns
-        -------
-        list
-            List of ``PandasOnRayDataframePartition`` objects.
-        """
-        return [
-            self.partition_type(object_id, length, width, ip)
-            for (object_id, length, width, ip) in zip(*[iter(partitions)] * 4)
-        ]
-
-    def apply(
-        self,
-        func,
-        *args,
-        num_splits=None,
-        other_axis_partition=None,
-        maintain_partitioning=True,
-        **kwargs,
-    ):
-        """
-        Apply a function to this axis partition along full axis.
-
-        Parameters
-        ----------
-        func : callable
-            The function to apply.
-        *args : iterable
-            Additional positional arguments to be passed in `func`.
-        num_splits : int, default: None
-            The number of times to split the result object.
-        other_axis_partition : PandasDataframeAxisPartition, default: None
-            Another `PandasDataframeAxisPartition` object to be applied
-            to func. This is for operations that are between two data sets.
-        maintain_partitioning : bool, default: True
-            Whether to keep the partitioning in the same
-            orientation as it was previously or not. This is important because we may be
-            operating on an individual AxisPartition and not touching the rest.
-            In this case, we have to return the partitioning to its previous
-            orientation (the lengths will remain the same). This is ignored between
-            two axis partitions.
-        **kwargs : dict
-            Additional keywords arguments to be passed in `func`.
-
-        Returns
-        -------
-        list
-            A list of `PandasOnRayDataframeVirtualPartition` objects.
-        """
-        if not self.full_axis:
-            # If this is not a full axis partition, it already contains a subset of
-            # the full axis, so we shouldn't split the result further.
-            num_splits = 1
-        if len(self.call_queue) > 0:
-            self.drain_call_queue()
-        kwargs["args"] = args
-        result = super(PandasOnRayDataframeVirtualPartition, self).apply(
-            func,
-            num_splits,
-            other_axis_partition,
-            maintain_partitioning,
-            **kwargs,
-        )
-        if self.full_axis:
-            return result
-        else:
-            # If this is a full axis partition, just take out the single split in the result.
-            return result[0]
-
-    def force_materialization(self, get_ip=False):
-        """
-        Materialize partitions into a single partition.
-
-        Parameters
-        ----------
-        get_ip : bool, default: False
-            Whether to get node ip address to a single partition or not.
-
-        Returns
-        -------
-        PandasOnRayDataframeVirtualPartition
-            An axis partition containing only a single materialized partition.
-        """
-        materialized = super(
-            PandasOnRayDataframeVirtualPartition, self
-        ).force_materialization(get_ip=get_ip)
-        self.list_of_partitions_to_combine = materialized.list_of_partitions_to_combine
-        return materialized
-
-    def mask(self, row_indices, col_indices):
-        """
-        Create (synchronously) a mask that extracts the indices provided.
-
-        Parameters
-        ----------
-        row_indices : list-like, slice or label
-            The row labels for the rows to extract.
-        col_indices : list-like, slice or label
-            The column labels for the columns to extract.
-
-        Returns
-        -------
-        PandasOnRayDataframeVirtualPartition
-            A new ``PandasOnRayDataframeVirtualPartition`` object,
-            materialized.
-        """
-        return (
-            self.force_materialization()
-            .list_of_partitions_to_combine[0]
-            .mask(row_indices, col_indices)
-        )
-
-    def to_pandas(self):
-        """
-        Convert the data in this partition to a ``pandas.DataFrame``.
-
-        Returns
-        -------
-        pandas DataFrame.
-        """
-        return self.force_materialization().list_of_partitions_to_combine[0].to_pandas()
-
-    _length_cache = None
-
-    def length(self):
-        """
-        Get the length of this partition.
-
-        Returns
-        -------
-        int
-            The length of the partition.
-        """
-        if self._length_cache is None:
-            if self.axis == 0:
-                self._length_cache = sum(
-                    obj.length() for obj in self.list_of_partitions_to_combine
-                )
-            else:
-                self._length_cache = self.list_of_partitions_to_combine[0].length()
-        return self._length_cache
-
-    _width_cache = None
-
-    def width(self):
-        """
-        Get the width of this partition.
-
-        Returns
-        -------
-        int
-            The width of the partition.
-        """
-        if self._width_cache is None:
-            if self.axis == 1:
-                self._width_cache = sum(
-                    obj.width() for obj in self.list_of_partitions_to_combine
-                )
-            else:
-                self._width_cache = self.list_of_partitions_to_combine[0].width()
-        return self._width_cache
-
-    def drain_call_queue(self, num_splits=None):
-        """
-        Execute all operations stored in this partition's call queue.
-
-        Parameters
-        ----------
-        num_splits : int, default: None
-            The number of times to split the result object.
-        """
-
-        def drain(df):
-            for func, args, kwargs in self.call_queue:
-                df = func(df, *args, **kwargs)
-            return df
-
-        drained = super(PandasOnRayDataframeVirtualPartition, self).apply(
-            drain, num_splits=num_splits
-        )
-        self.list_of_partitions_to_combine = drained
-        self.call_queue = []
-
     def wait(self):
         """Wait completing computations on the object wrapped by the partition."""
         self.drain_call_queue()
         futures = self.list_of_blocks
         ray.wait(futures, num_returns=len(futures))

-    def add_to_apply_calls(self, func, *args, **kwargs):
-        """
-        Add a function to the call queue.
-
-        Parameters
-        ----------
-        func : callable or ray.ObjectRef
-            Function to be added to the call queue.
-        *args : iterable
-            Additional positional arguments to be passed in `func`.
-        **kwargs : dict
-            Additional keyword arguments to be passed in `func`.
-
-        Returns
-        -------
-        PandasOnRayDataframeVirtualPartition
-            A new ``PandasOnRayDataframeVirtualPartition`` object.
-
-        Notes
-        -----
-        It does not matter if `func` is callable or an ``ray.ObjectRef``. Ray will
-        handle it correctly either way. The keyword arguments are sent as a dictionary.
-        """
-        return type(self)(
-            self.list_of_partitions_to_combine,
-            full_axis=self.full_axis,
-            call_queue=self.call_queue + [(func, args, kwargs)],
-        )
-

 class PandasOnRayDataframeColumnPartition(PandasOnRayDataframeVirtualPartition):
     """
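The retained Ray `wait` above passes `num_returns=len(futures)`, which turns `ray.wait` into a full barrier: it returns only once every listed object is ready. A small self-contained demonstration (assumes Ray is installed; not Modin code):

```python
# Illustrative only: ray.wait with num_returns=len(futures) blocks until
# every task has finished, i.e. a full barrier over the partition's blocks.
import ray

ray.init(ignore_reinit_error=True)

@ray.remote
def task(i):
    return i * i

futures = [task.remote(i) for i in range(4)]
done, not_done = ray.wait(futures, num_returns=len(futures))
print(len(done), len(not_done))  # 4 0
ray.shutdown()
```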