From 1955c72b2ebd720e80c2426a88a46c2fd88df64b Mon Sep 17 00:00:00 2001 From: "Igoshev, Iaroslav" Date: Wed, 24 Apr 2024 16:15:16 +0000 Subject: [PATCH 1/6] FEAT-#6492: Add `from_map` feature to create dataframe Signed-off-by: Igoshev, Iaroslav --- .../implementations/pandas_on_dask/io/io.py | 46 ++++++++++++++++++ .../dispatching/factories/dispatcher.py | 5 ++ .../dispatching/factories/factories.py | 19 ++++++++ .../implementations/pandas_on_ray/io/io.py | 48 +++++++++++++++++++ .../pandas_on_unidist/io/io.py | 47 ++++++++++++++++++ modin/core/io/io.py | 28 +++++++++++ modin/pandas/io.py | 30 ++++++++++++ modin/tests/pandas/test_io.py | 19 +++++++- 8 files changed, 241 insertions(+), 1 deletion(-) diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py index b68ad983715..b7b0d888b78 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py @@ -13,6 +13,8 @@ """Module houses class that implements ``BaseIO`` using Dask as an execution engine.""" +import numpy as np +import pandas from distributed.client import default_client from modin.core.execution.dask.common import DaskWrapper @@ -188,3 +190,47 @@ def df_to_series(df): partitions = [client.submit(df_to_series, part) for part in partitions] return from_delayed(partitions) + + @classmethod + def from_map(cls, func, iterable, *args, **kwargs): + """ + Create a Modin `query_compiler` from a map function. + + This method will construct a Modin `query_compiler` split by row partitions. + The number of row partitions matches the number of elements in the iterable object. + + Parameters + ---------- + func : callable + Function to map across the iterable object. + iterable : Iterable + An iterable object. + *args : tuple + Positional arguments to pass in `func`. + **kwargs : dict + Keyword arguments to pass in `func`. 
+ + Returns + ------- + BaseQueryCompiler + QueryCompiler containing data returned by map function. + """ + func = cls.frame_cls._partition_mgr_cls.preprocess_func(func) + partitions = np.array( + [ + [ + cls.frame_partition_cls( + deploy_map_func.remote(func, obj, *args, **kwargs) + ) + ] + for obj in iterable + ] + ) + return cls.query_compiler_cls(cls.frame_cls(partitions)) + + +def deploy_map_func(func, obj, *args, **kwargs): + result = func(obj, *args, **kwargs) + if not isinstance(result, pandas.DataFrame): + result = pandas.DataFrame(result) + return result diff --git a/modin/core/execution/dispatching/factories/dispatcher.py b/modin/core/execution/dispatching/factories/dispatcher.py index 99c6153264c..0bbe84af9aa 100644 --- a/modin/core/execution/dispatching/factories/dispatcher.py +++ b/modin/core/execution/dispatching/factories/dispatcher.py @@ -191,6 +191,11 @@ def from_ray(cls, ray_obj): def from_dask(cls, dask_obj): return cls.get_factory()._from_dask(dask_obj) + @classmethod + @_inherit_docstrings(factories.BaseFactory._from_map) + def from_map(cls, func, iterable, *args, **kwargs): + return cls.get_factory()._from_map(func, iterable, *args, **kwargs) + @classmethod @_inherit_docstrings(factories.BaseFactory._read_parquet) def read_parquet(cls, **kwargs): diff --git a/modin/core/execution/dispatching/factories/factories.py b/modin/core/execution/dispatching/factories/factories.py index d521a40f2da..98c11b7be8e 100644 --- a/modin/core/execution/dispatching/factories/factories.py +++ b/modin/core/execution/dispatching/factories/factories.py @@ -221,6 +221,25 @@ def _from_ray(cls, ray_obj): def _from_dask(cls, dask_obj): return cls.io_cls.from_dask(dask_obj) + @classmethod + @doc( + _doc_io_method_template, + source="a map function", + params=""" + func : callable + Function to map across the iterable object. + iterable : Iterable + An iterable object. + *args : tuple + Positional arguments to pass in `func`. 
+ **kwargs : dict + Keyword arguments to pass in `func`. + """, + method="from_map", + ) + def _from_map(cls, func, iterable, *args, **kwargs): + return cls.io_cls.from_map(func, iterable, *args, **kwargs) + @classmethod @doc( _doc_io_method_template, diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index ec88dda60d9..34cb574b0fa 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -15,7 +15,9 @@ import io +import numpy as np import pandas +import ray from pandas.io.common import get_handle, stringify_path from ray.data import from_pandas_refs @@ -68,6 +70,7 @@ class PandasOnRayIO(RayIO): """Factory providing methods for performing I/O operations using pandas as storage format on Ray as engine.""" frame_cls = PandasOnRayDataframe + frame_partition_cls = PandasOnRayDataframePartition query_compiler_cls = PandasQueryCompiler build_args = dict( frame_partition_cls=PandasOnRayDataframePartition, @@ -302,3 +305,48 @@ def to_ray(cls, modin_obj): """ parts = unwrap_partitions(modin_obj, axis=0) return from_pandas_refs(parts) + + @classmethod + def from_map(cls, func, iterable, *args, **kwargs): + """ + Create a Modin `query_compiler` from a map function. + + This method will construct a Modin `query_compiler` split by row partitions. + The number of row partitions matches the number of elements in the iterable object. + + Parameters + ---------- + func : callable + Function to map across the iterable object. + iterable : Iterable + An iterable object. + *args : tuple + Positional arguments to pass in `func`. + **kwargs : dict + Keyword arguments to pass in `func`. + + Returns + ------- + BaseQueryCompiler + QueryCompiler containing data returned by map function. 
+ """ + func = cls.frame_cls._partition_mgr_cls.preprocess_func(func) + partitions = np.array( + [ + [ + cls.frame_partition_cls( + deploy_map_func.remote(func, obj, *args, **kwargs) + ) + ] + for obj in iterable + ] + ) + return cls.query_compiler_cls(cls.frame_cls(partitions)) + + +@ray.remote +def deploy_map_func(func, obj, *args, **kwargs): + result = func(obj, *args, **kwargs) + if not isinstance(result, pandas.DataFrame): + result = pandas.DataFrame(result) + return result diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py b/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py index c5bc772ad7f..98783a2f1e2 100644 --- a/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py +++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py @@ -15,7 +15,9 @@ import io +import numpy as np import pandas +import unidist from pandas.io.common import get_handle, stringify_path from modin.core.execution.unidist.common import SignalActor, UnidistWrapper @@ -258,3 +260,48 @@ def func(df, **kw): # pragma: no cover UnidistWrapper.materialize( [part.list_of_blocks[0] for row in result for part in row] ) + + @classmethod + def from_map(cls, func, iterable, *args, **kwargs): + """ + Create a Modin `query_compiler` from a map function. + + This method will construct a Modin `query_compiler` split by row partitions. + The number of row partitions matches the number of elements in the iterable object. + + Parameters + ---------- + func : callable + Function to map across the iterable object. + iterable : Iterable + An iterable object. + *args : tuple + Positional arguments to pass in `func`. + **kwargs : dict + Keyword arguments to pass in `func`. + + Returns + ------- + BaseQueryCompiler + QueryCompiler containing data returned by map function. 
+ """ + func = cls.frame_cls._partition_mgr_cls.preprocess_func(func) + partitions = np.array( + [ + [ + cls.frame_partition_cls( + deploy_map_func.remote(func, obj, *args, **kwargs) + ) + ] + for obj in iterable + ] + ) + return cls.query_compiler_cls(cls.frame_cls(partitions)) + + +@unidist.remote +def deploy_map_func(func, obj, *args, **kwargs): + result = func(obj, *args, **kwargs) + if not isinstance(result, pandas.DataFrame): + result = pandas.DataFrame(result) + return result diff --git a/modin/core/io/io.py b/modin/core/io/io.py index 56f6f353de0..cb7647e2207 100644 --- a/modin/core/io/io.py +++ b/modin/core/io/io.py @@ -164,6 +164,34 @@ def from_dask(cls, dask_obj): "Modin DataFrame can only be converted to a Dask DataFrame if Modin uses a Dask engine." ) + @classmethod + def from_map(cls, func, iterable, *args, **kwargs): + """ + Create a Modin `query_compiler` from a map function. + + This method will construct a Modin `query_compiler` split by row partitions. + The number of row partitions matches the number of elements in the iterable object. + + Parameters + ---------- + func : callable + Function to map across the iterable object. + iterable : Iterable + An iterable object. + *args : tuple + Positional arguments to pass in `func`. + **kwargs : dict + Keyword arguments to pass in `func`. + + Returns + ------- + BaseQueryCompiler + QueryCompiler containing data returned by map function. + """ + raise RuntimeError( + "Modin DataFrame can only be created if Modin uses Ray, Dask or MPI engine." 
+ ) + @classmethod @_inherit_docstrings(pandas.read_parquet, apilink="pandas.read_parquet") @doc( diff --git a/modin/pandas/io.py b/modin/pandas/io.py index e29629ebd46..bbfdcce68f3 100644 --- a/modin/pandas/io.py +++ b/modin/pandas/io.py @@ -1109,6 +1109,36 @@ def from_dask(dask_obj) -> DataFrame: return ModinObjects.DataFrame(query_compiler=FactoryDispatcher.from_dask(dask_obj)) +def from_map(func, iterable, *args, **kwargs) -> DataFrame: + """ + Create a Modin DataFrame from map function applied to an iterable object. + + This method will construct a Modin DataFrame split by row partitions. + The number of row partitions matches the number of elements in the iterable object. + + Parameters + ---------- + func : callable + Function to map across the iterable object. + iterable : Iterable + An iterable object. + *args : tuple + Positional arguments to pass in `func`. + **kwargs : dict + Keyword arguments to pass in `func`. + + Returns + ------- + DataFrame + A new Modin DataFrame object. + """ + from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher + + return ModinObjects.DataFrame( + query_compiler=FactoryDispatcher.from_map(func, iterable, *args, **kwargs) + ) + + def to_pandas(modin_obj: SupportsPublicToPandas) -> DataFrame | Series: """ Convert a Modin DataFrame/Series to a pandas DataFrame/Series. 
diff --git a/modin/tests/pandas/test_io.py b/modin/tests/pandas/test_io.py index 08e92d12d17..24232a69271 100644 --- a/modin/tests/pandas/test_io.py +++ b/modin/tests/pandas/test_io.py @@ -47,7 +47,7 @@ TestReadFromSqlServer, ) from modin.db_conn import ModinDatabaseConnection, UnsupportedDatabaseException -from modin.pandas.io import from_arrow, from_dask, from_ray, to_pandas +from modin.pandas.io import from_arrow, from_dask, from_map, from_ray, to_pandas from modin.tests.test_utils import warns_that_defaulting_to_pandas from .utils import ( @@ -3461,3 +3461,20 @@ def test_from_dask(): result_df = from_dask(dask_df) df_equals(result_df, modin_df) + + +@pytest.mark.skipif( + condition=Engine.get() not in ("Ray", "Dask", "Unidist"), + reason="Dask DataFrame can only be created if Modin uses Ray, Dask or MPI engine.", +) +@pytest.mark.filterwarnings(default_to_pandas_ignore_string) +def test_from_map(): + factor = 3 + data = [1] * factor + [2] * factor + [3] * factor + expected_df = pd.DataFrame(data, index=[0, 1, 2] * factor) + + def map_func(x, factor): + return [x] * factor + + result_df = from_map(map_func, [1, 2, 3], factor) + df_equals(result_df, expected_df) From 139d3b50686227207c0455f70154ce87e5b9618a Mon Sep 17 00:00:00 2001 From: "Igoshev, Iaroslav" Date: Wed, 24 Apr 2024 18:50:33 +0000 Subject: [PATCH 2/6] Add docstrings Signed-off-by: Igoshev, Iaroslav --- .../implementations/pandas_on_dask/io/io.py | 20 +++++++++++++++++++- .../dispatching/factories/factories.py | 23 ++++++++++++------- .../implementations/pandas_on_ray/io/io.py | 20 +++++++++++++++++++- .../pandas_on_unidist/io/io.py | 20 +++++++++++++++++++- 4 files changed, 72 insertions(+), 11 deletions(-) diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py index b7b0d888b78..2b6922ab092 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py +++ 
b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py @@ -229,7 +229,25 @@ def from_map(cls, func, iterable, *args, **kwargs): return cls.query_compiler_cls(cls.frame_cls(partitions)) -def deploy_map_func(func, obj, *args, **kwargs): +def deploy_map_func(func, obj, *args, **kwargs): # pragma: no cover + """ + Deploy a func to apply to an object. + + Parameters + ---------- + func : callable + Function to map across the iterable object. + obj : object + An object to apply a function to. + *args : tuple + Positional arguments to pass in `func`. + **kwargs : dict + Keyword arguments to pass in `func`. + + Returns + ------- + pandas.DataFrame + """ result = func(obj, *args, **kwargs) if not isinstance(result, pandas.DataFrame): result = pandas.DataFrame(result) diff --git a/modin/core/execution/dispatching/factories/factories.py b/modin/core/execution/dispatching/factories/factories.py index 98c11b7be8e..aa94688e3df 100644 --- a/modin/core/execution/dispatching/factories/factories.py +++ b/modin/core/execution/dispatching/factories/factories.py @@ -222,10 +222,15 @@ def _from_dask(cls, dask_obj): return cls.io_cls.from_dask(dask_obj) @classmethod - @doc( - _doc_io_method_template, - source="a map function", - params=""" + def _from_map(cls, func, iterable, *args, **kwargs): + """ + Create a Modin `query_compiler` from a map function. + + This method will construct a Modin `query_compiler` split by row partitions. + The number of row partitions matches the number of elements in the iterable object. + + Parameters + ---------- func : callable Function to map across the iterable object. iterable : Iterable @@ -234,10 +239,12 @@ def _from_dask(cls, dask_obj): Positional arguments to pass in `func`. **kwargs : dict Keyword arguments to pass in `func`. - """, - method="from_map", - ) - def _from_map(cls, func, iterable, *args, **kwargs): + + Returns + ------- + BaseQueryCompiler + QueryCompiler containing data returned by map function. 
+ """ return cls.io_cls.from_map(func, iterable, *args, **kwargs) @classmethod diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index 34cb574b0fa..24b9e0823fa 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -345,7 +345,25 @@ def from_map(cls, func, iterable, *args, **kwargs): @ray.remote -def deploy_map_func(func, obj, *args, **kwargs): +def deploy_map_func(func, obj, *args, **kwargs): # pragma: no cover + """ + Deploy a func to apply to an object. + + Parameters + ---------- + func : callable + Function to map across the iterable object. + obj : object + An object to apply a function to. + *args : tuple + Positional arguments to pass in `func`. + **kwargs : dict + Keyword arguments to pass in `func`. + + Returns + ------- + pandas.DataFrame + """ result = func(obj, *args, **kwargs) if not isinstance(result, pandas.DataFrame): result = pandas.DataFrame(result) diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py b/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py index 98783a2f1e2..9d870c8ed78 100644 --- a/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py +++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py @@ -300,7 +300,25 @@ def from_map(cls, func, iterable, *args, **kwargs): @unidist.remote -def deploy_map_func(func, obj, *args, **kwargs): +def deploy_map_func(func, obj, *args, **kwargs): # pragma: no cover + """ + Deploy a func to apply to an object. + + Parameters + ---------- + func : callable + Function to map across the iterable object. + obj : object + An object to apply a function to. + *args : tuple + Positional arguments to pass in `func`. + **kwargs : dict + Keyword arguments to pass in `func`. 
+ + Returns + ------- + pandas.DataFrame + """ result = func(obj, *args, **kwargs) if not isinstance(result, pandas.DataFrame): result = pandas.DataFrame(result) From 1f4fde447a2325d327e5e5b54edb89e2c1b1e585 Mon Sep 17 00:00:00 2001 From: "Igoshev, Iaroslav" Date: Wed, 24 Apr 2024 19:25:44 +0000 Subject: [PATCH 3/6] Add additional attrs Signed-off-by: Igoshev, Iaroslav --- .../core/execution/dask/implementations/pandas_on_dask/io/io.py | 1 + .../execution/unidist/implementations/pandas_on_unidist/io/io.py | 1 + 2 files changed, 2 insertions(+) diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py index 2b6922ab092..fb7b1b22483 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py @@ -70,6 +70,7 @@ class PandasOnDaskIO(BaseIO): """The class implements interface in ``BaseIO`` using Dask as an execution engine.""" frame_cls = PandasOnDaskDataframe + frame_partition_cls = PandasOnDaskDataframePartition query_compiler_cls = PandasQueryCompiler build_args = dict( frame_cls=PandasOnDaskDataframe, diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py b/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py index 9d870c8ed78..90a440efa00 100644 --- a/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py +++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py @@ -64,6 +64,7 @@ class PandasOnUnidistIO(UnidistIO): """Factory providing methods for performing I/O operations using pandas as storage format on unidist as engine.""" frame_cls = PandasOnUnidistDataframe + frame_partition_cls = PandasOnUnidistDataframePartition query_compiler_cls = PandasQueryCompiler build_args = dict( frame_partition_cls=PandasOnUnidistDataframePartition, From 28b080508b9fcad8aff5170932367f07db60970b Mon Sep 17 00:00:00 2001 
From: "Igoshev, Iaroslav" Date: Wed, 24 Apr 2024 20:29:41 +0000 Subject: [PATCH 4/6] Fix Dask Signed-off-by: Igoshev, Iaroslav --- .../execution/dask/implementations/pandas_on_dask/io/io.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py index fb7b1b22483..b05db53598b 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py @@ -217,11 +217,12 @@ def from_map(cls, func, iterable, *args, **kwargs): QueryCompiler containing data returned by map function. """ func = cls.frame_cls._partition_mgr_cls.preprocess_func(func) + client = default_client() partitions = np.array( [ [ cls.frame_partition_cls( - deploy_map_func.remote(func, obj, *args, **kwargs) + client.submit(deploy_map_func, func, obj, *args, **kwargs) ) ] for obj in iterable From 9fa2891cc229484197e754a04e716f9f2a8dcc3f Mon Sep 17 00:00:00 2001 From: Iaroslav Igoshev Date: Mon, 29 Apr 2024 16:20:07 +0200 Subject: [PATCH 5/6] Update modin/tests/pandas/test_io.py --- modin/tests/pandas/test_io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/tests/pandas/test_io.py b/modin/tests/pandas/test_io.py index 24232a69271..023e8c5a0a5 100644 --- a/modin/tests/pandas/test_io.py +++ b/modin/tests/pandas/test_io.py @@ -3465,7 +3465,7 @@ def test_from_dask(): @pytest.mark.skipif( condition=Engine.get() not in ("Ray", "Dask", "Unidist"), - reason="Dask DataFrame can only be created if Modin uses Ray, Dask or MPI engine.", + reason="Modin DataFrame can only be created from map if Modin uses Ray, Dask or MPI engine.", ) @pytest.mark.filterwarnings(default_to_pandas_ignore_string) def test_from_map(): From 7325e4fb471f898bd7996e2399455c0f04574d46 Mon Sep 17 00:00:00 2001 From: "Igoshev, Iaroslav" Date: Mon, 29 Apr 2024 20:01:00 +0000 Subject: [PATCH 6/6] 
Address comments Signed-off-by: Igoshev, Iaroslav --- .../execution/dask/common/engine_wrapper.py | 20 +++++++++-- .../implementations/pandas_on_dask/io/io.py | 34 ++++--------------- .../execution/ray/common/engine_wrapper.py | 18 +++++++--- .../implementations/pandas_on_ray/io/io.py | 31 ++--------------- .../unidist/common/engine_wrapper.py | 20 ++++++++--- .../pandas_on_unidist/io/io.py | 34 ++++--------------- 6 files changed, 62 insertions(+), 95 deletions(-) diff --git a/modin/core/execution/dask/common/engine_wrapper.py b/modin/core/execution/dask/common/engine_wrapper.py index f35f7ae2714..c79f83e7d68 100644 --- a/modin/core/execution/dask/common/engine_wrapper.py +++ b/modin/core/execution/dask/common/engine_wrapper.py @@ -15,12 +15,13 @@ from collections import UserDict +import pandas from dask.distributed import wait from distributed import Future from distributed.client import default_client -def _deploy_dask_func(func, *args, **kwargs): # pragma: no cover +def _deploy_dask_func(func, *args, return_pandas_df=None, **kwargs): # pragma: no cover """ Wrap `func` to ease calling it remotely. @@ -30,6 +31,8 @@ def _deploy_dask_func(func, *args, **kwargs): # pragma: no cover A local function that we want to call remotely. *args : iterable Positional arguments to pass to `func` when calling remotely. + return_pandas_df : bool, optional + Whether to convert the result of `func` to a pandas DataFrame or not. **kwargs : dict Keyword arguments to pass to `func` when calling remotely. @@ -38,7 +41,10 @@ def _deploy_dask_func(func, *args, **kwargs): # pragma: no cover distributed.Future or list Dask identifier of the result being put into distributed memory. 
""" - return func(*args, **kwargs) + result = func(*args, **kwargs) + if return_pandas_df and not isinstance(result, pandas.DataFrame): + result = pandas.DataFrame(result) + return result class DaskWrapper: @@ -50,6 +56,7 @@ def deploy( func, f_args=None, f_kwargs=None, + return_pandas_df=None, num_returns=1, pure=True, ): @@ -64,6 +71,8 @@ def deploy( Positional arguments to pass to ``func``. f_kwargs : dict, optional Keyword arguments to pass to ``func``. + return_pandas_df : bool, optional + Whether to convert the result of `func` to a pandas DataFrame or not. num_returns : int, default: 1 The number of returned objects. pure : bool, default: True @@ -82,7 +91,12 @@ def deploy( else: # for the case where type(func) is distributed.Future remote_task_future = client.submit( - _deploy_dask_func, func, *args, pure=pure, **kwargs + _deploy_dask_func, + func, + *args, + pure=pure, + return_pandas_df=return_pandas_df, + **kwargs, ) if num_returns != 1: return [ diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py index b05db53598b..b1b9ee4ee82 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py @@ -14,7 +14,6 @@ """Module houses class that implements ``BaseIO`` using Dask as an execution engine.""" import numpy as np -import pandas from distributed.client import default_client from modin.core.execution.dask.common import DaskWrapper @@ -217,40 +216,19 @@ def from_map(cls, func, iterable, *args, **kwargs): QueryCompiler containing data returned by map function. 
""" func = cls.frame_cls._partition_mgr_cls.preprocess_func(func) - client = default_client() partitions = np.array( [ [ cls.frame_partition_cls( - client.submit(deploy_map_func, func, obj, *args, **kwargs) + DaskWrapper.deploy( + func, + f_args=(obj,) + args, + f_kwargs=kwargs, + return_pandas_df=True, + ) ) ] for obj in iterable ] ) return cls.query_compiler_cls(cls.frame_cls(partitions)) - - -def deploy_map_func(func, obj, *args, **kwargs): # pragma: no cover - """ - Deploy a func to apply to an object. - - Parameters - ---------- - func : callable - Function to map across the iterable object. - obj : object - An object to apply a function to. - *args : tuple - Positional arguments to pass in `func`. - **kwargs : dict - Keyword arguments to pass in `func`. - - Returns - ------- - pandas.DataFrame - """ - result = func(obj, *args, **kwargs) - if not isinstance(result, pandas.DataFrame): - result = pandas.DataFrame(result) - return result diff --git a/modin/core/execution/ray/common/engine_wrapper.py b/modin/core/execution/ray/common/engine_wrapper.py index 930941de701..94535c91407 100644 --- a/modin/core/execution/ray/common/engine_wrapper.py +++ b/modin/core/execution/ray/common/engine_wrapper.py @@ -22,6 +22,7 @@ from types import FunctionType from typing import Sequence +import pandas import ray from ray.util.client.common import ClientObjectRef @@ -30,7 +31,7 @@ @ray.remote -def _deploy_ray_func(func, *args, **kwargs): # pragma: no cover +def _deploy_ray_func(func, *args, return_pandas_df=None, **kwargs): # pragma: no cover """ Wrap `func` to ease calling it remotely. @@ -40,6 +41,8 @@ def _deploy_ray_func(func, *args, **kwargs): # pragma: no cover A local function that we want to call remotely. *args : iterable Positional arguments to pass to `func` when calling remotely. + return_pandas_df : bool, optional + Whether to convert the result of `func` to a pandas DataFrame or not. **kwargs : dict Keyword arguments to pass to `func` when calling remotely. 
@@ -48,7 +51,10 @@ def _deploy_ray_func(func, *args, **kwargs): # pragma: no cover ray.ObjectRef or list Ray identifier of the result being put to Plasma store. """ - return func(*args, **kwargs) + result = func(*args, **kwargs) + if return_pandas_df and not isinstance(result, pandas.DataFrame): + result = pandas.DataFrame(result) + return result class RayWrapper: @@ -57,7 +63,9 @@ class RayWrapper: _func_cache = {} @classmethod - def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1): + def deploy( + cls, func, f_args=None, f_kwargs=None, return_pandas_df=None, num_returns=1 + ): """ Run local `func` remotely. @@ -69,6 +77,8 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1): Positional arguments to pass to ``func``. f_kwargs : dict, optional Keyword arguments to pass to ``func``. + return_pandas_df : bool, optional + Whether to convert the result of `func` to a pandas DataFrame or not. num_returns : int, default: 1 Amount of return values expected from `func`. 
@@ -81,7 +91,7 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1): kwargs = {} if f_kwargs is None else f_kwargs return _deploy_ray_func.options( num_returns=num_returns, resources=RayTaskCustomResources.get() - ).remote(func, *args, **kwargs) + ).remote(func, *args, return_pandas_df=return_pandas_df, **kwargs) @classmethod def is_future(cls, item): diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index 24b9e0823fa..0f3f40ef149 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -17,7 +17,6 @@ import numpy as np import pandas -import ray from pandas.io.common import get_handle, stringify_path from ray.data import from_pandas_refs @@ -335,36 +334,15 @@ def from_map(cls, func, iterable, *args, **kwargs): [ [ cls.frame_partition_cls( - deploy_map_func.remote(func, obj, *args, **kwargs) + RayWrapper.deploy( + func, + f_args=(obj,) + args, + f_kwargs=kwargs, + return_pandas_df=True, + ) ) ] for obj in iterable ] ) return cls.query_compiler_cls(cls.frame_cls(partitions)) - - -@ray.remote -def deploy_map_func(func, obj, *args, **kwargs): # pragma: no cover - """ - Deploy a func to apply to an object. - - Parameters - ---------- - func : callable - Function to map across the iterable object. - obj : object - An object to apply a function to. - *args : tuple - Positional arguments to pass in `func`. - **kwargs : dict - Keyword arguments to pass in `func`. 
- - Returns - ------- - pandas.DataFrame - """ - result = func(obj, *args, **kwargs) - if not isinstance(result, pandas.DataFrame): - result = pandas.DataFrame(result) - return result diff --git a/modin/core/execution/unidist/common/engine_wrapper.py b/modin/core/execution/unidist/common/engine_wrapper.py index 08937cf30a6..6a0b8b56d9d 100644 --- a/modin/core/execution/unidist/common/engine_wrapper.py +++ b/modin/core/execution/unidist/common/engine_wrapper.py @@ -19,11 +19,14 @@ import asyncio +import pandas import unidist @unidist.remote -def _deploy_unidist_func(func, *args, **kwargs): # pragma: no cover +def _deploy_unidist_func( + func, *args, return_pandas_df=None, **kwargs +): # pragma: no cover """ Wrap `func` to ease calling it remotely. @@ -33,6 +36,8 @@ def _deploy_unidist_func(func, *args, **kwargs): # pragma: no cover A local function that we want to call remotely. *args : iterable Positional arguments to pass to `func` when calling remotely. + return_pandas_df : bool, optional + Whether to convert the result of `func` to a pandas DataFrame or not. **kwargs : dict Keyword arguments to pass to `func` when calling remotely. @@ -41,14 +46,19 @@ def _deploy_unidist_func(func, *args, **kwargs): # pragma: no cover unidist.ObjectRef or list[unidist.ObjectRef] Unidist identifier of the result being put to object store. """ - return func(*args, **kwargs) + result = func(*args, **kwargs) + if return_pandas_df and not isinstance(result, pandas.DataFrame): + result = pandas.DataFrame(result) + return result class UnidistWrapper: """Mixin that provides means of running functions remotely and getting local results.""" @classmethod - def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1): + def deploy( + cls, func, f_args=None, f_kwargs=None, return_pandas_df=None, num_returns=1 + ): """ Run local `func` remotely. @@ -60,6 +70,8 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1): Positional arguments to pass to ``func``. 
f_kwargs : dict, optional Keyword arguments to pass to ``func``. + return_pandas_df : bool, optional + Whether to convert the result of `func` to a pandas DataFrame or not. num_returns : int, default: 1 Amount of return values expected from `func`. @@ -71,7 +83,7 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1): args = [] if f_args is None else f_args kwargs = {} if f_kwargs is None else f_kwargs return _deploy_unidist_func.options(num_returns=num_returns).remote( - func, *args, **kwargs + func, *args, return_pandas_df=return_pandas_df, **kwargs ) @classmethod diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py b/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py index 90a440efa00..ed8e7239835 100644 --- a/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py +++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py @@ -17,7 +17,6 @@ import numpy as np import pandas -import unidist from pandas.io.common import get_handle, stringify_path from modin.core.execution.unidist.common import SignalActor, UnidistWrapper @@ -291,36 +290,15 @@ def from_map(cls, func, iterable, *args, **kwargs): [ [ cls.frame_partition_cls( - deploy_map_func.remote(func, obj, *args, **kwargs) + UnidistWrapper.deploy( + func, + f_args=(obj,) + args, + f_kwargs=kwargs, + return_pandas_df=True, + ) ) ] for obj in iterable ] ) return cls.query_compiler_cls(cls.frame_cls(partitions)) - - -@unidist.remote -def deploy_map_func(func, obj, *args, **kwargs): # pragma: no cover - """ - Deploy a func to apply to an object. - - Parameters - ---------- - func : callable - Function to map across the iterable object. - obj : object - An object to apply a function to. - *args : tuple - Positional arguments to pass in `func`. - **kwargs : dict - Keyword arguments to pass in `func`. 
- - Returns - ------- - pandas.DataFrame - """ - result = func(obj, *args, **kwargs) - if not isinstance(result, pandas.DataFrame): - result = pandas.DataFrame(result) - return result