diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 8357dc277..08c5ccb9c 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -1,4 +1,7 @@ +import datetime as dt import json +import multiprocessing +from pathlib import Path from unittest import mock # TODO: can we avoid using httpretty? @@ -117,11 +120,21 @@ def start_job(row, connection, **kwargs): assert metadata_path.exists() def test_manager_must_exit_when_all_jobs_done(self, tmp_path, requests_mock, sleep_mock): + """Make sure the MultiBackendJobManager does not hang after all processes finish. + + Regression test for: + https://github.com/Open-EO/openeo-python-client/issues/432 + + Cause was that the run_jobs had an infinite loop when jobs ended with status error. + """ + requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"}) requests_mock.get("http://bar.test/", json={"api_version": "1.1.0"}) def mock_job_status(job_id, succeeds: bool): - """Mock job status polling sequence""" + """Mock job status polling sequence. + We set up one job with finishes with status error + """ response_list = sum( [ [ @@ -156,9 +169,10 @@ def mock_job_status(job_id, succeeds: bool): ) for backend in ["http://foo.test", "http://bar.test"]: requests_mock.get(f"{backend}/jobs/{job_id}", response_list) - # It also needs the job results endpoint, though that can be a dummy implementation. - # When the job is finished the system tries to download the results and that is what - # needs this endpoint. + # It also needs job results endpoint for succesful jobs and the + # log endpoint for a failed job. Both are dummy implementations. + # When the job is finished the system tries to download the + # results or the logs and that is what needs these endpoints. if succeeds: requests_mock.get(f"{backend}/jobs/{job_id}/results", json={"links": []}) else: @@ -209,42 +223,43 @@ def start_job(row, connection, **kwargs): # manager.run_jobs(df=df, start_job=start_job, output_file=output_file) ## assert sleep_mock.call_count > 1000 - import multiprocessing - import datetime as dt - from pathlib import Path - - duration_file: Path = tmp_path / "duration.txt" - def start_worker_thread(): - time_start = dt.datetime.now() manager.run_jobs(df=df, start_job=start_job, output_file=output_file) - time_end = dt.datetime.now() - dur: dt.timedelta = time_end - time_start - duration_file.write_text(str(dur.total_seconds())) + # process_finished_file.write_text("done") # This should be finished almost immediately within a second. # If it takes too long then we assume it will run forever. + # We will check that the exit code of this process == 0, i.e. it finished normally. + # If it gets killed it will have a different exit code + # (On Linux usually -9 SIGKILL) proc = multiprocessing.Process(target=start_worker_thread, name="Worker process") timeout_sec = 5.0 proc.start() + # We stop waiting for the process after the timeout. + # If that happens it is likely we detected that run_jobs will loop infinitely. proc.join(timeout=timeout_sec) + if proc.is_alive: - # now forcibly kill the process. + # now forcibly kill the process, then have to join it again. proc.kill() proc.join() - assert duration_file.exists(), "MultiBackendJobManager did not stop on its own and was killed" - duration = float(duration_file.read_text()) - assert duration < timeout_sec, "MultiBackendJobManager did stop on its but took longer than expected to finish" + assert proc.exitcode == 0, ( + "MultiBackendJobManager did not finish on its own and was killed. " + + "Infinite loop is probable. Expected exit code == 0, but found " + + f"proc.exitcode={proc.exitcode!r}, proc={proc!r}" + ) + # Also check that we got sensible end results. result = pd.read_csv(output_file) assert len(result) == 5 assert set(result.status) == {"finished", "error"} assert set(result.backend_name) == {"foo", "bar"} - # We expect that the job metadata was saved, so verify that it exists. - # Checking for one of the jobs is enough. + # We expect that the job metadata was saved for a successful job, + # so verify that it exists. + # Checking it for one of the jobs is enough. metadata_path = manager.get_job_metadata_path(job_id="job-2021") assert metadata_path.exists()