
[client] Modin fails to detect Ray client runtime when pickling objects #14857

Closed
Bhavya6187 opened this issue Mar 22, 2021 · 5 comments
Labels
bug Something that is supposed to be working; but isn't P1 Issue that should be fixed within a few weeks triage Needs triage (eg: priority, bug/not-bug, and owning component)
Milestone


Bhavya6187 commented Mar 22, 2021

What is the problem?

Modin fails to detect the Ray client connection when pickling/unpickling objects.
Ray version - 2.0.0.dev0 nightly
Modin version - master (0.9.1+13.gc23b3ce)
Python - 3.7

Reproduction (REQUIRED)

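import ray
import ray.util

# Connect to the remote cluster through the Ray client.
ray.util.connect("<service_ip>:50051")

import modin.pandas as pd

colnames = ["label"] + ["feature-%02d" % i for i in range(1, 29)]
df = pd.read_csv("s3://<>/HIGGS.csv", names=colnames)

@ray.remote
def add_rows(modin_df):
    # Add a "sum" column row by row inside a Ray task.
    for i, row in modin_df.iterrows():
        modin_df.at[i, "sum"] = row["feature-01"] + row["feature-02"] + row["feature-03"]
    return modin_df

# Passing the Modin DataFrame as a task argument forces it to be pickled,
# which is where the error below is raised.
df_2 = ray.get(add_rows.remote(df))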

This is the stack trace:

---------------------------------------------------------------------------
RaySystemError                            Traceback (most recent call last)
<ipython-input-8-c432ccc894f7> in <module>()
     13         modin_df.at[i,"sum"] = row['feature-01'] + row['feature-02'] + row['feature-03']
     14     return df_input
---> 15 df_2 = ray.get(add_rows.remote(df))

/usr/local/lib/python3.7/site-packages/ray/remote_function.py in _remote_proxy(*args, **kwargs)
    102         @wraps(function)
    103         def _remote_proxy(*args, **kwargs):
--> 104             return self._remote(args=args, kwargs=kwargs)
    105 
    106         self.remote = _remote_proxy

/usr/local/lib/python3.7/site-packages/ray/remote_function.py in _remote(self, args, kwargs, num_returns, num_cpus, num_gpus, memory, object_store_memory, accelerator_type, resources, max_retries, placement_group, placement_group_bundle_index, placement_group_capture_child_tasks, runtime_env, override_environment_variables, name)
    207                 runtime_env=runtime_env,
    208                 override_environment_variables=override_environment_variables,
--> 209                 name=name)
    210 
    211         worker = ray.worker.global_worker

/usr/local/lib/python3.7/site-packages/ray/_private/client_mode_hook.py in client_mode_convert_function(func_cls, in_args, in_kwargs, **kwargs)
     70         setattr(func_cls, RAY_CLIENT_MODE_ATTR, key)
     71     client_func = ray._get_converted(key)
---> 72     return client_func._remote(in_args, in_kwargs, **kwargs)
     73 
     74 

/usr/local/lib/python3.7/site-packages/ray/util/client/common.py in _remote(self, args, kwargs, **option_args)
    104         if kwargs is None:
    105             kwargs = {}
--> 106         return self.options(**option_args).remote(*args, **kwargs)
    107 
    108     def __repr__(self):

/usr/local/lib/python3.7/site-packages/ray/util/client/common.py in remote(self, *args, **kwargs)
    281 
    282     def remote(self, *args, **kwargs):
--> 283         return return_refs(ray.call_remote(self, *args, **kwargs))
    284 
    285     def __getattr__(self, key):

/usr/local/lib/python3.7/site-packages/ray/util/client/api.py in call_remote(self, instance, *args, **kwargs)
     94             kwargs: opaque keyword arguments
     95         """
---> 96         return self.worker.call_remote(instance, *args, **kwargs)
     97 
     98     def call_release(self, id: bytes) -> None:

/usr/local/lib/python3.7/site-packages/ray/util/client/worker.py in call_remote(self, instance, *args, **kwargs)
    287         task = instance._prepare_client_task()
    288         for arg in args:
--> 289             pb_arg = convert_to_arg(arg, self._client_id)
    290             task.args.append(pb_arg)
    291         for k, v in kwargs.items():

/usr/local/lib/python3.7/site-packages/ray/util/client/client_pickler.py in convert_to_arg(val, client_id)
    174     out = ray_client_pb2.Arg()
    175     out.local = ray_client_pb2.Arg.Locality.INTERNED
--> 176     out.data = dumps_from_client(val, client_id)
    177     return out

/usr/local/lib/python3.7/site-packages/ray/util/client/client_pickler.py in dumps_from_client(obj, client_id, protocol)
    154         with io.BytesIO() as file:
    155             cp = ClientPickler(client_id, file, protocol=protocol)
--> 156             cp.dump(obj)
    157             return file.getvalue()
    158 

/usr/local/lib/python3.7/site-packages/ray/cloudpickle/cloudpickle_fast.py in dump(self, obj)
    578     def dump(self, obj):
    579         try:
--> 580             return Pickler.dump(self, obj)
    581         except RuntimeError as e:
    582             if "recursion" in e.args[0]:

/usr/local/lib/python3.7/site-packages/modin/pandas/dataframe.py in __reduce__(self)
   2430 
   2431     def __reduce__(self):
-> 2432         self._query_compiler.finalize()
   2433         if PersistentPickle.get():
   2434             return self._inflate_full, (self._to_pandas(),)

/usr/local/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py in finalize(self)
    203 
    204     def finalize(self):
--> 205         self._modin_frame.finalize()
    206 
    207     def to_pandas(self):

/usr/local/lib/python3.7/site-packages/modin/engines/base/frame/data.py in finalize(self)
   2138         that were used to build it.
   2139         """
-> 2140         [part.drain_call_queue() for row in self._partitions for part in row]

/usr/local/lib/python3.7/site-packages/modin/engines/base/frame/data.py in <listcomp>(.0)
   2138         that were used to build it.
   2139         """
-> 2140         [part.drain_call_queue() for row in self._partitions for part in row]

/usr/local/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/partition.py in drain_call_queue(self)
     88             self._width_cache,
     89             self._ip_cache,
---> 90         ) = deploy_ray_func.remote(call_queue, oid)
     91         self.call_queue = []
     92 

/usr/local/lib/python3.7/site-packages/ray/remote_function.py in _remote_proxy(*args, **kwargs)
    102         @wraps(function)
    103         def _remote_proxy(*args, **kwargs):
--> 104             return self._remote(args=args, kwargs=kwargs)
    105 
    106         self.remote = _remote_proxy

/usr/local/lib/python3.7/site-packages/ray/remote_function.py in _remote(self, args, kwargs, num_returns, num_cpus, num_gpus, memory, object_store_memory, accelerator_type, resources, max_retries, placement_group, placement_group_bundle_index, placement_group_capture_child_tasks, runtime_env, override_environment_variables, name)
    210 
    211         worker = ray.worker.global_worker
--> 212         worker.check_connected()
    213 
    214         # If this function was not exported in this session and job, we need to

/usr/local/lib/python3.7/site-packages/ray/worker.py in check_connected(self)
    202         """
    203         if not self.connected:
--> 204             raise RaySystemError("Ray has not been started yet. You can "
    205                                  "start Ray with 'ray.init()'.")
    206 

RaySystemError: System error: Ray has not been started yet. You can start Ray with 'ray.init()'.

  • [Y] I have verified my script runs in a clean environment and reproduces the issue.
  • [Y] I have verified the issue also occurs with the latest wheels.
@Bhavya6187 Bhavya6187 added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Mar 22, 2021
@Bhavya6187 Bhavya6187 changed the title [ray_client] Modin fails to detect ray client runtime moving picking objects [] [ray_client] Modin fails to detect ray client runtime moving picking objects Mar 22, 2021
@richardliaw richardliaw changed the title [ray_client] Modin fails to detect ray client runtime moving picking objects [client] Modin fails to detect ray client runtime moving picking objects Apr 3, 2021
@richardliaw
Contributor

cc @AmeerHajAli

@AmeerHajAli AmeerHajAli added the P1 Issue that should be fixed within a few weeks label Apr 4, 2021
@AmeerHajAli AmeerHajAli added this to the Serverless Autoscaling milestone Apr 4, 2021
@AmeerHajAli
Contributor

AmeerHajAli commented Apr 11, 2021

CC @ijrsvt @devin-petersohn

@devin-petersohn
Member

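@Bhavya6187 The recommended way to mix Modin and Ray code is to use Modin's developer tools, unwrap_partitions and from_partitions, so that Ray tasks operate on the underlying partitions instead of on the Modin DataFrame itself.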

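import ray
import ray.util

# Connect to the remote cluster through the Ray client.
ray.util.connect("<service_ip>:50051")

import modin.pandas as pd
from modin.distributed.dataframe.pandas import unwrap_partitions, from_partitions

colnames = ["label"] + ["feature-%02d" % i for i in range(1, 29)]
df = pd.read_csv("s3://<>/HIGGS.csv", names=colnames)

@ray.remote
def add_rows(partition_df):
    # Each task receives one row partition as a plain pandas DataFrame.
    # Copy it first in case the deserialized object is read-only.
    partition_df = partition_df.copy()
    for i, row in partition_df.iterrows():
        partition_df.at[i, "sum"] = row["feature-01"] + row["feature-02"] + row["feature-03"]
    return partition_df

# Split the Modin DataFrame into row partitions (one Ray object ref each),
# process each partition in its own task, then reassemble the result.
parts = unwrap_partitions(df, axis=0)
df_2 = from_partitions([add_rows.remote(p) for p in parts], axis=0)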

Let me know how it goes!
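The suggested workflow is a split-apply-combine pattern: the driver never pickles the Modin DataFrame, only object refs to its partitions, and each task returns a new partition instead of mutating shared state. The stdlib-only sketch below mirrors that shape under stated assumptions: lists of row dicts stand in for pandas partitions and a thread pool stands in for Ray tasks. split_rows, add_rows and map_partitions here are hypothetical helpers, not Modin or Ray APIs.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins: each "partition" is a list of row dicts, and a
# thread pool plays the role of Ray tasks. None of this is Modin or Ray API.


def split_rows(rows, n_parts):
    """Split rows into at most n_parts contiguous row partitions (like axis=0)."""
    if not rows:
        return []
    size = -(-len(rows) // n_parts)  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]


def add_rows(partition):
    """Per-partition work: return new rows with a "sum" field added."""
    return [
        {**row, "sum": row["feature-01"] + row["feature-02"] + row["feature-03"]}
        for row in partition
    ]


def map_partitions(rows, func, n_parts=4):
    parts = split_rows(rows, n_parts)            # ~ unwrap_partitions(df, axis=0)
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(func, parts))   # ~ [add_rows.remote(p) for p in parts]
    return [row for part in results for row in part]  # ~ from_partitions(..., axis=0)


rows = [{"feature-01": i, "feature-02": 2 * i, "feature-03": 3 * i} for i in range(10)]
out = map_partitions(rows, add_rows)
print(out[3]["sum"])  # prints 18
```

pool.map preserves input order, which is what lets the combine step concatenate results back in row order, analogous to passing the task refs to from_partitions in the same order they came out of unwrap_partitions.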

@AmeerHajAli
Contributor

@Bhavya6187, I am closing this for now since there is no clear action item here. The issue appears to be in how the provided example mixes Modin and Ray.

@Bhavya6187
Author

@devin-petersohn I can work with this! Thanks a lot for the advice.
