Skip to content

Commit

Permalink
Bugfixes for multi-target rules
Browse files Browse the repository at this point in the history
  • Loading branch information
tblock79 committed Sep 26, 2024
1 parent 6d50c8d commit db0fd5b
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 20 deletions.
2 changes: 1 addition & 1 deletion common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ class TaskDispatchStatus(BaseModel, Compat):

class TaskDispatch(BaseModel, Compat):
target_name: Union[str,List[str]]
status: Dict[str, TaskDispatchStatus]
status: Union[Dict[str, TaskDispatchStatus], EmptyDict] = cast(EmptyDict, {})
retries: Optional[int] = 0
next_retry_at: Optional[float] = 0
series_uid: Optional[str]
Expand Down
6 changes: 3 additions & 3 deletions dispatch/retry.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import cast, Dict
from typing import cast, Dict, Union
from typing_extensions import Literal

from common.types import Task, TaskDispatch, TaskDispatchStatus
from common.types import Task, TaskDispatch, TaskDispatchStatus, EmptyDict
import json
import time
from pathlib import Path
Expand Down Expand Up @@ -31,7 +31,7 @@ def increase_retry(source_folder, retry_max, retry_delay) -> bool:
return True


def update_dispatch_status(source_folder: Path, status : Dict[str, TaskDispatchStatus]) -> bool:
def update_dispatch_status(source_folder: Path, status : Union[Dict[str, TaskDispatchStatus], EmptyDict]) -> bool:
target_json_path : Path = source_folder / mercure_names.TASKFILE
try:
with open(target_json_path, "r") as file:
Expand Down
21 changes: 13 additions & 8 deletions dispatch/send.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pathlib import Path
from shlex import split
from subprocess import PIPE, CalledProcessError, check_output
from typing import Dict, Tuple, cast
from typing import Dict, Tuple, cast, Union
from typing_extensions import Literal

# App-specific includes
Expand Down Expand Up @@ -125,6 +125,8 @@ def execute(
if (uid == "uid-missing"):
logger.warning(f"Missing information for folder {source_folder}", task_content.id)

# Ensure that the target_name is a list. Needed just for backwards compatibility
# (i.e., if the task.json file was created with an older mercure version)
if isinstance(dispatch_info.target_name, str):
dispatch_info.target_name = [dispatch_info.target_name]

Expand Down Expand Up @@ -181,10 +183,13 @@ def execute(
)
return

current_status : Dict[str, TaskDispatchStatus] = dispatch_info.status
current_status = dispatch_info.status
# Needed just for backwards compatibility (i.e., if the task.json file was created with an older mercure version)
if len(current_status) != len(dispatch_info.target_name):
current_status = {target_item: TaskDispatchStatus(state="waiting", time=get_now_str()) for target_item in dispatch_info.target_name}

for target_item in dispatch_info.target_name:

if current_status[target_item] and current_status[target_item].state != "complete":
if current_status[target_item] and current_status[target_item].state != "complete": # type: ignore

# Compose the command for dispatching the results
target = config.mercure.targets.get(target_item, None)
Expand All @@ -193,7 +198,7 @@ def execute(
f"Error sending {uid} to {target_item}: unable to get target information",
task_content.id,
)
current_status[target_item] = TaskDispatchStatus(state="error", time=get_now_str())
current_status[target_item] = TaskDispatchStatus(state="error", time=get_now_str()) # type: ignore
continue

try:
Expand All @@ -214,19 +219,19 @@ def execute(
target_item,
"Routing job complete",
)
current_status[target_item] = TaskDispatchStatus(state="complete", time=get_now_str())
current_status[target_item] = TaskDispatchStatus(state="complete", time=get_now_str()) # type: ignore

except Exception as e:
logger.error( # handle_error
f"Error sending uid {uid} in task {task_content.id} to {target_item}:\n {e}",
task_content.id,
target=target_item,
)
current_status[target_item] = TaskDispatchStatus(state="error", time=get_now_str())
current_status[target_item] = TaskDispatchStatus(state="error", time=get_now_str()) # type: ignore

dispatch_success = True
for item in current_status:
if current_status[item].state != "complete":
if current_status[item].state != "complete": # type: ignore
dispatch_success = False
break

Expand Down
5 changes: 4 additions & 1 deletion routing/generate_taskfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ def add_dispatching(

targets_used: List[str] = []
if isinstance(target,str):
targets_used = [target]
# Only insert string into list if it is not empty
if target:
targets_used = [target]
else:
targets_used = target

Expand Down Expand Up @@ -199,6 +201,7 @@ def add_dispatching(
target_status : Dict[str, TaskDispatchStatus] = {}
current_time=get_now_str()
for target_item in targets_used:
#logger.info(f"Adding target {target_item} to dispatching info")
# Check if all selected targets actually exist in the configuration (could have been deleted by now)
if not config.mercure.targets.get(target_item, {}):
logger.error(f"Target {target_item} does not exist for UID {uid}", task_id) # handle_error
Expand Down
29 changes: 23 additions & 6 deletions routing/route_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ def route_series(task_id: str, series_UID: str, files:typing.List[Path] = []) ->
push_series_serieslevel(task_id, triggered_rules, fileList, series_UID, tagsList)

# If more than one rule has triggered, the series files need to be removed
# TODO: This can probably be avoided since the files are now contained in a separate folder
# for each series, so that files will be removed automatically by the rmtree call
if len(triggered_rules) > 1:
remove_series(task_id, fileList, series_UID)

Expand Down Expand Up @@ -338,8 +340,16 @@ def push_serieslevel_routing(
rule_definition.get("action_trigger", "series") == mercure_options.SERIES
and rule_definition.get("action") == mercure_actions.ROUTE
):
target = rule_definition.get("target")
if target:
targets = []
if rule_definition.get("target"):
if isinstance(rule_definition.get("target"), str):
# If the target is a string, only add it if it is not empty
if rule_definition.get("target"):
targets.append(rule_definition.get("target"))
else:
targets = rule_definition.get("target")

for target in targets:
if not selected_targets.get(target):
selected_targets[target] = [current_rule]
else:
Expand Down Expand Up @@ -464,12 +474,13 @@ def push_serieslevel_outgoing(
# Determine if the files should be copied or moved. If only one rule triggered, files can
# safely be moved, otherwise files will be moved and removed in the end
move_operation = False
if len(triggered_rules) == 1:
if len(selected_targets) == 1:
move_operation = True

for target in selected_targets:
for i,target in enumerate(selected_targets):
if not target in config.mercure.targets:
logger.error(f"Invalid target selected {target}", task_id) # handle_error
# TODO: Better error handling!
continue

new_task_id = generate_task_id()
Expand Down Expand Up @@ -507,8 +518,14 @@ def push_serieslevel_outgoing(
monitor.send_task_event(monitor.task_event.DELEGATE, task_id, len(file_list), new_task_id, ", ".join(selected_targets[target]))

operation: Callable
is_operation_move = False
if move_operation:
operation = shutil.move
# If there are more targets for one rule, then move the files only for the last target
if i==len(selected_targets)-1:
operation = shutil.move
is_operation_move = True
else:
operation = shutil.copy
else:
operation = shutil.copy

Expand All @@ -523,7 +540,7 @@ def push_serieslevel_outgoing(
)
raise

if move_operation:
if is_operation_move:
monitor.send_task_event(monitor.task_event.MOVE, task_id, len(file_list), str(target_folder), "Moved files")
else:
monitor.send_task_event(monitor.task_event.COPY, task_id, len(file_list), str(target_folder), "Copied files")
Expand Down
2 changes: 1 addition & 1 deletion webinterface/templates/rules.html
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ <h1 class="title">DICOM Rules</h1>
{% if (rules[x]['action']=='process') or (rules[x]['action']=='both') %}
<tr>
<td>Processing:</td>
<td>{% if rules[x]['target'] is string %}
<td>{% if rules[x]['processing_module'] is string %}
{{ rules[x]['processing_module'] }}
{% else %}
{% for i in rules[x]['processing_module'] %} {{ i }} {% if not loop.last %}&#10140; {% endif %} {% endfor %}
Expand Down

0 comments on commit db0fd5b

Please sign in to comment.