diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c301c2d3f54..d351c7480eb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -189,6 +189,8 @@ jobs: run: python -m pytest modin/test/backends/pandas/test_internals.py - shell: bash -l {0} run: python -m pytest modin/test/test_envvar_npartitions.py + - shell: bash -l {0} + run: python -m pytest modin/test/test_partition_api.py test-defaults: needs: [lint-commit, lint-flake8, lint-black, test-api, test-headers] @@ -285,7 +287,7 @@ jobs: run: pytest modin/experimental/engines/omnisci_on_ray/test/test_dataframe.py - shell: bash -l {0} run: bash <(curl -s https://codecov.io/bash) - + test-asv-benchmarks: needs: [lint-commit, lint-flake8, lint-black, test-api, test-headers] runs-on: ubuntu-latest diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index 607c337230e..45a0ace8aed 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -31,6 +31,8 @@ jobs: run: python -m pytest modin/test/backends/pandas/test_internals.py - shell: bash -l {0} run: python -m pytest modin/test/test_envvar_npartitions.py + - shell: bash -l {0} + run: python -m pytest modin/test/test_partition_api.py test-defaults: runs-on: ubuntu-latest diff --git a/modin/api/partition_api.py b/modin/api/partition_api.py deleted file mode 100644 index 1e286231dc2..00000000000 --- a/modin/api/partition_api.py +++ /dev/null @@ -1,80 +0,0 @@ -# Licensed to Modin Development Team under one or more contributor license agreements. -# See the NOTICE file distributed with this work for additional information regarding -# copyright ownership. The Modin Development Team licenses this file to you under the -# Apache License, Version 2.0 (the "License"); you may not use this file except in -# compliance with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed under -# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF -# ANY KIND, either express or implied. See the License for the specific language -# governing permissions and limitations under the License. - - -def unwrap_partitions(api_layer_object, axis=None, bind_ip=False): - """ - Unwrap partitions of the `api_layer_object`. - - Parameters - ---------- - api_layer_object : DataFrame or Series - The API layer object. - axis : None, 0 or 1. Default is None - The axis to unwrap partitions for (0 - row partitions, 1 - column partitions). - If axis is None, all the partitions of the API layer object are unwrapped. - bind_ip : boolean. Default is False - Whether to bind node ip address to each partition or not. - - Returns - ------- - list - A list of Ray.ObjectRef/Dask.Future to partitions of the `api_layer_object` - if Ray/Dask is used as an engine. - - Notes - ----- - In case bind_ip=True, a list containing tuples of Ray.ObjectRef/Dask.Future to node ip addresses - and partitions of the `api_layer_object`, respectively, is returned if Ray/Dask is used as an engine. - """ - if not hasattr(api_layer_object, "_query_compiler"): - raise ValueError( - f"Only API Layer objects may be passed in here, got {type(api_layer_object)} instead." 
- ) - - if axis is None: - - def _unwrap_partitions(oid): - if bind_ip: - return [ - (partition.ip, getattr(partition, oid)) - for row in api_layer_object._query_compiler._modin_frame._partitions - for partition in row - ] - else: - return [ - getattr(partition, oid) - for row in api_layer_object._query_compiler._modin_frame._partitions - for partition in row - ] - - actual_engine = type( - api_layer_object._query_compiler._modin_frame._partitions[0][0] - ).__name__ - if actual_engine in ("PandasOnRayFramePartition",): - return _unwrap_partitions("oid") - elif actual_engine in ("PandasOnDaskFramePartition",): - return _unwrap_partitions("future") - raise ValueError( - f"Do not know how to unwrap '{actual_engine}' underlying partitions" - ) - else: - partitions = ( - api_layer_object._query_compiler._modin_frame._frame_mgr_cls.axis_partition( - api_layer_object._query_compiler._modin_frame._partitions, axis ^ 1 - ) - ) - return [ - part.coalesce(bind_ip=bind_ip).unwrap(squeeze=True, bind_ip=bind_ip) - for part in partitions - ] diff --git a/modin/api/__init__.py b/modin/distributed/dataframe/pandas/__init__.py similarity index 87% rename from modin/api/__init__.py rename to modin/distributed/dataframe/pandas/__init__.py index ae69fb73221..b207f531d81 100644 --- a/modin/api/__init__.py +++ b/modin/distributed/dataframe/pandas/__init__.py @@ -11,6 +11,6 @@ # ANY KIND, either express or implied. See the License for the specific language # governing permissions and limitations under the License. 
-from .partition_api import unwrap_partitions +from .partitions import unwrap_partitions, from_partitions -__all__ = ["unwrap_partitions"] +__all__ = ["unwrap_partitions", "from_partitions"] diff --git a/modin/distributed/dataframe/pandas/partitions.py b/modin/distributed/dataframe/pandas/partitions.py new file mode 100644 index 00000000000..e71522ed22f --- /dev/null +++ b/modin/distributed/dataframe/pandas/partitions.py @@ -0,0 +1,159 @@ +# Licensed to Modin Development Team under one or more contributor license agreements. +# See the NOTICE file distributed with this work for additional information regarding +# copyright ownership. The Modin Development Team licenses this file to you under the +# Apache License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific language +# governing permissions and limitations under the License. + +import numpy as np + +from modin.pandas.dataframe import DataFrame +from modin.backends.pandas.query_compiler import PandasQueryCompiler + + +def unwrap_partitions(api_layer_object, axis=None, bind_ip=False): + """ + Unwrap partitions of the `api_layer_object`. + + Parameters + ---------- + api_layer_object : DataFrame or Series + The API layer object. + axis : None, 0 or 1. Default is None + The axis to unwrap partitions for (0 - row partitions, 1 - column partitions). + If axis is None, all the partitions of the API layer object are unwrapped. + bind_ip : boolean. Default is False + Whether to bind node ip address to each partition or not. 
+ + Returns + ------- + list + A list of Ray.ObjectRef/Dask.Future to partitions of the `api_layer_object` + if Ray/Dask is used as an engine. + + Notes + ----- + In case bind_ip=True, a list containing tuples of Ray.ObjectRef/Dask.Future to node ip addresses + and partitions of the `api_layer_object`, respectively, is returned if Ray/Dask is used as an engine. + """ + if not hasattr(api_layer_object, "_query_compiler"): + raise ValueError( + f"Only API Layer objects may be passed in here, got {type(api_layer_object)} instead." + ) + + if axis is None: + + def _unwrap_partitions(oid): + if bind_ip: + return [ + [(partition.ip, getattr(partition, oid)) for partition in row] + for row in api_layer_object._query_compiler._modin_frame._partitions + ] + else: + return [ + [getattr(partition, oid) for partition in row] + for row in api_layer_object._query_compiler._modin_frame._partitions + ] + + actual_engine = type( + api_layer_object._query_compiler._modin_frame._partitions[0][0] + ).__name__ + if actual_engine in ("PandasOnRayFramePartition",): + return _unwrap_partitions("oid") + elif actual_engine in ("PandasOnDaskFramePartition",): + return _unwrap_partitions("future") + raise ValueError( + f"Do not know how to unwrap '{actual_engine}' underlying partitions" + ) + else: + partitions = ( + api_layer_object._query_compiler._modin_frame._frame_mgr_cls.axis_partition( + api_layer_object._query_compiler._modin_frame._partitions, axis ^ 1 + ) + ) + return [ + part.coalesce(bind_ip=bind_ip).unwrap(squeeze=True, bind_ip=bind_ip) + for part in partitions + ] + + +def from_partitions(partitions, axis): + """ + Create DataFrame from remote partitions. + + Parameters + ---------- + partitions : list + List of Ray.ObjectRef/Dask.Future referencing partitions, depending on the engine used. + Or list containing tuples of Ray.ObjectRef/Dask.Future referencing ip addresses of partitions + and the partitions themselves, depending on the engine used.
+ axis : None, 0 or 1 + The `axis` parameter is used to identify what are the partitions passed. + You have to set: + - `axis` to 0 if you want to create DataFrame from row partitions. + - `axis` to 1 if you want to create DataFrame from column partitions. + - `axis` to None if you want to create DataFrame from 2D list of partitions. + + Returns + ------- + DataFrame + DataFrame instance created from remote partitions. + """ + from modin.data_management.factories.dispatcher import EngineDispatcher + + factory = EngineDispatcher.get_engine() + + partition_class = factory.io_cls.frame_cls._frame_mgr_cls._partition_class + partition_frame_class = factory.io_cls.frame_cls + partition_mgr_class = factory.io_cls.frame_cls._frame_mgr_cls + + # Since we store partitions of Modin DataFrame as a 2D NumPy array we need to place + # passed partitions to 2D NumPy array to pass it to internal Modin Frame class. + # `axis=None` - convert 2D list to 2D NumPy array + if axis is None: + if isinstance(partitions[0][0], tuple): + parts = np.array( + [ + [partition_class(partition, ip=ip) for ip, partition in row] + for row in partitions + ] + ) + else: + parts = np.array( + [ + [partition_class(partition) for partition in row] + for row in partitions + ] + ) + # `axis=0` - place row partitions to 2D NumPy array so that each row of the array is one row partition. + elif axis == 0: + if isinstance(partitions[0], tuple): + parts = np.array( + [[partition_class(partition, ip=ip)] for ip, partition in partitions] + ) + else: + parts = np.array([[partition_class(partition)] for partition in partitions]) + # `axis=1` - place column partitions to 2D NumPy array so that each column of the array is one column partition. 
+ elif axis == 1: + if isinstance(partitions[0], tuple): + parts = np.array( + [[partition_class(partition, ip=ip) for ip, partition in partitions]] + ) + else: + parts = np.array([[partition_class(partition) for partition in partitions]]) + else: + raise ValueError( + f"Got unacceptable value of axis {axis}. Possible values are {0}, {1} or {None}." + ) + + index = partition_mgr_class.get_indices(0, parts, lambda df: df.axes[0]) + columns = partition_mgr_class.get_indices(1, parts, lambda df: df.axes[1]) + return DataFrame( + query_compiler=PandasQueryCompiler(partition_frame_class(parts, index, columns)) + ) diff --git a/modin/experimental/xgboost/xgboost_ray.py b/modin/experimental/xgboost/xgboost_ray.py index 3e4d897c417..fed5d28d365 100644 --- a/modin/experimental/xgboost/xgboost_ray.py +++ b/modin/experimental/xgboost/xgboost_ray.py @@ -23,7 +23,7 @@ import numpy as np import pandas -from modin.api import unwrap_partitions +from modin.distributed.dataframe.pandas import unwrap_partitions from .utils import RabitContext, RabitContextManager LOGGER = logging.getLogger("[modin.xgboost]") diff --git a/modin/test/test_partition_api.py b/modin/test/test_partition_api.py new file mode 100644 index 00000000000..bfefb5b9b62 --- /dev/null +++ b/modin/test/test_partition_api.py @@ -0,0 +1,98 @@ +# Licensed to Modin Development Team under one or more contributor license agreements. +# See the NOTICE file distributed with this work for additional information regarding +# copyright ownership. The Modin Development Team licenses this file to you under the +# Apache License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. 
See the License for the specific language +# governing permissions and limitations under the License. + +import numpy as np +import pandas +import pytest + +import modin.pandas as pd +from modin.distributed.dataframe.pandas import unwrap_partitions, from_partitions +from modin.config import Engine, NPartitions +from modin.pandas.test.utils import df_equals + + +if Engine.get() == "Ray": + import ray +if Engine.get() == "Dask": + from distributed.client import get_client + +NPartitions.put(4) + + +@pytest.mark.parametrize("axis", [None, 0, 1]) +def test_unwrap_partitions(axis): + data = np.random.randint(0, 100, size=(2 ** 16, 2 ** 8)) + df = pd.DataFrame(data) + + if axis is None: + expected_partitions = df._query_compiler._modin_frame._partitions + actual_partitions = np.array(unwrap_partitions(df, axis=axis)) + assert ( + expected_partitions.shape[0] == actual_partitions.shape[0] + and expected_partitions.shape[1] == actual_partitions.shape[1] + ) + for row_idx in range(expected_partitions.shape[0]): + for col_idx in range(expected_partitions.shape[1]): + if Engine.get() == "Ray": + assert ( + expected_partitions[row_idx][col_idx].oid + == actual_partitions[row_idx][col_idx] + ) + if Engine.get() == "Dask": + assert ( + expected_partitions[row_idx][col_idx].future + == actual_partitions[row_idx][col_idx] + ) + else: + expected_axis_partitions = ( + df._query_compiler._modin_frame._frame_mgr_cls.axis_partition( + df._query_compiler._modin_frame._partitions, axis ^ 1 + ) + ) + expected_axis_partitions = [ + axis_partition.coalesce().unwrap(squeeze=True) + for axis_partition in expected_axis_partitions + ] + actual_axis_partitions = unwrap_partitions(df, axis=axis) + assert len(expected_axis_partitions) == len(actual_axis_partitions) + for item_idx in range(len(expected_axis_partitions)): + if Engine.get() == "Ray": + df_equals( + ray.get(expected_axis_partitions[item_idx]), + ray.get(actual_axis_partitions[item_idx]), + ) + if Engine.get() == "Dask": + df_equals(
+ expected_axis_partitions[item_idx].result(), + actual_axis_partitions[item_idx].result(), + ) + + +@pytest.mark.parametrize("axis", [None, 0, 1]) +def test_from_partitions(axis): + data = np.random.randint(0, 100, size=(2 ** 16, 2 ** 8)) + df1, df2 = pandas.DataFrame(data), pandas.DataFrame(data) + expected_df = pandas.concat([df1, df2], axis=1 if axis is None else axis) + if Engine.get() == "Ray": + if axis is None: + futures = [[ray.put(df1), ray.put(df2)]] + else: + futures = [ray.put(df1), ray.put(df2)] + if Engine.get() == "Dask": + client = get_client() + if axis is None: + futures = [client.scatter([df1, df2], hash=False)] + else: + futures = client.scatter([df1, df2], hash=False) + actual_df = from_partitions(futures, axis) + df_equals(expected_df, actual_df)