Skip to content

Commit

Permalink
Adding job_status_kwargs to configure Parsl Strategy
Browse files Browse the repository at this point in the history
* GlobusComputeEngine(job_status_kwargs:dict|None) can be used to configure strategy. Defaults to {"max_idletime": 120.0, "strategy_period": 5.0}
* Test speedup improvements
  • Loading branch information
yadudoc committed Mar 26, 2024
1 parent 97c525b commit 321720f
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 17 deletions.
10 changes: 3 additions & 7 deletions compute_endpoint/globus_compute_endpoint/endpoint/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,13 +370,9 @@ def process_pending_tasks():
# gracefully, iterate once a second whether a task has arrived.
nonlocal num_tasks_forwarded
while not self._quiesce_event.is_set():
if executor.bad_state_is_set:
try:
raise executor.executor_exception
except Exception:
log.exception("Engine has failed with an unrecoverrable error")
finally:
self.time_to_quit = True
if executor.executor_exception:
self.time_to_quit = True
log.exception(executor.executor_exception)

if self.time_to_quit:
self.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(
encrypted: bool = True,
max_idletime: int = 120,
strategy: t.Optional[str] = "simple",
job_status_kwargs: t.Optional[t.Dict] = None,
**kwargs,
):
"""The ``GlobusComputeEngine`` is a shim over `Parsl's HighThroughputExecutor
Expand All @@ -65,6 +66,10 @@ def __init__(
strategy: Specify strategy to use from [None, 'simple']
job_status_kwargs: Kwarg options to be passed through to Parsl's
JobStatusPoller class that drives strategy to do auto-scaling.
Refer: parsl.readthedocs.io
encrypted: bool
Flag to enable/disable encryption (CurveZMQ). Default is True.
Expand Down Expand Up @@ -94,6 +99,9 @@ def __init__(
)
self.executor = executor
self._strategy = strategy
# Set defaults for JobStatusPoller
self._job_status_kwargs = {"max_idletime": 120.0, "strategy_period": 5.0}
self._job_status_kwargs.update(job_status_kwargs or {})

@property
def max_workers_per_node(self):
Expand Down Expand Up @@ -183,7 +191,7 @@ def start(
self._status_report_thread.start()
# Add executor to poller *after* executor has started
self.job_status_poller = JobStatusPoller(
strategy=self._strategy, max_idletime=self.max_idletime
strategy=self._strategy, **self._job_status_kwargs
)
self.job_status_poller.add_executors([self.executor])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ def test_provider_fail_at_init(tmp_path):
new_callable=mock.PropertyMock,
) as mock_prop:
mock_prop.return_value = 1
gce = GlobusComputeEngine(provider=SlurmProvider(init_blocks=1))
gce = GlobusComputeEngine(
provider=SlurmProvider(init_blocks=1),
job_status_kwargs={"max_idletime": 0.1, "strategy_period": 0.1},
)
gce.start(endpoint_id=uuid.uuid4(), run_dir=tmp_path)

assert gce.bad_state_is_set is False, "Executor should be clean at test-start"
Expand Down Expand Up @@ -54,7 +57,10 @@ def test_provider_fail_at_scaling(tmp_path):
new_callable=mock.PropertyMock,
) as mock_prop:
mock_prop.return_value = 1
gce = GlobusComputeEngine(provider=SlurmProvider(init_blocks=0))
gce = GlobusComputeEngine(
provider=SlurmProvider(init_blocks=0),
job_status_kwargs={"max_idletime": 0.1, "strategy_period": 0.1},
)
gce.start(endpoint_id=uuid.uuid4(), run_dir=tmp_path)

assert gce.bad_state_is_set is False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ def gc_engine_bad_submit_command(tmp_path):
min_blocks=0,
max_blocks=1,
),
max_idletime=0,
strategy="simple",
job_status_kwargs={"max_idletime": 0, "strategy_period": 0.1},
)
queue = Queue()
engine.start(endpoint_id=ep_id, run_dir=str(tmp_path), results_passthrough=queue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def gc_engine_bad_submit_command(tmp_path):
max_blocks=1,
worker_init='echo "BAD SUBMIT COMMAND"; exit 0',
),
max_idletime=0,
job_status_kwargs={"max_idletime": 0, "strategy_period": 0.1},
strategy="simple",
)
queue = Queue()
Expand Down Expand Up @@ -60,7 +60,7 @@ def test_bad_submit_command(gc_engine_bad_submit_command, caplog):
future = engine.submit(task_id=task_id, packed_task=task_message)

with pytest.raises(BadStateException):
future.result(timeout=60)
future.result(timeout=1)

report = str(future.exception())
assert "EXIT CODE: 0" in report
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ def gc_engine_scaling(tmp_path):
min_blocks=0,
max_blocks=1,
),
max_idletime=0,
strategy="simple",
job_status_kwargs={"max_idletime": 0, "strategy_period": 0.1},
)
queue = Queue()
engine.start(endpoint_id=ep_id, run_dir=str(tmp_path), results_passthrough=queue)
Expand All @@ -47,7 +47,7 @@ def gc_engine_non_scaling(tmp_path):
max_blocks=1,
),
strategy=None,
max_idletime=0,
job_status_kwargs={"max_idletime": 0, "strategy_period": 0.1},
)
queue = Queue()
engine.start(endpoint_id=ep_id, run_dir=str(tmp_path), results_passthrough=queue)
Expand Down Expand Up @@ -77,7 +77,7 @@ def num_outstanding():
# With 0 tasks and excess workers we should expect scale_down
# While scale_down might be triggered it appears to take 1s
# lowest heartbeat period to detect a manager going down
try_assert(lambda: num_outstanding() == 1, timeout_ms=10000)
try_assert(lambda: num_outstanding() == 1)


def test_engine_no_scaling(gc_engine_non_scaling):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def gc_engine_with_retries(tmp_path):
address="127.0.0.1",
max_workers=1,
heartbeat_period=1,
heartbeat_threshold=2,
heartbeat_threshold=1,
max_retries_on_system_failure=0,
provider=LocalProvider(
init_blocks=1,
Expand Down

0 comments on commit 321720f

Please sign in to comment.