Skip to content

Commit

Permalink
Issue #432 Fix failing tests on Windows
Browse files Browse the repository at this point in the history
  • Loading branch information
JohanKJSchreurs committed Jul 28, 2023
1 parent 4dc2ff5 commit 25a5fd3
Showing 1 changed file with 142 additions and 2 deletions.
144 changes: 142 additions & 2 deletions tests/extra/test_job_management.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import json
import multiprocessing
import platform
import threading
from unittest import mock

# TODO: can we avoid using httpretty?
Expand Down Expand Up @@ -117,6 +119,13 @@ def start_job(row, connection, **kwargs):
metadata_path = manager.get_job_metadata_path(job_id="job-2022")
assert metadata_path.exists()

@pytest.mark.skipif(
platform.system() == "Windows",
reason=(
"Windows support for multiprocessing is too different, the errors to"
"solve are too complicated: pickling certain local functions fails."
),
)
def test_manager_must_exit_when_all_jobs_done(self, tmp_path, requests_mock, sleep_mock):
"""Make sure the MultiBackendJobManager does not hang after all processes finish.
Expand Down Expand Up @@ -220,11 +229,10 @@ def start_job(row, connection, **kwargs):

def start_worker_thread():
manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
# process_finished_file.write_text("done")

# This should be finished almost immediately within a second.
# If it takes too long then we assume it will run forever.
# We will check that the exit code of this process == 0, i.e. it finished normally.
# We will check that the exit code of this process == 0, i.e. it finished normally.
# If it gets killed it will have a different exit code
# (On Linux usually -9 SIGKILL)
proc = multiprocessing.Process(target=start_worker_thread, name="Worker process")
Expand Down Expand Up @@ -258,6 +266,138 @@ def start_worker_thread():
metadata_path = manager.get_job_metadata_path(job_id="job-2021")
assert metadata_path.exists()

def test_manager_must_exit_when_all_jobs_done_windows(self, tmp_path, requests_mock, sleep_mock):
"""Make sure the MultiBackendJobManager does not hang after all processes finish.
Regression test for:
https://github.com/Open-EO/openeo-python-client/issues/432
Cause was that the run_jobs had an infinite loop when jobs ended with status error.
"""

requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"})
requests_mock.get("http://bar.test/", json={"api_version": "1.1.0"})

def mock_job_status(job_id, succeeds: bool):
"""Mock job status polling sequence.
We set up one job with finishes with status error
"""
response_list = sum(
[
[
{
"json": {
"id": job_id,
"title": f"Job {job_id}",
"status": "queued",
}
}
],
[
{
"json": {
"id": job_id,
"title": f"Job {job_id}",
"status": "running",
}
}
],
[
{
"json": {
"id": job_id,
"title": f"Job {job_id}",
"status": "finished" if succeeds else "error",
}
}
],
],
[],
)
for backend in ["http://foo.test", "http://bar.test"]:
requests_mock.get(f"{backend}/jobs/{job_id}", response_list)
# It also needs job results endpoint for succesful jobs and the
# log endpoint for a failed job. Both are dummy implementations.
# When the job is finished the system tries to download the
# results or the logs and that is what needs these endpoints.
if succeeds:
requests_mock.get(f"{backend}/jobs/{job_id}/results", json={"links": []})
else:
response = {
"level": "error",
"logs": [
{
"id": "1",
"code": "SampleError",
"level": "error",
"message": "Error for testing",
"time": "2019-08-24T14:15:22Z",
"data": None,
"path": [],
"usage": {},
"links": [],
}
],
"links": [],
}
requests_mock.get(f"{backend}/jobs/{job_id}/logs?level=error", json=response)

mock_job_status("job-2018", succeeds=True)
mock_job_status("job-2019", succeeds=True)
mock_job_status("job-2020", succeeds=True)
mock_job_status("job-2021", succeeds=True)
mock_job_status("job-2022", succeeds=False)

root_dir = tmp_path / "job_mgr_root"
manager = MultiBackendJobManager(root_dir=root_dir)

manager.add_backend("foo", connection=openeo.connect("http://foo.test"))
manager.add_backend("bar", connection=openeo.connect("http://bar.test"))

df = pd.DataFrame(
{
"year": [2018, 2019, 2020, 2021, 2022],
# Use simple points in WKT format to test conversion to the geometry dtype
"geometry": ["POINT (1 2)"] * 5,
}
)
output_file = tmp_path / "jobs.csv"

def start_job(row, connection, **kwargs):
year = row["year"]
return BatchJob(job_id=f"job-{year}", connection=connection)

is_done_file = tmp_path / "is_done.txt"

def start_worker_thread():
manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
is_done_file.write_text("Done!")

thread = threading.Thread(target=start_worker_thread, name="Worker process", daemon=True)

timeout_sec = 5.0
thread.start()
# We stop waiting for the process after the timeout.
# If that happens it is likely we detected that run_jobs will loop infinitely.
thread.join(timeout=timeout_sec)

assert is_done_file.exists(), (
"MultiBackendJobManager did not finish on its own and was killed. " + "Infinite loop is probable."
)

# Also check that we got sensible end results.
result = pd.read_csv(output_file)
assert len(result) == 5
assert set(result.status) == {"finished", "error"}
assert set(result.backend_name) == {"foo", "bar"}

# We expect that the job metadata was saved for a successful job,
# so verify that it exists.
# Checking it for one of the jobs is enough.
metadata_path = manager.get_job_metadata_path(job_id="job-2021")
assert metadata_path.exists()


def test_on_error_log(self, tmp_path, requests_mock):
backend = "http://foo.test"
requests_mock.get(backend, json={"api_version": "1.1.0"})
Expand Down

0 comments on commit 25a5fd3

Please sign in to comment.