Skip to content

Commit

Permalink
add job ts tests
Browse files Browse the repository at this point in the history
  • Loading branch information
n1mus committed May 10, 2022
1 parent 762af03 commit bf55bdb
Show file tree
Hide file tree
Showing 8 changed files with 391 additions and 36 deletions.
30 changes: 2 additions & 28 deletions src/biokbase/narrative/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import biokbase.narrative.clients as clients
from biokbase.narrative.app_util import map_inputs_from_job, map_outputs_from_state
from biokbase.narrative.exception_util import transform_job_exception
from biokbase.narrative.jobs.util import merge, merge_inplace

from .specmanager import SpecManager

Expand Down Expand Up @@ -82,33 +83,6 @@
STATE_ATTRS = list(set(JOB_ATTRS) - set(JOB_INPUT_ATTRS) - set(NARR_CELL_INFO_ATTRS))


def merge(d0: dict, d1: dict):
d0 = copy.deepcopy(d0)
merge_inplace(d0, d1)
return d0


def merge_inplace(d0: dict, d1: dict):
"""
Recursively merge nested dicts d1 into d0,
overwriting any values in d0 that are not nested dicts.
Mutates d0
"""
for k, v1 in d1.items():
if k in d0:
v0 = d0[k]
is_dict_0 = isinstance(v0, dict)
is_dict_1 = isinstance(v1, dict)
if is_dict_0 ^ is_dict_1:
raise ValueError(f"For key {k}: is_dict(v0) xor is_dict(v1)")
elif not is_dict_0 and not is_dict_1:
d0[k] = v1
elif is_dict_0 and is_dict_1:
merge_inplace(v0, v1)
else:
d0[k] = v0


class Job:
_job_logs = []
_acc_state = None # accumulates state
Expand Down Expand Up @@ -561,7 +535,7 @@ def log(self, first_line=0, num_lines=None):
return (num_available_lines, [])
return (
num_available_lines,
self._job_logs[first_line : first_line + num_lines],
self._job_logs[first_line: first_line + num_lines],
)

def _update_log(self):
Expand Down
13 changes: 8 additions & 5 deletions src/biokbase/narrative/jobs/jobcomm.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,11 +355,6 @@ def _get_job_states(self, job_id_list: list, ts: int = None) -> dict:
:rtype: dict
"""
output_states = self._jm.get_job_states(job_id_list, ts)

now = time.time_ns()
for output_state in output_states.values():
output_state["last_checked"] = now

self.send_comm_message(MESSAGE_TYPE["STATUS"], output_states)
return output_states

Expand Down Expand Up @@ -520,6 +515,14 @@ def send_comm_message(self, msg_type: str, content: dict) -> None:
Sends a ipykernel.Comm message to the KBaseJobs channel with the given msg_type
and content. These just get encoded into the message itself.
"""
# For STATUS responses, add a last_checked field
# to each output_state. Note: error states will have
# the last_checked field too
if msg_type == MESSAGE_TYPE["STATUS"]:
now = time.time_ns()
for output_state in content.values():
output_state["last_checked"] = now

msg = {"msg_type": msg_type, "content": content}
self._comm.send(msg)

Expand Down
28 changes: 28 additions & 0 deletions src/biokbase/narrative/jobs/util.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
import json
import os

Expand Down Expand Up @@ -58,3 +59,30 @@ def load_job_constants(relative_path_to_file=JOB_CONFIG_FILE_PATH_PARTS):
)

return (config["params"], config["message_types"])


def merge(d0: dict, d1: dict):
d0 = copy.deepcopy(d0)
merge_inplace(d0, d1)
return d0


def merge_inplace(d0: dict, d1: dict):
"""
Recursively merge nested dicts d1 into d0,
overwriting any values in d0 that are not nested dicts.
Mutates d0
"""
for k, v1 in d1.items():
if k in d0:
v0 = d0[k]
is_dict_0 = isinstance(v0, dict)
is_dict_1 = isinstance(v1, dict)
if is_dict_0 ^ is_dict_1:
raise ValueError(f"For key {k}: is_dict(v0) xor is_dict(v1)")
elif not is_dict_0 and not is_dict_1:
d0[k] = v1
elif is_dict_0 and is_dict_1:
merge_inplace(v0, v1)
else:
d0[k] = v1
2 changes: 1 addition & 1 deletion src/biokbase/narrative/tests/job_test_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def get_test_jobs(job_ids):


CLIENTS = "biokbase.narrative.clients.get"
TIME_NS = "biokbase.narrative.jobs.jobcomm.time.time_ns"
JC_TIME_NS = "biokbase.narrative.jobs.jobcomm.time.time_ns"

TEST_EPOCH_NS = 42 # arbitrary epoch ns
MAX_LOG_LINES = 10
Expand Down
2 changes: 1 addition & 1 deletion src/biokbase/narrative/tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ def mock_state(self, state=None):
"updated": 0,
}

with mock.patch.object(Job, "state", mock_state):
with mock.patch.object(Job, "refresh_state", mock_state):
state = job.output_state()
self.assertEqual(expected, state)

Expand Down
186 changes: 185 additions & 1 deletion src/biokbase/narrative/tests/test_job_util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import copy
import re
import unittest

from biokbase.narrative.jobs.util import load_job_constants
from biokbase.narrative.jobs.util import load_job_constants, merge, merge_inplace


class JobUtilTestCase(unittest.TestCase):
Expand Down Expand Up @@ -57,5 +59,187 @@ def test_load_job_constants__valid(self):
self.assertIn(item, message_types)


class MergeTest(unittest.TestCase):
def _check_merge_inplace(self, d0: dict, d1: dict, exp_merge: dict):
d1_copy = copy.deepcopy(d1)
merge_inplace(d0, d1)
self.assertEqual(
d0,
exp_merge
)
self.assertEqual(
d1,
d1_copy
)

def test_merge_inplace__empty(self):
d0 = {}
d1 = {}
self._check_merge_inplace(
d0,
d1,
{}
)

def test_merge_inplace__d0_empty(self):
# flat
d0 = {}
d1 = {"level00": "l00"}
self._check_merge_inplace(
d0,
d1,
{"level00": "l00"}
)

# nested
d0 = {}
d1 = {
"level00": "l00",
"level01": {
"level10": "l10"
}
}
self._check_merge_inplace(
d0,
d1,
{
"level00": "l00",
"level01": {
"level10": "l10"
}
}
)

def test_merge_inplace__d1_empty(self):
# flat
d0 = {"level00": "l00"}
d1 = {}
self._check_merge_inplace(
d0,
d1,
{"level00": "l00"}
)

# nested
d0 = {
"level00": "l00",
"level01": {
"level10": "l10"
}
}
d1 = {}
self._check_merge_inplace(
d0,
d1,
{
"level00": "l00",
"level01": {
"level10": "l10"
}
}
)

def test_merge_inplace__flat(self):
d0 = {
"level00": "l00",
"level01": "l01"
}
d1 = {
"level01": "l01_",
"level02": "l02"
}
self._check_merge_inplace(
d0,
d1,
{
"level00": "l00",
"level01": "l01_",
"level02": "l02"
}
)

def test_merge_inplace__nested(self):
d0 = {
"level00": {
"level10": {
"level20": "l20",
"level21": "l21"
}
},
"level01": "l01"
}
d1 = {
"level00": {
"level10": {
"level22": "l22"
}
},
"level01": "l01_"
}
self._check_merge_inplace(
d0,
d1,
{
"level00": {
"level10": {
"level20": "l20",
"level21": "l21",
"level22": "l22"
}
},
"level01": "l01_"
}
)

def test_merge_inplace__xor_dicts(self):
d0 = {
"level00": {}
}
d1 = {
"level00": "l00",
"level01": "l01"
}
with self.assertRaisesRegex(
ValueError,
re.escape("For key level00: is_dict(v0) xor is_dict(v1)")
):
merge_inplace(d0, d1)

def test_merge(self):
d0 = {
"level00": "l00",
"level01": {
"level10": {
"level20": "l20"
}
},
"level02": "l02"
}
d1 = {
"level01": {
"level10": {
"level20": "l20_"
}
}
}
d0_copy = copy.deepcopy(d0)
d1_copy = copy.deepcopy(d1)
d0_merge = merge(d0, d1)
self.assertEqual(d0, d0_copy)
self.assertEqual(d1, d1_copy)
self.assertEqual(
d0_merge,
{
"level00": "l00",
"level01": {
"level10": {
"level20": "l20_"
}
},
"level02": "l02"
}
)


if __name__ == "__main__":
unittest.main()
Loading

0 comments on commit bf55bdb

Please sign in to comment.