Skip to content

Commit

Permalink
Fixes to avoid res_spec modification
Browse files Browse the repository at this point in the history
* GCE now makes a deepcopy of the resource_specification to avoid modifying the user supplied object.
* stack of docstring changes
  • Loading branch information
yadudoc committed Nov 7, 2024
1 parent 75f9c5e commit d01a346
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 21 deletions.
2 changes: 1 addition & 1 deletion docs/userguide/execution.rst
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ Parsl currently supports the following executors:
These executors cover a broad range of execution requirements. As with other Parsl components, there is a standard interface (ParslExecutor) that can be implemented to add support for other executors.

5. `parsl.executors.globus_compute.GlobusComputeExecutor`: This executor uses `Globus Compute <https://globus-compute.readthedocs.io/en/latest/index.html>`_
as the execution backend to run functions on remote systems.
as the execution backend to run tasks on remote systems.

.. note::
Refer to :ref:`configuration-section` for information on how to configure these executors.
Expand Down
35 changes: 15 additions & 20 deletions parsl/executors/globus_compute.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import copy
import uuid
from concurrent.futures import Future
from typing import Any, Callable, Dict, Optional, Union
Expand Down Expand Up @@ -60,8 +61,7 @@ def __init__(
for more info.
label:
a label to name the executor; mainly utilized for
logging and advanced needs with multiple executors.
a label to name the executor
batch_size:
the maximum number of tasks to coalesce before
Expand Down Expand Up @@ -108,12 +108,10 @@ def __init__(
)

def start(self) -> None:
"""Empty function
"""
pass

def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future:
""" Submit fn to globus-compute
""" Submit func to globus-compute
Parameters
Expand All @@ -139,24 +137,21 @@ def submit(self, func: Callable, resource_specification: Dict[str, Any], *args:
Future
"""
self._executor.resource_specification = resource_specification or self.resource_specification
res_spec = copy.deepcopy(resource_specification or self.resource_specification)
# Pop user_endpoint_config since it is illegal in resource_spec for globus_compute
self._executor.user_endpoint_config = resource_specification.pop('user_endpoint_config', self.user_endpoint_config)
if res_spec:
user_endpoint_config = res_spec.pop('user_endpoint_config', self.user_endpoint_config)
else:
user_endpoint_config = self.user_endpoint_config

self._executor.resource_specification = res_spec
self._executor.user_endpoint_config = user_endpoint_config
return self._executor.submit(func, *args, **kwargs)

def shutdown(self, wait=True, *, cancel_futures=False):
def shutdown(self):
"""Clean-up the resources associated with the Executor.
It is safe to call this method several times. Otherwise, no other methods
can be called after this one.
Parameters
----------
wait: If True, then this method will not return until all pending
futures have received results.
cancel_futures: If True, then this method will cancel all futures
that have not yet registered their tasks with the Compute web services.
Tasks cannot be cancelled once they are registered.
GCE.shutdown will cancel all futures that have not yet registered with
Globus Compute and will not wait for the launched futures to complete.
"""
return self._executor.shutdown()
return self._executor.shutdown(wait=False, cancel_futures=True)

0 comments on commit d01a346

Please sign in to comment.