Skip to content

Commit

Permalink
Update submit function
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-janssen committed Oct 27, 2024
1 parent 25523b5 commit 4dd0b1b
Showing 2 changed files with 42 additions and 45 deletions.
16 changes: 10 additions & 6 deletions executorlib/base/executor.py
Original file line number Diff line number Diff line change
@@ -7,10 +7,7 @@
)
from typing import Optional

from executorlib.standalone.inputcheck import (
check_resource_dict,
check_resource_dict_is_empty,
)
from executorlib.standalone.inputcheck import check_resource_dict
from executorlib.standalone.queue import cancel_items_in_queue
from executorlib.standalone.serialize import cloudpickle_register
from executorlib.standalone.thread import RaisingThread
@@ -89,10 +86,17 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Fut
Returns:
Future: A Future representing the given call.
"""
check_resource_dict_is_empty(resource_dict=resource_dict)
check_resource_dict(function=fn)
f = Future()
self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
self._future_queue.put(
{
"fn": fn,
"args": args,
"kwargs": kwargs,
"future": f,
"resource_dict": resource_dict,
}
)
return f

def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
71 changes: 32 additions & 39 deletions executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@

from executorlib.base.executor import ExecutorBase, cancel_items_in_queue
from executorlib.standalone.command import get_command_path
from executorlib.standalone.inputcheck import check_resource_dict
from executorlib.standalone.inputcheck import check_resource_dict, check_resource_dict_is_empty
from executorlib.standalone.interactive.communication import (
SocketInterface,
interface_bootup,
@@ -19,6 +19,37 @@


class ExecutorBroker(ExecutorBase):
def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future:
"""
Submits a callable to be executed with the given arguments.
Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.
Args:
fn (callable): function to submit for execution
args: arguments for the submitted function
kwargs: keyword arguments for the submitted function
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
function. Example resource dictionary: {
cores: 1,
threads_per_core: 1,
gpus_per_worker: 0,
oversubscribe: False,
cwd: None,
executor: None,
hostname_localhost: False,
}
Returns:
Future: A Future representing the given call.
"""
check_resource_dict_is_empty(resource_dict=resource_dict)
check_resource_dict(function=fn)
f = Future()
self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
return f

def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
"""Clean-up the resources associated with the Executor.
@@ -58,44 +89,6 @@ def _set_process(self, process: List[RaisingThread]):


class ExecutorSteps(ExecutorBase):
def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs):
"""
Submits a callable to be executed with the given arguments.
Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.
Args:
fn (callable): function to submit for execution
args: arguments for the submitted function
kwargs: keyword arguments for the submitted function
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
function. Example resource dictionary: {
cores: 1,
threads_per_core: 1,
gpus_per_worker: 0,
oversubscribe: False,
cwd: None,
executor: None,
hostname_localhost: False,
}
Returns:
A Future representing the given call.
"""
check_resource_dict(function=fn)
f = Future()
self._future_queue.put(
{
"fn": fn,
"args": args,
"kwargs": kwargs,
"future": f,
"resource_dict": resource_dict,
}
)
return f

def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
"""Clean-up the resources associated with the Executor.

0 comments on commit 4dd0b1b

Please sign in to comment.