Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#6492: Add from_map feature to create dataframe #7215

Merged
merged 6 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions modin/core/execution/dask/common/engine_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@

from collections import UserDict

import pandas
from dask.distributed import wait
from distributed import Future
from distributed.client import default_client


def _deploy_dask_func(func, *args, **kwargs): # pragma: no cover
def _deploy_dask_func(func, *args, return_pandas_df=None, **kwargs): # pragma: no cover
"""
Wrap `func` to ease calling it remotely.

Expand All @@ -30,6 +31,8 @@ def _deploy_dask_func(func, *args, **kwargs): # pragma: no cover
A local function that we want to call remotely.
*args : iterable
Positional arguments to pass to `func` when calling remotely.
return_pandas_df : bool, optional
Whether to convert the result of `func` to a pandas DataFrame or not.
**kwargs : dict
Keyword arguments to pass to `func` when calling remotely.

Expand All @@ -38,7 +41,10 @@ def _deploy_dask_func(func, *args, **kwargs): # pragma: no cover
distributed.Future or list
Dask identifier of the result being put into distributed memory.
"""
return func(*args, **kwargs)
result = func(*args, **kwargs)
if return_pandas_df and not isinstance(result, pandas.DataFrame):
result = pandas.DataFrame(result)
return result


class DaskWrapper:
Expand All @@ -50,6 +56,7 @@ def deploy(
func,
f_args=None,
f_kwargs=None,
return_pandas_df=None,
num_returns=1,
pure=True,
):
Expand All @@ -64,6 +71,8 @@ def deploy(
Positional arguments to pass to ``func``.
f_kwargs : dict, optional
Keyword arguments to pass to ``func``.
return_pandas_df : bool, optional
Whether to convert the result of `func` to a pandas DataFrame or not.
num_returns : int, default: 1
The number of returned objects.
pure : bool, default: True
Expand All @@ -82,7 +91,12 @@ def deploy(
else:
# for the case where type(func) is distributed.Future
remote_task_future = client.submit(
_deploy_dask_func, func, *args, pure=pure, **kwargs
_deploy_dask_func,
func,
*args,
pure=pure,
return_pandas_df=return_pandas_df,
**kwargs,
)
if num_returns != 1:
return [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

"""Module houses class that implements ``BaseIO`` using Dask as an execution engine."""

import numpy as np
from distributed.client import default_client

from modin.core.execution.dask.common import DaskWrapper
Expand Down Expand Up @@ -68,6 +69,7 @@ class PandasOnDaskIO(BaseIO):
"""The class implements interface in ``BaseIO`` using Dask as an execution engine."""

frame_cls = PandasOnDaskDataframe
frame_partition_cls = PandasOnDaskDataframePartition
query_compiler_cls = PandasQueryCompiler
build_args = dict(
frame_cls=PandasOnDaskDataframe,
Expand Down Expand Up @@ -188,3 +190,45 @@ def df_to_series(df):
partitions = [client.submit(df_to_series, part) for part in partitions]

return from_delayed(partitions)

@classmethod
def from_map(cls, func, iterable, *args, **kwargs):
    """
    Create a Modin `query_compiler` from a map function.

    This method will construct a Modin `query_compiler` split by row partitions.
    The number of row partitions matches the number of elements in the iterable object.

    Parameters
    ----------
    func : callable
        Function to map across the iterable object.
    iterable : Iterable
        An iterable object.
    *args : tuple
        Positional arguments to pass in `func`.
    **kwargs : dict
        Keyword arguments to pass in `func`.

    Returns
    -------
    BaseQueryCompiler
        QueryCompiler containing data returned by map function.
    """
    # Preprocess once so the function payload is shared by every deployed task.
    prepared_func = cls.frame_cls._partition_mgr_cls.preprocess_func(func)
    # One row partition per element of `iterable`; each deployed call returns
    # a pandas DataFrame (forced by return_pandas_df=True).
    row_partitions = []
    for element in iterable:
        future = DaskWrapper.deploy(
            prepared_func,
            f_args=(element,) + args,
            f_kwargs=kwargs,
            return_pandas_df=True,
        )
        row_partitions.append([cls.frame_partition_cls(future)])
    partitions = np.array(row_partitions)
    return cls.query_compiler_cls(cls.frame_cls(partitions))
5 changes: 5 additions & 0 deletions modin/core/execution/dispatching/factories/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ def from_ray(cls, ray_obj):
def from_dask(cls, dask_obj):
return cls.get_factory()._from_dask(dask_obj)

@classmethod
@_inherit_docstrings(factories.BaseFactory._from_map)
def from_map(cls, func, iterable, *args, **kwargs):
    # Resolve the active execution factory, then delegate construction to it.
    factory = cls.get_factory()
    return factory._from_map(func, iterable, *args, **kwargs)

@classmethod
@_inherit_docstrings(factories.BaseFactory._read_parquet)
def read_parquet(cls, **kwargs):
Expand Down
26 changes: 26 additions & 0 deletions modin/core/execution/dispatching/factories/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,32 @@ def _from_ray(cls, ray_obj):
def _from_dask(cls, dask_obj):
return cls.io_cls.from_dask(dask_obj)

@classmethod
def _from_map(cls, func, iterable, *args, **kwargs):
    """
    Create a Modin `query_compiler` from a map function.

    Builds a Modin `query_compiler` partitioned by rows, with one row
    partition per element of the iterable object.

    Parameters
    ----------
    func : callable
        Function to map across the iterable object.
    iterable : Iterable
        An iterable object.
    *args : tuple
        Positional arguments to pass in `func`.
    **kwargs : dict
        Keyword arguments to pass in `func`.

    Returns
    -------
    BaseQueryCompiler
        QueryCompiler containing data returned by map function.
    """
    # Pure delegation: the engine-specific IO class does the real work.
    io_cls = cls.io_cls
    return io_cls.from_map(func, iterable, *args, **kwargs)

@classmethod
@doc(
_doc_io_method_template,
Expand Down
18 changes: 14 additions & 4 deletions modin/core/execution/ray/common/engine_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from types import FunctionType
from typing import Sequence

import pandas
import ray
from ray.util.client.common import ClientObjectRef

Expand All @@ -30,7 +31,7 @@


@ray.remote
def _deploy_ray_func(func, *args, **kwargs): # pragma: no cover
def _deploy_ray_func(func, *args, return_pandas_df=None, **kwargs): # pragma: no cover
"""
Wrap `func` to ease calling it remotely.

Expand All @@ -40,6 +41,8 @@ def _deploy_ray_func(func, *args, **kwargs): # pragma: no cover
A local function that we want to call remotely.
*args : iterable
Positional arguments to pass to `func` when calling remotely.
return_pandas_df : bool, optional
Whether to convert the result of `func` to a pandas DataFrame or not.
**kwargs : dict
Keyword arguments to pass to `func` when calling remotely.

Expand All @@ -48,7 +51,10 @@ def _deploy_ray_func(func, *args, **kwargs): # pragma: no cover
ray.ObjectRef or list
Ray identifier of the result being put to Plasma store.
"""
return func(*args, **kwargs)
result = func(*args, **kwargs)
if return_pandas_df and not isinstance(result, pandas.DataFrame):
result = pandas.DataFrame(result)
return result


class RayWrapper:
Expand All @@ -57,7 +63,9 @@ class RayWrapper:
_func_cache = {}

@classmethod
def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
def deploy(
cls, func, f_args=None, f_kwargs=None, return_pandas_df=None, num_returns=1
):
"""
Run local `func` remotely.

Expand All @@ -69,6 +77,8 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
Positional arguments to pass to ``func``.
f_kwargs : dict, optional
Keyword arguments to pass to ``func``.
return_pandas_df : bool, optional
Whether to convert the result of `func` to a pandas DataFrame or not.
num_returns : int, default: 1
Amount of return values expected from `func`.

Expand All @@ -81,7 +91,7 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
kwargs = {} if f_kwargs is None else f_kwargs
return _deploy_ray_func.options(
num_returns=num_returns, resources=RayTaskCustomResources.get()
).remote(func, *args, **kwargs)
).remote(func, *args, return_pandas_df=return_pandas_df, **kwargs)

@classmethod
def is_future(cls, item):
Expand Down
41 changes: 41 additions & 0 deletions modin/core/execution/ray/implementations/pandas_on_ray/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io

import numpy as np
import pandas
from pandas.io.common import get_handle, stringify_path
from ray.data import from_pandas_refs
Expand Down Expand Up @@ -68,6 +69,7 @@ class PandasOnRayIO(RayIO):
"""Factory providing methods for performing I/O operations using pandas as storage format on Ray as engine."""

frame_cls = PandasOnRayDataframe
frame_partition_cls = PandasOnRayDataframePartition
query_compiler_cls = PandasQueryCompiler
build_args = dict(
frame_partition_cls=PandasOnRayDataframePartition,
Expand Down Expand Up @@ -302,3 +304,42 @@ def to_ray(cls, modin_obj):
"""
parts = unwrap_partitions(modin_obj, axis=0)
return from_pandas_refs(parts)

@classmethod
def from_map(cls, func, iterable, *args, **kwargs):
    """
    Create a Modin `query_compiler` from a map function.

    This method will construct a Modin `query_compiler` split by row partitions.
    The number of row partitions matches the number of elements in the iterable object.

    Parameters
    ----------
    func : callable
        Function to map across the iterable object.
    iterable : Iterable
        An iterable object.
    *args : tuple
        Positional arguments to pass in `func`.
    **kwargs : dict
        Keyword arguments to pass in `func`.

    Returns
    -------
    BaseQueryCompiler
        QueryCompiler containing data returned by map function.
    """
    func = cls.frame_cls._partition_mgr_cls.preprocess_func(func)
    partitions = np.array(
        [
            [
                cls.frame_partition_cls(
                    RayWrapper.deploy(
                        func,
                        f_args=(obj,) + args,
                        # BUG FIX: user kwargs are meant for `func`, not for
                        # `RayWrapper.deploy` itself. Splatting them as
                        # `**kwargs` raised TypeError for any key other than
                        # deploy's own parameters; pass them via `f_kwargs`
                        # exactly like the Dask implementation does.
                        f_kwargs=kwargs,
                        return_pandas_df=True,
                    )
                )
            ]
            for obj in iterable
        ]
    )
    return cls.query_compiler_cls(cls.frame_cls(partitions))
20 changes: 16 additions & 4 deletions modin/core/execution/unidist/common/engine_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

import asyncio

import pandas
import unidist


@unidist.remote
def _deploy_unidist_func(func, *args, **kwargs): # pragma: no cover
def _deploy_unidist_func(
func, *args, return_pandas_df=None, **kwargs
): # pragma: no cover
"""
Wrap `func` to ease calling it remotely.

Expand All @@ -33,6 +36,8 @@ def _deploy_unidist_func(func, *args, **kwargs): # pragma: no cover
A local function that we want to call remotely.
*args : iterable
Positional arguments to pass to `func` when calling remotely.
return_pandas_df : bool, optional
Whether to convert the result of `func` to a pandas DataFrame or not.
**kwargs : dict
Keyword arguments to pass to `func` when calling remotely.

Expand All @@ -41,14 +46,19 @@ def _deploy_unidist_func(func, *args, **kwargs): # pragma: no cover
unidist.ObjectRef or list[unidist.ObjectRef]
Unidist identifier of the result being put to object store.
"""
return func(*args, **kwargs)
result = func(*args, **kwargs)
if return_pandas_df and not isinstance(result, pandas.DataFrame):
result = pandas.DataFrame(result)
return result


class UnidistWrapper:
"""Mixin that provides means of running functions remotely and getting local results."""

@classmethod
def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
def deploy(
cls, func, f_args=None, f_kwargs=None, return_pandas_df=None, num_returns=1
):
"""
Run local `func` remotely.

Expand All @@ -60,6 +70,8 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
Positional arguments to pass to ``func``.
f_kwargs : dict, optional
Keyword arguments to pass to ``func``.
return_pandas_df : bool, optional
Whether to convert the result of `func` to a pandas DataFrame or not.
num_returns : int, default: 1
Amount of return values expected from `func`.

Expand All @@ -71,7 +83,7 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
args = [] if f_args is None else f_args
kwargs = {} if f_kwargs is None else f_kwargs
return _deploy_unidist_func.options(num_returns=num_returns).remote(
func, *args, **kwargs
func, *args, return_pandas_df=return_pandas_df, **kwargs
)

@classmethod
Expand Down
Loading
Loading