Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#6492: Add from_map feature to create dataframe #7215

Merged
merged 6 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

"""Module houses class that implements ``BaseIO`` using Dask as an execution engine."""

import numpy as np
import pandas
from distributed.client import default_client

from modin.core.execution.dask.common import DaskWrapper
Expand Down Expand Up @@ -68,6 +70,7 @@ class PandasOnDaskIO(BaseIO):
"""The class implements interface in ``BaseIO`` using Dask as an execution engine."""

frame_cls = PandasOnDaskDataframe
frame_partition_cls = PandasOnDaskDataframePartition
query_compiler_cls = PandasQueryCompiler
build_args = dict(
frame_cls=PandasOnDaskDataframe,
Expand Down Expand Up @@ -188,3 +191,66 @@ def df_to_series(df):
partitions = [client.submit(df_to_series, part) for part in partitions]

return from_delayed(partitions)

@classmethod
def from_map(cls, func, iterable, *args, **kwargs):
"""
Create a Modin `query_compiler` from a map function.

This method will construct a Modin `query_compiler` split by row partitions.
The number of row partitions matches the number of elements in the iterable object.

Parameters
----------
func : callable
Function to map across the iterable object.
iterable : Iterable
An iterable object.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.

Returns
-------
BaseQueryCompiler
QueryCompiler containing data returned by map function.
"""
func = cls.frame_cls._partition_mgr_cls.preprocess_func(func)
client = default_client()
partitions = np.array(
[
[
cls.frame_partition_cls(
client.submit(deploy_map_func, func, obj, *args, **kwargs)
)
]
for obj in iterable
]
)
return cls.query_compiler_cls(cls.frame_cls(partitions))


def deploy_map_func(func, obj, *args, **kwargs): # pragma: no cover
"""
Deploy a func to apply to an object.

Parameters
----------
func : callable
Function to map across the iterable object.
obj : object
An object to apply a function to.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.

Returns
-------
pandas.DataFrame
"""
result = func(obj, *args, **kwargs)
if not isinstance(result, pandas.DataFrame):
result = pandas.DataFrame(result)
return result
5 changes: 5 additions & 0 deletions modin/core/execution/dispatching/factories/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ def from_ray(cls, ray_obj):
def from_dask(cls, dask_obj):
return cls.get_factory()._from_dask(dask_obj)

@classmethod
@_inherit_docstrings(factories.BaseFactory._from_map)
def from_map(cls, func, iterable, *args, **kwargs):
return cls.get_factory()._from_map(func, iterable, *args, **kwargs)

@classmethod
@_inherit_docstrings(factories.BaseFactory._read_parquet)
def read_parquet(cls, **kwargs):
Expand Down
26 changes: 26 additions & 0 deletions modin/core/execution/dispatching/factories/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,32 @@ def _from_ray(cls, ray_obj):
def _from_dask(cls, dask_obj):
return cls.io_cls.from_dask(dask_obj)

@classmethod
def _from_map(cls, func, iterable, *args, **kwargs):
"""
Create a Modin `query_compiler` from a map function.

This method will construct a Modin `query_compiler` split by row partitions.
The number of row partitions matches the number of elements in the iterable object.

Parameters
----------
func : callable
Function to map across the iterable object.
iterable : Iterable
An iterable object.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.

Returns
-------
BaseQueryCompiler
QueryCompiler containing data returned by map function.
"""
return cls.io_cls.from_map(func, iterable, *args, **kwargs)

@classmethod
@doc(
_doc_io_method_template,
Expand Down
66 changes: 66 additions & 0 deletions modin/core/execution/ray/implementations/pandas_on_ray/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

import io

import numpy as np
import pandas
import ray
from pandas.io.common import get_handle, stringify_path
from ray.data import from_pandas_refs

Expand Down Expand Up @@ -68,6 +70,7 @@ class PandasOnRayIO(RayIO):
"""Factory providing methods for performing I/O operations using pandas as storage format on Ray as engine."""

frame_cls = PandasOnRayDataframe
frame_partition_cls = PandasOnRayDataframePartition
query_compiler_cls = PandasQueryCompiler
build_args = dict(
frame_partition_cls=PandasOnRayDataframePartition,
Expand Down Expand Up @@ -302,3 +305,66 @@ def to_ray(cls, modin_obj):
"""
parts = unwrap_partitions(modin_obj, axis=0)
return from_pandas_refs(parts)

@classmethod
def from_map(cls, func, iterable, *args, **kwargs):
"""
Create a Modin `query_compiler` from a map function.

This method will construct a Modin `query_compiler` split by row partitions.
The number of row partitions matches the number of elements in the iterable object.

Parameters
----------
func : callable
Function to map across the iterable object.
iterable : Iterable
An iterable object.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.

Returns
-------
BaseQueryCompiler
QueryCompiler containing data returned by map function.
"""
func = cls.frame_cls._partition_mgr_cls.preprocess_func(func)
partitions = np.array(
[
[
cls.frame_partition_cls(
deploy_map_func.remote(func, obj, *args, **kwargs)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to use RayWrraper.deploy here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And corresponding wrappers for other engines.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RayWrapper.deploy deploys a function that can return any object but here we intentionally wrap a result in a pandas DataFrame if the user hasn't done so. I would leave the changes as is. What do you think?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To reduce the likelihood of error, we need to either have all launch options in one place, or use only one method. There is a tendency that launching functions becomes more difficult due to additional parameters. A good example is resources=RayTaskCustomResources.get(), which is currently not taken into account here.

We can move this function to engine_wrapper.py and call it inside Raywrapper.deploy using an additional parameter.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-used *.deploy.

)
]
for obj in iterable
]
)
return cls.query_compiler_cls(cls.frame_cls(partitions))


@ray.remote
def deploy_map_func(func, obj, *args, **kwargs): # pragma: no cover
"""
Deploy a func to apply to an object.

Parameters
----------
func : callable
Function to map across the iterable object.
obj : object
An object to apply a function to.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.

Returns
-------
pandas.DataFrame
"""
result = func(obj, *args, **kwargs)
if not isinstance(result, pandas.DataFrame):
result = pandas.DataFrame(result)
return result
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

import io

import numpy as np
import pandas
import unidist
from pandas.io.common import get_handle, stringify_path

from modin.core.execution.unidist.common import SignalActor, UnidistWrapper
Expand Down Expand Up @@ -62,6 +64,7 @@ class PandasOnUnidistIO(UnidistIO):
"""Factory providing methods for performing I/O operations using pandas as storage format on unidist as engine."""

frame_cls = PandasOnUnidistDataframe
frame_partition_cls = PandasOnUnidistDataframePartition
query_compiler_cls = PandasQueryCompiler
build_args = dict(
frame_partition_cls=PandasOnUnidistDataframePartition,
Expand Down Expand Up @@ -258,3 +261,66 @@ def func(df, **kw): # pragma: no cover
UnidistWrapper.materialize(
[part.list_of_blocks[0] for row in result for part in row]
)

@classmethod
def from_map(cls, func, iterable, *args, **kwargs):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to use already implemented functions with num_splits=1?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite get what would you like use use instead. Please elaborate. We are adding a new from_map by analogy with other io functions.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose we can't use anything from existing functionality as every method of a Modin Dataframe assumes there is a dataframe with partitions to apply a function to.

"""
Create a Modin `query_compiler` from a map function.

This method will construct a Modin `query_compiler` split by row partitions.
The number of row partitions matches the number of elements in the iterable object.

Parameters
----------
func : callable
Function to map across the iterable object.
iterable : Iterable
An iterable object.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.

Returns
-------
BaseQueryCompiler
QueryCompiler containing data returned by map function.
"""
func = cls.frame_cls._partition_mgr_cls.preprocess_func(func)
partitions = np.array(
[
[
cls.frame_partition_cls(
deploy_map_func.remote(func, obj, *args, **kwargs)
)
]
for obj in iterable
]
)
return cls.query_compiler_cls(cls.frame_cls(partitions))


@unidist.remote
def deploy_map_func(func, obj, *args, **kwargs): # pragma: no cover
"""
Deploy a func to apply to an object.

Parameters
----------
func : callable
Function to map across the iterable object.
obj : object
An object to apply a function to.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.

Returns
-------
pandas.DataFrame
"""
result = func(obj, *args, **kwargs)
if not isinstance(result, pandas.DataFrame):
result = pandas.DataFrame(result)
return result
28 changes: 28 additions & 0 deletions modin/core/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,34 @@
"Modin DataFrame can only be converted to a Dask DataFrame if Modin uses a Dask engine."
)

@classmethod
def from_map(cls, func, iterable, *args, **kwargs):
"""
Create a Modin `query_compiler` from a map function.

This method will construct a Modin `query_compiler` split by row partitions.
The number of row partitions matches the number of elements in the iterable object.

Parameters
----------
func : callable
Function to map across the iterable object.
iterable : Iterable
An iterable object.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.

Returns
-------
BaseQueryCompiler
QueryCompiler containing data returned by map function.
"""
raise RuntimeError(

Check warning on line 191 in modin/core/io/io.py

View check run for this annotation

Codecov / codecov/patch

modin/core/io/io.py#L191

Added line #L191 was not covered by tests
"Modin DataFrame can only be created if Modin uses Ray, Dask or MPI engine."
)

@classmethod
@_inherit_docstrings(pandas.read_parquet, apilink="pandas.read_parquet")
@doc(
Expand Down
30 changes: 30 additions & 0 deletions modin/pandas/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,36 @@ def from_dask(dask_obj) -> DataFrame:
return ModinObjects.DataFrame(query_compiler=FactoryDispatcher.from_dask(dask_obj))


def from_map(func, iterable, *args, **kwargs) -> DataFrame:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation needs to be updated I suppose.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have docs for such methods as from_pandas, from_ray, from_dask, etc. Do you think we should update docs on this matter in a separate issue in one go?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ок

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@YarShev are you going to do this before release?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that would be great - #7256.

"""
Create a Modin DataFrame from map function applied to an iterable object.

This method will construct a Modin DataFrame split by row partitions.
The number of row partitions matches the number of elements in the iterable object.

Parameters
----------
func : callable
Function to map across the iterable object.
iterable : Iterable
An iterable object.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.

Returns
-------
DataFrame
A new Modin DataFrame object.
"""
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

return ModinObjects.DataFrame(
query_compiler=FactoryDispatcher.from_map(func, iterable, *args, *kwargs)
)


def to_pandas(modin_obj: SupportsPublicToPandas) -> DataFrame | Series:
"""
Convert a Modin DataFrame/Series to a pandas DataFrame/Series.
Expand Down
Loading
Loading