Skip to content

Commit

Permalink
FEAT-#2058: Improve how remote factories are defined (#2060)
Browse files Browse the repository at this point in the history
Signed-off-by: Vasilij Litvinov <[email protected]>
  • Loading branch information
vnlitvinov authored Sep 10, 2020
1 parent 0f54983 commit 2ca3f34
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 11 deletions.
17 changes: 17 additions & 0 deletions modin/data_management/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,20 @@
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

from . import factories


def _get_remote_engines():
    """
    Yield the engine name of every remote factory defined in ``factories``.

    Every attribute of the ``factories`` module is inspected; classes that
    derive from ``factories.ExperimentalRemoteFactory`` are asked for their
    ``get_info()`` and the ``engine`` field of the result is yielded.
    Classes for which ``get_info()`` raises ``factories.NotRealFactory``
    are silently skipped.
    """
    for attr_name in dir(factories):
        candidate = getattr(factories, attr_name)
        # Only classes can be factories; skip functions, modules, constants.
        if not isinstance(candidate, type):
            continue
        if not issubclass(candidate, factories.ExperimentalRemoteFactory):
            continue
        try:
            info = candidate.get_info()
        except factories.NotRealFactory:
            # The class name does not describe a concrete factory.
            continue
        yield info.engine


# Set of engine names reported by the remote factories found in ``factories``;
# computed once, when this package is imported.
REMOTE_ENGINES = set(_get_remote_engines())
51 changes: 45 additions & 6 deletions modin/data_management/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# governing permissions and limitations under the License.

import warnings
import typing
import re

from modin import execution_engine
from modin.engines.base.io import BaseIO
Expand All @@ -21,13 +23,42 @@
types_dictionary = {"pandas": {"category": pandas.CategoricalDtype}}


class FactoryInfo(typing.NamedTuple):
    """
    Immutable description of a factory, as produced by ``get_info()``.
    """

    # Name of the execution engine (the part of the factory class name
    # between "On" and the trailing "Factory").
    engine: str
    # Name of the partitioning format (the part of the factory class name
    # before "On", without the optional "Experimental" prefix).
    partition: str
    # True when the factory class name starts with "Experimental".
    experimental: bool


class NotRealFactory(Exception):
    """
    Raised by ``get_info()`` when the class name does not follow the
    factory naming pattern, i.e. the class is not a concrete factory.
    """


class BaseFactory(object):
"""
Abstract factory which allows to override the io module easily.
"""

io_cls: BaseIO = None # The module where the I/O functionality exists.

@classmethod
def get_info(cls) -> FactoryInfo:
    """
    Return information about the factory: its execution engine,
    partitioning format and whether it is experimental-only.

    The information is parsed from the factory class name, so the name
    must conform to how the ExecutionEngine class constructs factory
    names: ``[Experimental]<Partition>On<Engine>Factory``.

    Returns
    -------
    FactoryInfo
        The engine, partition and experimental flag parsed from the name.

    Raises
    ------
    NotRealFactory
        If the class name does not match the factory naming pattern.
    """
    match = re.match(r"^(Experimental)?(.*)On(.*)Factory$", cls.__name__)
    if match is None:
        # Check for a failed match explicitly rather than catching the
        # AttributeError from ``None.groups()``; include the offending
        # class name so the failure is diagnosable.
        raise NotRealFactory(cls.__name__)
    experimental, partition, engine = match.groups()
    return FactoryInfo(
        engine=engine, partition=partition, experimental=bool(experimental)
    )

@classmethod
def prepare(cls):
"""
Expand Down Expand Up @@ -216,7 +247,9 @@ def prepare(cls):
cls.io_cls = PyarrowOnRayIO


class ExperimentalPandasOnCloudrayFactory(ExperimentalBaseFactory):
class ExperimentalRemoteFactory(ExperimentalBaseFactory):
wrapped_factory = BaseFactory

@classmethod
def prepare(cls):
# query_compiler import is needed so remote PandasQueryCompiler
Expand All @@ -231,11 +264,13 @@ def prepare(cls):
import modin.experimental.pandas.numpy_wrap # noqa: F401

class WrappedIO:
def __init__(self, conn):
def __init__(self, conn, factory):
self.__conn = conn
self.__io_cls = conn.modules[
"modin.engines.ray.pandas_on_ray.io"
].PandasOnRayIO
remote_factory = getattr(
conn.modules[factory.__module__], factory.__name__
)
remote_factory.prepare()
self.__io_cls = remote_factory.io_cls
self.__reads = {
name for name in BaseIO.__dict__ if name.startswith("read_")
}
Expand All @@ -256,4 +291,8 @@ def wrap(*a, _original=getattr(self.__io_cls, name), **kw):
wrap = getattr(self.__io_cls, name)
return wrap

cls.io_cls = WrappedIO(get_connection())
cls.io_cls = WrappedIO(get_connection(), cls.wrapped_factory)


class ExperimentalPandasOnCloudrayFactory(ExperimentalRemoteFactory):
    """
    Remote factory whose I/O is delegated to ``PandasOnRayFactory``
    on the remote side.
    """

    # Factory that is looked up and prepared on the remote end.
    wrapped_factory = PandasOnRayFactory
3 changes: 2 additions & 1 deletion modin/experimental/cloud/meta_magic.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import types

from modin import execution_engine
from modin.data_management import REMOTE_ENGINES

# the attributes that must always be taken from the local part of a dual-nature class,
# never going to the remote end
Expand Down Expand Up @@ -153,7 +154,7 @@ def __new__(cls, *a, **kw):
_KNOWN_DUALS[local_cls] = result

def update_class(_):
if execution_engine.get() == "Cloudray":
if execution_engine.get() in REMOTE_ENGINES:
from . import rpyc_proxy

result.__real_cls__ = getattr(rpyc_proxy, rpyc_wrapper_name)(result)
Expand Down
3 changes: 2 additions & 1 deletion modin/experimental/pandas/numpy_wrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import types
import copyreg
from modin import execution_engine
from modin.data_management import REMOTE_ENGINES
import modin
import pandas
import os
Expand Down Expand Up @@ -78,7 +79,7 @@ def __swap_numpy(self, other_numpy=None):
self.__has_to_warn = False

def __update_engine(self, _):
if execution_engine.get() == "Cloudray":
if execution_engine.get() in REMOTE_ENGINES:
from modin.experimental.cloud import get_connection

self.__swap_numpy(get_connection().modules["numpy"])
Expand Down
4 changes: 1 addition & 3 deletions modin/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,8 @@ def _update_engine(publisher: Publisher):

elif publisher.get() == "Cloudray":
from modin.experimental.cloud import get_connection
import rpyc

conn: rpyc.ClassicService = get_connection()
conn = get_connection()
remote_ray = conn.modules["ray"]
if _is_first_update.get("Cloudray", True):

Expand All @@ -159,7 +158,6 @@ def init_remote_ray():
import modin.data_management.dispatcher # noqa: F401

num_cpus = remote_ray.cluster_resources()["CPU"]

elif publisher.get() not in _NOINIT_ENGINES:
raise ImportError("Unrecognized execution engine: {}.".format(publisher.get()))

Expand Down

0 comments on commit 2ca3f34

Please sign in to comment.