diff --git a/compute_endpoint/globus_compute_endpoint/endpoint/interchange.py b/compute_endpoint/globus_compute_endpoint/endpoint/interchange.py
index 20b578739..3d88d042f 100644
--- a/compute_endpoint/globus_compute_endpoint/endpoint/interchange.py
+++ b/compute_endpoint/globus_compute_endpoint/endpoint/interchange.py
@@ -370,13 +370,14 @@ def process_pending_tasks():
             # gracefully, iterate once a second whether a task has arrived.
             nonlocal num_tasks_forwarded
             while not self._quiesce_event.is_set():
-                if executor.bad_state_is_set:
-                    try:
-                        raise executor.executor_exception
-                    except Exception:
-                        log.exception("Engine has failed with an unrecoverrable error")
-                    finally:
-                        self.time_to_quit = True
+                if executor.executor_exception:
+                    self.time_to_quit = True
+                    # Not inside an ``except`` block, so hand the exception to
+                    # the logger explicitly to keep its type and traceback.
+                    log.error(
+                        "Engine has failed with an unrecoverable error",
+                        exc_info=executor.executor_exception,
+                    )
 
                 if self.time_to_quit:
                     self.stop()
diff --git a/compute_endpoint/globus_compute_endpoint/engines/globus_compute.py b/compute_endpoint/globus_compute_endpoint/engines/globus_compute.py
index 93d404ccc..13661dd03 100644
--- a/compute_endpoint/globus_compute_endpoint/engines/globus_compute.py
+++ b/compute_endpoint/globus_compute_endpoint/engines/globus_compute.py
@@ -39,6 +39,7 @@ def __init__(
         encrypted: bool = True,
         max_idletime: int = 120,
         strategy: t.Optional[str] = "simple",
+        job_status_kwargs: t.Optional[t.Dict] = None,
         **kwargs,
     ):
         """The ``GlobusComputeEngine`` is a shim over `Parsl's HighThroughputExecutor
@@ -65,6 +66,10 @@ def __init__(
 
         strategy: Specify strategy to use from [None, 'simple']
 
+        job_status_kwargs: Kwarg options to be passed through to Parsl's
+            JobStatusPoller class that drives strategy to do auto-scaling.
+            Refer: parsl.readthedocs.io
+
         encrypted: bool
             Flag to enable/disable encryption (CurveZMQ). Default is True.
 
@@ -94,6 +99,14 @@ def __init__(
             )
         self.executor = executor
         self._strategy = strategy
+        # Set defaults for JobStatusPoller. ``max_idletime`` is seeded from the
+        # constructor argument so callers that still pass it keep their setting;
+        # an entry in ``job_status_kwargs`` takes precedence.
+        self._job_status_kwargs = {
+            "max_idletime": max_idletime,
+            "strategy_period": 5.0,
+        }
+        self._job_status_kwargs.update(job_status_kwargs or {})
 
     @property
     def max_workers_per_node(self):
@@ -183,7 +196,7 @@ def start(
         self._status_report_thread.start()
         # Add executor to poller *after* executor has started
         self.job_status_poller = JobStatusPoller(
-            strategy=self._strategy, max_idletime=self.max_idletime
+            strategy=self._strategy, **self._job_status_kwargs
         )
         self.job_status_poller.add_executors([self.executor])
 
diff --git a/compute_endpoint/tests/integration/endpoint/endpoint/test_gce_bad_config.py b/compute_endpoint/tests/integration/endpoint/endpoint/test_gce_bad_config.py
index 31121b9aa..ad68f5423 100644
--- a/compute_endpoint/tests/integration/endpoint/endpoint/test_gce_bad_config.py
+++ b/compute_endpoint/tests/integration/endpoint/endpoint/test_gce_bad_config.py
@@ -19,7 +19,10 @@ def test_provider_fail_at_init(tmp_path):
         new_callable=mock.PropertyMock,
     ) as mock_prop:
         mock_prop.return_value = 1
-        gce = GlobusComputeEngine(provider=SlurmProvider(init_blocks=1))
+        gce = GlobusComputeEngine(
+            provider=SlurmProvider(init_blocks=1),
+            job_status_kwargs={"max_idletime": 0.1, "strategy_period": 0.1},
+        )
         gce.start(endpoint_id=uuid.uuid4(), run_dir=tmp_path)
         assert gce.bad_state_is_set is False, "Executor should be clean at test-start"
 
@@ -54,7 +57,10 @@ def test_provider_fail_at_scaling(tmp_path):
         new_callable=mock.PropertyMock,
     ) as mock_prop:
         mock_prop.return_value = 1
-        gce = GlobusComputeEngine(provider=SlurmProvider(init_blocks=0))
+        gce = GlobusComputeEngine(
+            provider=SlurmProvider(init_blocks=0),
+            job_status_kwargs={"max_idletime": 0.1, "strategy_period": 0.1},
+        )
         gce.start(endpoint_id=uuid.uuid4(), run_dir=tmp_path)
         assert gce.bad_state_is_set is False
 
diff --git a/compute_endpoint/tests/integration/endpoint/endpoint/test_gceengine_bad_provider.py b/compute_endpoint/tests/integration/endpoint/endpoint/test_gceengine_bad_provider.py
index eee1d0ce8..b2395bc5c 100644
--- a/compute_endpoint/tests/integration/endpoint/endpoint/test_gceengine_bad_provider.py
+++ b/compute_endpoint/tests/integration/endpoint/endpoint/test_gceengine_bad_provider.py
@@ -28,8 +28,8 @@ def gc_engine_bad_submit_command(tmp_path):
             min_blocks=0,
             max_blocks=1,
         ),
-        max_idletime=0,
         strategy="simple",
+        job_status_kwargs={"max_idletime": 0, "strategy_period": 0.1},
     )
     queue = Queue()
     engine.start(endpoint_id=ep_id, run_dir=str(tmp_path), results_passthrough=queue)
diff --git a/compute_endpoint/tests/integration/endpoint/endpoint/test_gceengine_bad_worker_init.py b/compute_endpoint/tests/integration/endpoint/endpoint/test_gceengine_bad_worker_init.py
index 97d764faf..2a81d77f7 100644
--- a/compute_endpoint/tests/integration/endpoint/endpoint/test_gceengine_bad_worker_init.py
+++ b/compute_endpoint/tests/integration/endpoint/endpoint/test_gceengine_bad_worker_init.py
@@ -31,7 +31,7 @@ def gc_engine_bad_submit_command(tmp_path):
             max_blocks=1,
             worker_init='echo "BAD SUBMIT COMMAND"; exit 0',
         ),
-        max_idletime=0,
+        job_status_kwargs={"max_idletime": 0, "strategy_period": 0.1},
         strategy="simple",
     )
     queue = Queue()
@@ -60,7 +60,7 @@ def test_bad_submit_command(gc_engine_bad_submit_command, caplog):
     future = engine.submit(task_id=task_id, packed_task=task_message)
 
     with pytest.raises(BadStateException):
-        future.result(timeout=60)
+        future.result(timeout=1)
 
     report = str(future.exception())
     assert "EXIT CODE: 0" in report
diff --git a/compute_endpoint/tests/integration/endpoint/endpoint/test_gcengine_strategy.py b/compute_endpoint/tests/integration/endpoint/endpoint/test_gcengine_strategy.py
index 68def81ee..054e933ad 100644
--- a/compute_endpoint/tests/integration/endpoint/endpoint/test_gcengine_strategy.py
+++ b/compute_endpoint/tests/integration/endpoint/endpoint/test_gcengine_strategy.py
@@ -24,8 +24,8 @@ def gc_engine_scaling(tmp_path):
             min_blocks=0,
             max_blocks=1,
         ),
-        max_idletime=0,
         strategy="simple",
+        job_status_kwargs={"max_idletime": 0, "strategy_period": 0.1},
     )
     queue = Queue()
     engine.start(endpoint_id=ep_id, run_dir=str(tmp_path), results_passthrough=queue)
@@ -47,7 +47,7 @@ def gc_engine_non_scaling(tmp_path):
             max_blocks=1,
         ),
         strategy=None,
-        max_idletime=0,
+        job_status_kwargs={"max_idletime": 0, "strategy_period": 0.1},
     )
     queue = Queue()
     engine.start(endpoint_id=ep_id, run_dir=str(tmp_path), results_passthrough=queue)
@@ -77,7 +77,7 @@ def num_outstanding():
     # With 0 tasks and excess workers we should expect scale_down
     # While scale_down might be triggered it appears to take 1s
     # lowest heartbeat period to detect a manager going down
-    try_assert(lambda: num_outstanding() == 1, timeout_ms=10000)
+    try_assert(lambda: num_outstanding() == 1)
 
 
 def test_engine_no_scaling(gc_engine_non_scaling):
diff --git a/compute_endpoint/tests/integration/endpoint/executors/test_gcengine_retries.py b/compute_endpoint/tests/integration/endpoint/executors/test_gcengine_retries.py
index aa18b37b2..99e89d0e4 100644
--- a/compute_endpoint/tests/integration/endpoint/executors/test_gcengine_retries.py
+++ b/compute_endpoint/tests/integration/endpoint/executors/test_gcengine_retries.py
@@ -16,7 +16,7 @@ def gc_engine_with_retries(tmp_path):
         address="127.0.0.1",
         max_workers=1,
         heartbeat_period=1,
-        heartbeat_threshold=2,
+        heartbeat_threshold=1,
         max_retries_on_system_failure=0,
         provider=LocalProvider(
             init_blocks=1,