From d98324d737b9ce030ca37e895d0f7a0ea436d918 Mon Sep 17 00:00:00 2001 From: Andrey Pavlenko Date: Sat, 16 Mar 2024 17:53:56 +0100 Subject: [PATCH] Force block partitions materialization --- .../pandas_on_ray/partitioning/virtual_partition.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py index 917ba989e65..53ab02729cd 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py @@ -88,7 +88,7 @@ def __init__( if not isinstance(data, Collection) or len(data) == 1: if not isinstance(data, Collection): data = [data] - self._set_data_ref(data[0]._data_ref) + self._set_data_ref(data[0]._data) self._num_splits = 1 self._list_of_block_partitions = data return @@ -119,6 +119,7 @@ def __init__( # TODO: We have a subset of the same frame here and can just get a single chunk # from the original frame instead of concatenating all these chunks. + data = [part._data for part in self._list_of_block_partitions] self._set_data_ref(self._concat(refs)) def _set_data_ref( @@ -165,16 +166,16 @@ def apply( if other_axis_partition is not None: if isinstance(other_axis_partition, Collection): if len(other_axis_partition) == 1: - other_part = other_axis_partition[0]._data_ref + other_part = other_axis_partition[0]._data else: concat_fn = ( PandasOnRayDataframeColumnPartition if self.axis else PandasOnRayDataframeRowPartition )._concat - other_part = concat_fn([p._data_ref for p in other_axis_partition]) + other_part = concat_fn([p._data for p in other_axis_partition]) else: - other_part = other_axis_partition._data_ref + other_part = other_axis_partition._data args = [other_part] + list(args) de = self._apply(func, args, kwargs)