
Commit

Issue #432 Simplified unit test.
JohanKJSchreurs committed Jul 28, 2023
1 parent 95ebcd4 commit 22340df
Showing 1 changed file with 35 additions and 20 deletions.
55 changes: 35 additions & 20 deletions tests/extra/test_job_management.py
@@ -1,4 +1,7 @@
import datetime as dt
import json
import multiprocessing
from pathlib import Path
from unittest import mock

# TODO: can we avoid using httpretty?
@@ -117,11 +120,21 @@ def start_job(row, connection, **kwargs):
assert metadata_path.exists()

def test_manager_must_exit_when_all_jobs_done(self, tmp_path, requests_mock, sleep_mock):
"""Make sure the MultiBackendJobManager does not hang after all processes finish.
Regression test for:
https://github.com/Open-EO/openeo-python-client/issues/432
Cause was that the run_jobs had an infinite loop when jobs ended with status error.
"""

requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"})
requests_mock.get("http://bar.test/", json={"api_version": "1.1.0"})

def mock_job_status(job_id, succeeds: bool):
"""Mock job status polling sequence"""
"""Mock job status polling sequence.
We set up one job with finishes with status error
"""
response_list = sum(
[
[
@@ -156,9 +169,10 @@ def mock_job_status(job_id, succeeds: bool):
)
for backend in ["http://foo.test", "http://bar.test"]:
requests_mock.get(f"{backend}/jobs/{job_id}", response_list)
# It also needs the job results endpoint for successful jobs and the
# logs endpoint for a failed job. Both can be dummy implementations.
# When a job is finished, the system tries to download the results
# or the logs, and that is what needs these endpoints.
if succeeds:
requests_mock.get(f"{backend}/jobs/{job_id}/results", json={"links": []})
else:
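# (The body of this `else` branch is folded in this diff view.)
# A hypothetical sketch of what it likely registers, a dummy logs
# endpoint for the failing job:
#   requests_mock.get(f"{backend}/jobs/{job_id}/logs", json={"logs": []})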
@@ -209,42 +223,43 @@ def start_job(row, connection, **kwargs):
# manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
## assert sleep_mock.call_count > 1000

import multiprocessing
import datetime as dt
from pathlib import Path

duration_file: Path = tmp_path / "duration.txt"

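# The worker runs in a child process, which cannot return a value directly,
# so it writes its run duration to a file for the parent process to read back.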
def start_worker_thread():
time_start = dt.datetime.now()
manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
time_end = dt.datetime.now()
dur: dt.timedelta = time_end - time_start
duration_file.write_text(str(dur.total_seconds()))
# process_finished_file.write_text("done")

# This should finish almost immediately, within a second.
# If it takes much longer, we assume run_jobs would have run forever.
# We check that the exit code of the process == 0, i.e. that it finished normally;
# if it had to be killed it will have a different exit code
# (on Linux usually -9, from SIGKILL).
proc = multiprocessing.Process(target=start_worker_thread, name="Worker process")

timeout_sec = 5.0
proc.start()
# We stop waiting for the process after the timeout.
# If it is still alive by then, run_jobs is most likely looping infinitely.
proc.join(timeout=timeout_sec)

if proc.is_alive():
# Now forcibly kill the process, then join it again to clean it up.
proc.kill()
proc.join()

assert duration_file.exists(), "MultiBackendJobManager did not stop on its own and was killed"
duration = float(duration_file.read_text())
assert duration < timeout_sec, "MultiBackendJobManager did stop on its own but took longer than expected to finish"
assert proc.exitcode == 0, (
"MultiBackendJobManager did not finish on its own and was killed. "
+ "Infinite loop is probable. Expected exit code == 0, but found "
+ f"proc.exitcode={proc.exitcode!r}, proc={proc!r}"
)

# Also check that we got sensible end results.
result = pd.read_csv(output_file)
assert len(result) == 5
assert set(result.status) == {"finished", "error"}
assert set(result.backend_name) == {"foo", "bar"}

# We expect that the job metadata was saved for a successful job,
# so verify that it exists.
# Checking one of the jobs is enough.
metadata_path = manager.get_job_metadata_path(job_id="job-2021")
assert metadata_path.exists()

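The watchdog pattern this test uses can be reused for any call suspected of hanging: run it in a child process, join with a timeout, and inspect the exit code. A minimal standalone sketch (the helper name is illustrative, not part of the openeo library):

import multiprocessing


def assert_finishes_within(target, timeout_sec: float = 5.0):
    """Run `target` in a child process and fail if it does not exit in time."""
    proc = multiprocessing.Process(target=target, name="Watchdog worker")
    proc.start()
    # Wait at most timeout_sec; a hung target is still alive afterwards.
    proc.join(timeout=timeout_sec)
    if proc.is_alive():
        # Note that is_alive() is a method and must be called.
        proc.kill()
        proc.join()
        raise AssertionError(f"target did not finish within {timeout_sec}s")
    assert proc.exitcode == 0, f"target exited with code {proc.exitcode!r}"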
