-
Notifications
You must be signed in to change notification settings - Fork 42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
use threading to run jobmanager loop #614
Changes from 5 commits
b8d0059
9b624ed
35e5fa5
2d44e7e
995e530
c7e0544
bfd99e3
0c7899b
1e12682
1e80314
95a4ec7
d19be05
219f972
062846c
a60e101
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
import textwrap | ||
import threading | ||
import time | ||
from time import sleep | ||
from typing import Callable, Union | ||
from unittest import mock | ||
|
||
|
@@ -85,6 +86,69 @@ def sleep_mock(self): | |
yield sleep | ||
|
||
def test_basic(self, tmp_path, requests_mock, sleep_mock): | ||
manager = self.create_basic_mocked_manager(requests_mock, tmp_path) | ||
|
||
df = pd.DataFrame( | ||
{ | ||
"year": [2018, 2019, 2020, 2021, 2022], | ||
# Use simple points in WKT format to test conversion to the geometry dtype | ||
"geometry": ["POINT (1 2)"] * 5, | ||
} | ||
) | ||
output_file = tmp_path / "jobs.csv" | ||
|
||
def start_job(row, connection, **kwargs): | ||
year = int(row["year"]) | ||
return BatchJob(job_id=f"job-{year}", connection=connection) | ||
|
||
manager.run_jobs(df=df, start_job=start_job, output_file=output_file) | ||
assert sleep_mock.call_count > 10 | ||
|
||
result = pd.read_csv(output_file) | ||
assert len(result) == 5 | ||
assert set(result.status) == {"finished"} | ||
assert set(result.backend_name) == {"foo", "bar"} | ||
|
||
# We expect that the job metadata was saved, so verify that it exists. | ||
# Checking for one of the jobs is enough. | ||
metadata_path = manager.get_job_metadata_path(job_id="job-2022") | ||
assert metadata_path.exists() | ||
|
||
def test_basic_threading(self, tmp_path, requests_mock, sleep_mock): | ||
manager = self.create_basic_mocked_manager(requests_mock, tmp_path) | ||
|
||
df = pd.DataFrame( | ||
{ | ||
"year": [2018, 2019, 2020, 2021, 2022], | ||
# Use simple points in WKT format to test conversion to the geometry dtype | ||
"geometry": ["POINT (1 2)"] * 5, | ||
} | ||
) | ||
output_file = tmp_path / "jobs.csv" | ||
|
||
def start_job(row, connection, **kwargs): | ||
year = int(row["year"]) | ||
return BatchJob(job_id=f"job-{year}", connection=connection) | ||
|
||
df = manager._normalize_df(df) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shouldn't this normalize_df be handled automatically in the manager? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we lack a good mechanism to initialize a job db correctly, we'll have to come up with something There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @soxofaan I introduced initialize_job_db to address this issue. |
||
job_db = CsvJobDatabase(output_file) | ||
job_db.persist(df) | ||
manager.start_job_thread( start_job=start_job,job_db=job_db) | ||
sleep(20) | ||
manager.stop_job_thread(10) | ||
#assert sleep_mock.call_count > 10 | ||
|
||
result = pd.read_csv(output_file) | ||
assert len(result) == 5 | ||
assert set(result.status) == {"finished"} | ||
assert set(result.backend_name) == {"foo", "bar"} | ||
|
||
# We expect that the job metadata was saved, so verify that it exists. | ||
# Checking for one of the jobs is enough. | ||
metadata_path = manager.get_job_metadata_path(job_id="job-2022") | ||
assert metadata_path.exists() | ||
|
||
def create_basic_mocked_manager(self, requests_mock, tmp_path): | ||
requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"}) | ||
requests_mock.get("http://bar.test/", json={"api_version": "1.1.0"}) | ||
|
||
|
@@ -136,38 +200,11 @@ def mock_job_status(job_id, queued=1, running=2): | |
mock_job_status("job-2020", queued=3, running=4) | ||
mock_job_status("job-2021", queued=3, running=5) | ||
mock_job_status("job-2022", queued=5, running=6) | ||
|
||
root_dir = tmp_path / "job_mgr_root" | ||
manager = MultiBackendJobManager(root_dir=root_dir) | ||
|
||
manager.add_backend("foo", connection=openeo.connect("http://foo.test")) | ||
manager.add_backend("bar", connection=openeo.connect("http://bar.test")) | ||
|
||
df = pd.DataFrame( | ||
{ | ||
"year": [2018, 2019, 2020, 2021, 2022], | ||
# Use simple points in WKT format to test conversion to the geometry dtype | ||
"geometry": ["POINT (1 2)"] * 5, | ||
} | ||
) | ||
output_file = tmp_path / "jobs.csv" | ||
|
||
def start_job(row, connection, **kwargs): | ||
year = int(row["year"]) | ||
return BatchJob(job_id=f"job-{year}", connection=connection) | ||
|
||
manager.run_jobs(df=df, start_job=start_job, output_file=output_file) | ||
assert sleep_mock.call_count > 10 | ||
|
||
result = pd.read_csv(output_file) | ||
assert len(result) == 5 | ||
assert set(result.status) == {"finished"} | ||
assert set(result.backend_name) == {"foo", "bar"} | ||
|
||
# We expect that the job metadata was saved, so verify that it exists. | ||
# Checking for one of the jobs is enough. | ||
metadata_path = manager.get_job_metadata_path(job_id="job-2022") | ||
assert metadata_path.exists() | ||
return manager | ||
|
||
def test_normalize_df(self): | ||
df = pd.DataFrame( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using "thread" in naming and docs might be confusing and setting wrong expectations as asyncio is not about threading but coroutines
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's now converted to use an actual 'Thread' object, so the confusion is gone?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@soxofaan if this is fine now, we can merge and continue with the other PR's