Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Portal.run_func(): run local funcs remotely #105

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions tractor/_portal.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import importlib
import inspect
import typing
from typing import Tuple, Any, Dict, Optional, Set
from typing import Tuple, Any, Dict, Optional, Set, Callable
from functools import partial
from dataclasses import dataclass

Expand Down Expand Up @@ -175,17 +175,45 @@ async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None:
"A pending main result has already been submitted"
self._expect_result = await self._submit(ns, func, kwargs)

async def run(self, ns: str, func: str, **kwargs) -> Any:
"""Submit a remote function to be scheduled and run by actor,
wrap and return its (stream of) result(s).
async def run(
self,
ns: str,
func: str,
**kwargs
) -> Any:
"""Run a remote function in another actor by providing its
explicit module path and function name.

This is a blocking call and returns either a value from the
remote rpc task or a local async generator instance.
Return its (stream of) result(s) as though the remote callable
was invoked locally. This is a blocking call and delivers either
the return value from the remotely scheduled RPC task or a local async
iterator instance if a stream is expected.
"""
return await self._return_from_resptype(
*(await self._submit(ns, func, kwargs))
)

async def run_func(
self,
func: Callable,
**kwargs,
) -> Any:
"""Submit a local function by object reference to be scheduled
and run by another actor.

This is a convenience method and effectively the same as
``.run()`` except the explicit function namespace path is looked
up by introspecting the local function object and submitting
that via a ``.run()`` call.

.. note::

No local objects are serialized and sent over the wire; the
function provided must also be importable in the target actor
memory space.
"""
return await self.run(func.__module__, func.__name__, **kwargs)

async def _return_from_resptype(
self,
cid: str,
Expand Down