From bf55bdba913fe5df9b9924847354a96e0cce01c8 Mon Sep 17 00:00:00 2001 From: n1mus <709030+n1mus@users.noreply.github.com> Date: Tue, 10 May 2022 03:45:05 -0700 Subject: [PATCH] add job ts tests --- src/biokbase/narrative/jobs/job.py | 30 +-- src/biokbase/narrative/jobs/jobcomm.py | 13 +- src/biokbase/narrative/jobs/util.py | 28 +++ .../narrative/tests/job_test_constants.py | 2 +- src/biokbase/narrative/tests/test_job.py | 2 +- src/biokbase/narrative/tests/test_job_util.py | 186 +++++++++++++++++- src/biokbase/narrative/tests/test_jobcomm.py | 106 ++++++++++ .../narrative/tests/test_jobmanager.py | 60 ++++++ 8 files changed, 391 insertions(+), 36 deletions(-) diff --git a/src/biokbase/narrative/jobs/job.py b/src/biokbase/narrative/jobs/job.py index 325c1702de..ea07e2ff38 100644 --- a/src/biokbase/narrative/jobs/job.py +++ b/src/biokbase/narrative/jobs/job.py @@ -10,6 +10,7 @@ import biokbase.narrative.clients as clients from biokbase.narrative.app_util import map_inputs_from_job, map_outputs_from_state from biokbase.narrative.exception_util import transform_job_exception +from biokbase.narrative.jobs.util import merge, merge_inplace from .specmanager import SpecManager @@ -82,33 +83,6 @@ STATE_ATTRS = list(set(JOB_ATTRS) - set(JOB_INPUT_ATTRS) - set(NARR_CELL_INFO_ATTRS)) -def merge(d0: dict, d1: dict): - d0 = copy.deepcopy(d0) - merge_inplace(d0, d1) - return d0 - - -def merge_inplace(d0: dict, d1: dict): - """ - Recursively merge nested dicts d1 into d0, - overwriting any values in d0 that are not nested dicts. - Mutates d0 - """ - for k, v1 in d1.items(): - if k in d0: - v0 = d0[k] - is_dict_0 = isinstance(v0, dict) - is_dict_1 = isinstance(v1, dict) - if is_dict_0 ^ is_dict_1: - raise ValueError(f"For key {k}: is_dict(v0) xor is_dict(v1)") - elif not is_dict_0 and not is_dict_1: - d0[k] = v1 - elif is_dict_0 and is_dict_1: - merge_inplace(v0, v1) - else: - d0[k] = v0 - - class Job: _job_logs = [] _acc_state = None # accumulates state @@ -561,7 +535,7 @@ def log(self, first_line=0, num_lines=None): return (num_available_lines, []) return ( num_available_lines, - self._job_logs[first_line : first_line + num_lines], + self._job_logs[first_line: first_line + num_lines], ) def _update_log(self): diff --git a/src/biokbase/narrative/jobs/jobcomm.py b/src/biokbase/narrative/jobs/jobcomm.py index ba58bc8ffb..4bdf35f0d3 100644 --- a/src/biokbase/narrative/jobs/jobcomm.py +++ b/src/biokbase/narrative/jobs/jobcomm.py @@ -355,11 +355,6 @@ def _get_job_states(self, job_id_list: list, ts: int = None) -> dict: :rtype: dict """ output_states = self._jm.get_job_states(job_id_list, ts) - - now = time.time_ns() - for output_state in output_states.values(): - output_state["last_checked"] = now - self.send_comm_message(MESSAGE_TYPE["STATUS"], output_states) return output_states @@ -520,6 +515,14 @@ def send_comm_message(self, msg_type: str, content: dict) -> None: Sends a ipykernel.Comm message to the KBaseJobs channel with the given msg_type and content. These just get encoded into the message itself. """ + # For STATUS responses, add a last_checked field + # to each output_state. 
Note: error states will have + # the last_checked field too + if msg_type == MESSAGE_TYPE["STATUS"]: + now = time.time_ns() + for output_state in content.values(): + output_state["last_checked"] = now + msg = {"msg_type": msg_type, "content": content} self._comm.send(msg) diff --git a/src/biokbase/narrative/jobs/util.py b/src/biokbase/narrative/jobs/util.py index 8b9c07e77a..27aa4b525a 100644 --- a/src/biokbase/narrative/jobs/util.py +++ b/src/biokbase/narrative/jobs/util.py @@ -1,3 +1,4 @@ +import copy import json import os @@ -58,3 +59,30 @@ def load_job_constants(relative_path_to_file=JOB_CONFIG_FILE_PATH_PARTS): ) return (config["params"], config["message_types"]) + + +def merge(d0: dict, d1: dict): + d0 = copy.deepcopy(d0) + merge_inplace(d0, d1) + return d0 + + +def merge_inplace(d0: dict, d1: dict): + """ + Recursively merge nested dicts d1 into d0, + overwriting any values in d0 that are not nested dicts. + Mutates d0 + """ + for k, v1 in d1.items(): + if k in d0: + v0 = d0[k] + is_dict_0 = isinstance(v0, dict) + is_dict_1 = isinstance(v1, dict) + if is_dict_0 ^ is_dict_1: + raise ValueError(f"For key {k}: is_dict(v0) xor is_dict(v1)") + elif not is_dict_0 and not is_dict_1: + d0[k] = v1 + elif is_dict_0 and is_dict_1: + merge_inplace(v0, v1) + else: + d0[k] = v1 diff --git a/src/biokbase/narrative/tests/job_test_constants.py b/src/biokbase/narrative/tests/job_test_constants.py index 597c43eec7..112c27bcbc 100644 --- a/src/biokbase/narrative/tests/job_test_constants.py +++ b/src/biokbase/narrative/tests/job_test_constants.py @@ -41,7 +41,7 @@ def get_test_jobs(job_ids): CLIENTS = "biokbase.narrative.clients.get" -TIME_NS = "biokbase.narrative.jobs.jobcomm.time.time_ns" +JC_TIME_NS = "biokbase.narrative.jobs.jobcomm.time.time_ns" TEST_EPOCH_NS = 42 # arbitrary epoch ns MAX_LOG_LINES = 10 diff --git a/src/biokbase/narrative/tests/test_job.py b/src/biokbase/narrative/tests/test_job.py index b2ae1d68f2..a6a254c1f1 100644 --- a/src/biokbase/narrative/tests/test_job.py +++ b/src/biokbase/narrative/tests/test_job.py @@ -374,7 +374,7 @@ def mock_state(self, state=None): "updated": 0, } - with mock.patch.object(Job, "state", mock_state): + with mock.patch.object(Job, "refresh_state", mock_state): state = job.output_state() self.assertEqual(expected, state) diff --git a/src/biokbase/narrative/tests/test_job_util.py b/src/biokbase/narrative/tests/test_job_util.py index 656d86f160..8ba414b704 100644 --- a/src/biokbase/narrative/tests/test_job_util.py +++ b/src/biokbase/narrative/tests/test_job_util.py @@ -1,6 +1,8 @@ +import copy +import re import unittest -from biokbase.narrative.jobs.util import load_job_constants +from biokbase.narrative.jobs.util import load_job_constants, merge, merge_inplace class JobUtilTestCase(unittest.TestCase): @@ -57,5 +59,187 @@ def test_load_job_constants__valid(self): self.assertIn(item, message_types) +class MergeTest(unittest.TestCase): + def _check_merge_inplace(self, d0: dict, d1: dict, exp_merge: dict): + d1_copy = copy.deepcopy(d1) + merge_inplace(d0, d1) + self.assertEqual( + d0, + exp_merge + ) + self.assertEqual( + d1, + d1_copy + ) + + def test_merge_inplace__empty(self): + d0 = {} + d1 = {} + self._check_merge_inplace( + d0, + d1, + {} + ) + + def test_merge_inplace__d0_empty(self): + # flat + d0 = {} + d1 = {"level00": "l00"} + self._check_merge_inplace( + d0, + d1, + {"level00": "l00"} + ) + + # nested + d0 = {} + d1 = { + "level00": "l00", + "level01": { + "level10": "l10" + } + } + self._check_merge_inplace( + d0, + d1, + { + "level00": "l00", + 
"level01": { + "level10": "l10" + } + } + ) + + def test_merge_inplace__d1_empty(self): + # flat + d0 = {"level00": "l00"} + d1 = {} + self._check_merge_inplace( + d0, + d1, + {"level00": "l00"} + ) + + # nested + d0 = { + "level00": "l00", + "level01": { + "level10": "l10" + } + } + d1 = {} + self._check_merge_inplace( + d0, + d1, + { + "level00": "l00", + "level01": { + "level10": "l10" + } + } + ) + + def test_merge_inplace__flat(self): + d0 = { + "level00": "l00", + "level01": "l01" + } + d1 = { + "level01": "l01_", + "level02": "l02" + } + self._check_merge_inplace( + d0, + d1, + { + "level00": "l00", + "level01": "l01_", + "level02": "l02" + } + ) + + def test_merge_inplace__nested(self): + d0 = { + "level00": { + "level10": { + "level20": "l20", + "level21": "l21" + } + }, + "level01": "l01" + } + d1 = { + "level00": { + "level10": { + "level22": "l22" + } + }, + "level01": "l01_" + } + self._check_merge_inplace( + d0, + d1, + { + "level00": { + "level10": { + "level20": "l20", + "level21": "l21", + "level22": "l22" + } + }, + "level01": "l01_" + } + ) + + def test_merge_inplace__xor_dicts(self): + d0 = { + "level00": {} + } + d1 = { + "level00": "l00", + "level01": "l01" + } + with self.assertRaisesRegex( + ValueError, + re.escape("For key level00: is_dict(v0) xor is_dict(v1)") + ): + merge_inplace(d0, d1) + + def test_merge(self): + d0 = { + "level00": "l00", + "level01": { + "level10": { + "level20": "l20" + } + }, + "level02": "l02" + } + d1 = { + "level01": { + "level10": { + "level20": "l20_" + } + } + } + d0_copy = copy.deepcopy(d0) + d1_copy = copy.deepcopy(d1) + d0_merge = merge(d0, d1) + self.assertEqual(d0, d0_copy) + self.assertEqual(d1, d1_copy) + self.assertEqual( + d0_merge, + { + "level00": "l00", + "level01": { + "level10": { + "level20": "l20_" + } + }, + "level02": "l02" + } + ) + + if __name__ == "__main__": unittest.main() diff --git a/src/biokbase/narrative/tests/test_jobcomm.py b/src/biokbase/narrative/tests/test_jobcomm.py index c526e1bebf..517d6ccf25 100644 --- a/src/biokbase/narrative/tests/test_jobcomm.py +++ b/src/biokbase/narrative/tests/test_jobcomm.py @@ -2,6 +2,7 @@ import itertools import os import re +import time import unittest from unittest import mock @@ -56,7 +57,10 @@ JOB_TERMINATED, MAX_LOG_LINES, REFRESH_STATE, + TEST_EPOCH_NS, + JC_TIME_NS, generate_error, + get_test_jobs, ) from .narrative_mock.mockclients import ( @@ -98,6 +102,14 @@ LOG_LINES = [{"is_error": 0, "line": f"This is line {i}"} for i in range(MAX_LOG_LINES)] +def ts_are_close(t0: int, t1: int) -> bool: + """ + t0 and t1 are epochs in nanoseconds. + Check that they are within 1s of each other + """ + return abs(t1 - t0) * 1e-9 <= 1 + + def make_comm_msg( msg_type: str, job_id_like, as_job_request: bool, content: dict = None ): @@ -485,7 +497,22 @@ def test_start_job_status_loop__no_jobs_stop_loop(self): # Lookup all job states # --------------------- + def _check_pop_last_checked(self, output_states, last_checked=TEST_EPOCH_NS): + """ + For STATUS responses, each output_state will have an extra field `last_checked` + that is variable and is not in the test data. 
Check that here and delete before + other checkd + """ + for output_state in output_states.values(): + self.assertIn("last_checked", output_state) + self.assertTrue( + last_checked == output_state["last_checked"] + or ts_are_close(last_checked, output_state["last_checked"]) + ) + del output_state["last_checked"] + @mock.patch(CLIENTS, get_mock_client) + @mock.patch(JC_TIME_NS, lambda: TEST_EPOCH_NS) def check_job_output_states( self, output_states=None, @@ -494,6 +521,7 @@ def check_job_output_states( response_type=STATUS, ok_states=None, error_states=None, + last_checked=TEST_EPOCH_NS, ): """ Handle any request that returns a dictionary of job state objects; this @@ -506,6 +534,7 @@ def check_job_output_states( :param params: params for the comm message (opt) :param ok_states: list of job IDs expected to be in the output :param error_states: list of job IDs expected to return a not found error + :param last_checked: ts in ns """ if not params: params = {} @@ -527,6 +556,11 @@ def check_job_output_states( msg, ) + # for STATUS responses, there will be a field `last_checked` + # that is variable and not in the test data. check that here and remove + if response_type == MESSAGE_TYPE["STATUS"]: + self._check_pop_last_checked(output_states, last_checked) + for job_id, state in output_states.items(): self.assertEqual(ALL_RESPONSE_DATA[STATUS][job_id], state) if job_id in ok_states: @@ -544,12 +578,19 @@ def test_get_all_job_states__ok(self): # ----------------------- # Lookup single job state # ----------------------- + @mock.patch(JC_TIME_NS, lambda: TEST_EPOCH_NS) def test_get_job_state__1_ok(self): output_states = self.jc.get_job_state(JOB_COMPLETED) self.check_job_output_states( output_states=output_states, ok_states=[JOB_COMPLETED] ) + def test_get_job_state__live_ts(self): + output_states = self.jc.get_job_state(JOB_COMPLETED) + self.check_job_output_states( + output_states=output_states, ok_states=[JOB_COMPLETED], last_checked=time.time_ns() + ) + def test_get_job_state__no_job(self): with self.assertRaisesRegex( JobRequestException, re.escape(f"{JOBS_MISSING_ERR}: {[None]}") @@ -612,6 +653,7 @@ def test_get_job_states__batch_id__not_batch(self): self.check_batch_id__not_batch_test(STATUS) @mock.patch(CLIENTS, get_mock_client) + @mock.patch(JC_TIME_NS, lambda: TEST_EPOCH_NS) def test_get_job_states__job_id_list__ee2_error(self): exc = Exception("Test exception") exc_message = str(exc) @@ -626,6 +668,8 @@ def mock_check_jobs(params): self.jc._handle_comm_message(req_dict) msg = self.jc._comm.last_message + self._check_pop_last_checked(msg["content"], TEST_EPOCH_NS) + expected = {job_id: copy.deepcopy(ALL_RESPONSE_DATA[STATUS][job_id]) for job_id in ALL_JOBS} for job_id in ACTIVE_JOBS: # add in the ee2_error message @@ -639,6 +683,65 @@ def mock_check_jobs(params): msg, ) + @mock.patch(CLIENTS, get_mock_client) + def test_get_job_states__last_updated(self): + """ + Copied from test_jobmanager.py + But also tests the last_checked field + """ + # what FE will say was the last time the jobs were checked + ts = time.time_ns() + + # mix of terminal and not terminal + not_updated_ids = [JOB_COMPLETED, JOB_ERROR, JOB_TERMINATED, JOB_CREATED, JOB_RUNNING] + # not terminal + updated_ids = [BATCH_PARENT, BATCH_RETRY_RUNNING] + + # error ids + not_found_ids = [JOB_NOT_FOUND] + + job_ids = not_updated_ids + updated_ids + active_ids = list(set(job_ids) & set(ACTIVE_JOBS)) + + # all job IDs partitioned as + not_found_ids + terminal_ids = list(set(job_ids) - set(ACTIVE_JOBS)) # noqa: F841 + 
not_updated_active_ids = list(set(not_updated_ids) & set(active_ids)) # noqa: F841 + updated_active_ids = list(set(updated_ids) & set(active_ids)) + + def mock_check_jobs(self_, params): + """Mutate only chosen job states""" + lookup_ids = params["job_ids"] + self.assertCountEqual(active_ids, lookup_ids) # sanity check + + job_states_ret = get_test_jobs(lookup_ids) + for job_id, job_state in job_states_ret.items(): + # if job is chosen to be updated, mutate it + if job_id in updated_active_ids: + job_state["updated"] += 1 + return job_states_ret + + rq = make_comm_msg(STATUS, job_ids + not_found_ids, False, {"ts": ts}) + with mock.patch.object(MockClients, "check_jobs", mock_check_jobs): + output_states = self.jc._handle_comm_message(rq) + + expected = { + job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id]) + for job_id in updated_active_ids + } + for job_state in expected.values(): + job_state["jobState"]["updated"] += 1 + expected[JOB_NOT_FOUND] = { + "job_id": JOB_NOT_FOUND, + "error": f"Cannot find job with ID {JOB_NOT_FOUND}" + } + + self._check_pop_last_checked(output_states, ts) + self.assertEqual( + expected, + output_states + ) + # ----------------------- # get cell job states # ----------------------- @@ -840,12 +943,15 @@ def test_cancel_jobs__job_id_list__all_bad_jobs(self): ) @mock.patch(CLIENTS, get_mock_client) + @mock.patch(JC_TIME_NS, lambda: TEST_EPOCH_NS) def test_cancel_jobs__job_id_list__failure(self): # the mock client will throw an error with BATCH_RETRY_RUNNING job_id_list = [JOB_RUNNING, BATCH_RETRY_RUNNING] req_dict = make_comm_msg(CANCEL, job_id_list, False) output = self.jc._handle_comm_message(req_dict) + self._check_pop_last_checked(output) + expected = { JOB_RUNNING: ALL_RESPONSE_DATA[STATUS][JOB_RUNNING], BATCH_RETRY_RUNNING: { diff --git a/src/biokbase/narrative/tests/test_jobmanager.py b/src/biokbase/narrative/tests/test_jobmanager.py index 82550c603a..12893ef4fc 100644 --- a/src/biokbase/narrative/tests/test_jobmanager.py +++ b/src/biokbase/narrative/tests/test_jobmanager.py @@ -2,6 +2,7 @@ import itertools import os import re +import time import unittest from datetime import datetime from unittest import mock @@ -36,6 +37,7 @@ BATCH_CHILDREN, BATCH_ERROR_RETRIED, BATCH_PARENT, + BATCH_RETRY_RUNNING, BATCH_TERMINATED, BATCH_TERMINATED_RETRIED, CLIENTS, @@ -50,6 +52,7 @@ TEST_JOBS, generate_error, get_test_job, + get_test_jobs, ) from .narrative_mock.mockclients import ( @@ -707,6 +710,63 @@ def test_get_job_states__empty(self): ): self.jm.get_job_states([]) + @mock.patch(CLIENTS, get_mock_client) + def test_get_job_states__last_updated(self): + """ + Test that only updated jobs return an actual state + and that the rest of the jobs are removed + """ + # what FE will say was the last time the jobs were checked + ts = time.time_ns() + + # mix of terminal and not terminal + not_updated_ids = [JOB_COMPLETED, JOB_ERROR, JOB_TERMINATED, JOB_CREATED, JOB_RUNNING] + # not terminal + updated_ids = [BATCH_PARENT, BATCH_RETRY_RUNNING] + + # error ids + not_found_ids = [JOB_NOT_FOUND] + + job_ids = not_updated_ids + updated_ids + active_ids = list(set(job_ids) & set(ACTIVE_JOBS)) + + # all job IDs partitioned as + not_found_ids + terminal_ids = list(set(job_ids) - set(ACTIVE_JOBS)) # noqa: F841 + not_updated_active_ids = list(set(not_updated_ids) & set(active_ids)) # noqa: F841 + updated_active_ids = list(set(updated_ids) & set(active_ids)) + + def mock_check_jobs(self_, params): + """Mutate only chosen job states""" + lookup_ids = 
params["job_ids"] + self.assertCountEqual(active_ids, lookup_ids) # sanity check + + job_states_ret = get_test_jobs(lookup_ids) + for job_id, job_state in job_states_ret.items(): + # if job is chosen to be updated, mutate it + if job_id in updated_active_ids: + job_state["updated"] += 1 + return job_states_ret + + with mock.patch.object(MockClients, "check_jobs", mock_check_jobs): + output_states = self.jm.get_job_states(job_ids + not_found_ids, ts=ts) + + expected = { + job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id]) + for job_id in updated_active_ids + } + for job_state in expected.values(): + job_state["jobState"]["updated"] += 1 + expected[JOB_NOT_FOUND] = { + "job_id": JOB_NOT_FOUND, + "error": f"Cannot find job with ID {JOB_NOT_FOUND}" + } + + self.assertEqual( + expected, + output_states + ) + def test_update_batch_job__dne(self): with self.assertRaisesRegex( JobRequestException, f"{JOB_NOT_REG_ERR}: {JOB_NOT_FOUND}"