Commit

Roy Wiggins authored and Roy Wiggins committed Nov 22, 2024
2 parents 71e5255 + 9b303ec commit d6275dc
Showing 8 changed files with 97 additions and 25 deletions.
8 changes: 8 additions & 0 deletions common/event_types.py
@@ -77,3 +77,11 @@ class severity(Enum):
WARNING = 1
ERROR = 2
CRITICAL = 3


class FailStage(StringEnum):
"""Enum for the stages a task can fail at."""

DISPATCHING = auto()
PROCESSING = auto()
ROUTING = auto()
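
Note: FailStage builds on the repository's StringEnum helper, so each member serializes as a plain string rather than a numeric value. The sketch below only illustrates how such a helper typically behaves; the stand-in StringEnum here is an assumption, not the actual class from common/event_types.py.

from enum import Enum, auto

class StringEnum(str, Enum):
    # Hypothetical stand-in for the project's StringEnum base class; the real
    # implementation in common/event_types.py may derive member values differently.
    @staticmethod
    def _generate_next_value_(name, start, count, last_values):
        return name  # auto() yields the member name itself as the string value

    def __str__(self) -> str:
        return self.value  # str(member) -> "DISPATCHING" instead of "FailStage.DISPATCHING"

class FailStage(StringEnum):
    DISPATCHING = auto()
    PROCESSING = auto()
    ROUTING = auto()

assert str(FailStage.DISPATCHING) == "DISPATCHING"
assert FailStage.PROCESSING == "PROCESSING"  # str mixin: members compare equal to plain strings
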
3 changes: 3 additions & 0 deletions common/types.py
@@ -10,6 +10,8 @@
from pydantic import BaseModel
import typing

from common.event_types import FailStage

# TODO: Add description for the individual classes


@@ -311,6 +313,7 @@ class TaskInfo(BaseModel, Compat):
mercure_appliance: str
mercure_server: str
device_serial_number: Optional[str] = None
fail_stage: Optional[FailStage] = None


class TaskDispatchStatus(BaseModel, Compat):
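
Note: because fail_stage is optional and defaults to None, task.json files written before this change still validate, and new failures round-trip the stage as a plain string. A minimal sketch under two assumptions: the reduced field set below (the real TaskInfo has many more required fields) and the pydantic v1 API that the surrounding code (task.dict()) suggests.

import json
from typing import Optional
from pydantic import BaseModel

from common.event_types import FailStage

class TaskInfoSketch(BaseModel):
    # Reduced, hypothetical subset of TaskInfo fields for illustration only
    mercure_server: str
    fail_stage: Optional[FailStage] = None

legacy = TaskInfoSketch(**json.loads('{"mercure_server": "srv1"}'))
print(legacy.fail_stage)  # None -- older task files without the key still load

failed = TaskInfoSketch(mercure_server="srv1", fail_stage=FailStage.DISPATCHING)
print(failed.json())      # fail_stage is emitted as the enum's string value (e.g. "DISPATCHING")
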
32 changes: 28 additions & 4 deletions dispatch/send.py
@@ -6,6 +6,7 @@
"""

# Standard python includes
import json
import shutil
import subprocess
import time
@@ -33,6 +34,7 @@
)
import dispatch.target_types as target_types
from common.helper import get_now_str
from common.event_types import FailStage

# Create local logger instance
logger = config.get_logger()
@@ -136,7 +138,7 @@ def execute(
task_content.id,
target="",
)
_move_sent_directory(task_content.id, source_folder, error_folder)
_move_sent_directory(task_content.id, source_folder, error_folder, FailStage.DISPATCHING)
_trigger_notification(task_content, mercure_events.ERROR)
return

@@ -147,7 +149,7 @@ def execute(
task_content.id,
target=target_item,
)
_move_sent_directory(task_content.id, source_folder, error_folder)
_move_sent_directory(task_content.id, source_folder, error_folder, FailStage.DISPATCHING)
_trigger_notification(task_content, mercure_events.ERROR)
return

@@ -257,14 +259,14 @@ def execute(
else:
logger.info(f"Max retries reached, moving to {error_folder}")
monitor.send_task_event(task_event.SUSPEND, task_content.id, 0, ",".join(dispatch_info.target_name), "Max retries reached")
_move_sent_directory(task_content.id, source_folder, error_folder)
_move_sent_directory(task_content.id, source_folder, error_folder, FailStage.DISPATCHING)
monitor.send_task_event(task_event.MOVE, task_content.id, 0, str(error_folder), "Moved to error folder")
monitor.send_event(m_events.PROCESSING, severity.ERROR, f"Series suspended after reaching max retries")
_trigger_notification(task_content, mercure_events.ERROR)
logger.info(f"Dispatching folder {source_folder} not successful")


def _move_sent_directory(task_id, source_folder, destination_folder) -> None:
def _move_sent_directory(task_id, source_folder, destination_folder, fail_stage=None) -> None:
"""
This check is needed if there is already a folder with the same name
in the success folder. If so, a new directory is created with a timestamp
@@ -275,10 +277,14 @@ def _move_sent_directory(task_id, source_folder, destination_folder) -> None:
target_folder = destination_folder / (source_folder.name + "_" + datetime.now().isoformat())
logger.debug(f"Moving {source_folder} to {target_folder}")
shutil.move(source_folder, target_folder, copy_function=shutil.copy2)
if fail_stage and not update_fail_stage(target_folder, fail_stage):
logger.error(f"Error updating fail stage for task {task_id}")
(Path(target_folder) / mercure_names.PROCESSING).unlink()
else:
logger.debug(f"Moving {source_folder} to {destination_folder / source_folder.name}")
shutil.move(source_folder, destination_folder / source_folder.name)
if fail_stage and not update_fail_stage(destination_folder / source_folder.name, fail_stage):
logger.error(f"Error updating fail stage for task {task_id}")
(destination_folder / source_folder.name / mercure_names.PROCESSING).unlink()
except:
logger.error(f"Error moving folder {source_folder} to {destination_folder}", task_id) # handle_error
@@ -309,3 +315,21 @@ def _trigger_notification(task: Task, event: mercure_events) -> None:
details=notification.get_task_custom_notification(task),
send_always=request_do_send,
)


def update_fail_stage(source_folder: Path, fail_stage : FailStage) -> bool:
in_string = "in" if fail_stage == FailStage.PROCESSING else ""
target_json_path : Path = source_folder / in_string / mercure_names.TASKFILE
try:
with open(target_json_path, "r") as file:
task: Task = Task(**json.load(file))

task_info = task.info
task_info.fail_stage = str(fail_stage) # type: ignore

with open(target_json_path, "w") as file:
json.dump(task.dict(), file)
except Exception as e:
return False

return True
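
Note: update_fail_stage relies on pathlib discarding empty path components: for dispatch failures the task file sits directly in the moved folder, while processing failures keep it under in/. A small standalone sketch of that path selection (folder names are illustrative):

from pathlib import Path

# Mirrors the path selection above; "task.json" is used as a stand-in for mercure_names.TASKFILE.
def locate_task_file(error_case: Path, failed_while_processing: bool) -> Path:
    subdir = "in" if failed_while_processing else ""
    # pathlib ignores empty components, so "" keeps the file at the folder root
    return error_case / subdir / "task.json"

print(locate_task_file(Path("/opt/mercure/error/case_1"), False))  # /opt/mercure/error/case_1/task.json
print(locate_task_file(Path("/opt/mercure/error/case_1"), True))   # /opt/mercure/error/case_1/in/task.json
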
8 changes: 6 additions & 2 deletions process/process_series.py
@@ -36,6 +36,8 @@
from common.constants import (
mercure_events,
)
from common.event_types import FailStage
from dispatch.send import update_fail_stage


logger = config.get_logger()
@@ -583,7 +585,7 @@ def move_results(
lock.free()
if not processing_success:
logger.debug(f"Failing: {folder}")
move_out_folder(task_id, folder, Path(config.mercure.error_folder), move_all=True)
move_out_folder(task_id, folder, Path(config.mercure.error_folder), move_all=True, fail_stage=FailStage.PROCESSING)
else:
if needs_dispatching:
logger.debug(f"Dispatching: {folder}")
@@ -593,7 +595,7 @@
move_out_folder(task_id, folder, Path(config.mercure.success_folder))


def move_out_folder(task_id: str, source_folder: Path, destination_folder: Path, move_all=False) -> None:
def move_out_folder(task_id: str, source_folder: Path, destination_folder: Path, move_all=False, fail_stage=None) -> None:
# source_folder = Path(source_folder_str)
# destination_folder = Path(destination_folder_str)

@@ -609,6 +611,8 @@ def move_out_folder(task_id: str, source_folder: Path, destination_folder: Path,
try:
if move_all:
shutil.move(str(source_folder), target_folder)
if fail_stage and not update_fail_stage(target_folder, fail_stage):
logger.error(f"Error updating fail stage for task {task_id}")
else:
shutil.move(str(source_folder / "out"), target_folder)
lockfile = source_folder / mercure_names.LOCK
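
Note: with move_all=True the entire case folder (including its in/ subdirectory) is moved to the error folder, which is why update_fail_stage looks for the task file under in/ when the stage is PROCESSING. A hypothetical example of tagging such a folder after the move (the path is illustrative, and the call simply returns False if no task file is found):

from pathlib import Path

from common.event_types import FailStage
from dispatch.send import update_fail_stage

# Hypothetical layout after a processing failure was moved out:
#   /opt/mercure/error/case_42/in/task.json
ok = update_fail_stage(Path("/opt/mercure/error/case_42"), FailStage.PROCESSING)
if not ok:
    print("could not record the fail stage (missing or unreadable task file)")
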
3 changes: 2 additions & 1 deletion tests/test_processor.py
@@ -67,7 +67,8 @@
"mercure_appliance": "master",
"mercure_server": socket.gethostname(),
"device_serial_number": None,
"sender_address": "0.0.0.0"
"sender_address": "0.0.0.0",
"fail_stage": None
}

def create_and_route(fs, mocked, task_id, config, uid="TESTFAKEUID") -> Tuple[List[str], str]:
36 changes: 34 additions & 2 deletions webinterface/queue.py
@@ -283,11 +283,18 @@ async def show_jobs_fail(request):
for entry in os.scandir(config.mercure.error_folder):
if entry.is_dir():
job_name: str = entry.name
timestamp: float = entry.stat().st_mtime
job_acc: str = ""
job_mrn: str = ""
job_scope: str = "Series"
job_failstage: str = "Unknown"

# keeping the manual way of getting the fail stage too for now
try:
job_failstage = get_fail_stage(Path(entry.path))
except Exception as e:
logger.exception(e)

task_file = Path(entry.path) / mercure_names.TASKFILE
if not task_file.exists():
task_file = Path(entry.path) / "in" / mercure_names.TASKFILE
@@ -301,6 +308,8 @@
job_scope = "Series"
else:
job_scope = "Study"
if (task.info.fail_stage):
job_failstage = str(task.info.fail_stage).capitalize()
except Exception as e:
logger.exception(e)
job_acc = "Error"
@@ -312,9 +321,10 @@
"MRN": job_mrn,
"Scope": job_scope,
"FailStage": job_failstage,
"CreationTime": timestamp,
}

return JSONResponse(job_list)
sorted_jobs = collections.OrderedDict(sorted(job_list.items(), key=lambda x: x[1]["CreationTime"], reverse=False)) # type: ignore
return JSONResponse(sorted_jobs)


@router.get("/status")
@@ -504,5 +514,27 @@ def restart_dispatch(taskfile_folder: Path, outgoing_folder: Path) -> dict:

return {"success": "task restarted"}

def get_fail_stage(taskfile_folder: Path) -> str:
if not taskfile_folder.exists():
return "Unknown"

dispatch_ready = (
not (taskfile_folder / mercure_names.LOCK).exists()
and not (taskfile_folder / mercure_names.ERROR).exists()
and not (taskfile_folder / mercure_names.PROCESSING).exists()
)

if not dispatch_ready or not (taskfile_folder / mercure_names.TASKFILE).exists():
return "Unknown"

taskfile_path = taskfile_folder / mercure_names.TASKFILE
with open(taskfile_path, "r") as json_file:
loaded_task = json.load(json_file)

action = loaded_task.get("info", {}).get("action", "")
if action and action not in (mercure_actions.BOTH, mercure_actions.ROUTE):
return "Unknown"

return "Dispatching"

queue_app = Starlette(routes=router)
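
Note: show_jobs_fail now records a CreationTime (the error folder's mtime) per entry and returns the failed jobs ordered oldest-first. A standalone sketch of that sort with fabricated entries:

import collections

job_list = {
    "task-b": {"ACC": "A2", "FailStage": "Processing", "CreationTime": 1732300000.0},
    "task-a": {"ACC": "A1", "FailStage": "Dispatching", "CreationTime": 1732200000.0},
}
sorted_jobs = collections.OrderedDict(
    sorted(job_list.items(), key=lambda x: x[1]["CreationTime"])
)
print(list(sorted_jobs))  # ['task-a', 'task-b'] -- oldest failure listed first
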
25 changes: 10 additions & 15 deletions webinterface/templates/dashboards/query.html
@@ -22,7 +22,7 @@ <h1 class="title is-4">DICOM Query</h1>
</div>

<form class="box" id="form">
<div class="field is-horizontal has-addons">
<div class="field is-horizontal">
<div class="field-label is-normal">
<label class="label">Accession Number</label>
</div>
@@ -34,17 +34,12 @@ <h1 class="title is-4">DICOM Query</h1>
</div>
</div>
</div>
</div>
<div class="control file has-name is-right">
<label class="file-label">
<input id="file-input" class="file-input" type="file" name="resume" />
<span class="file-cta">
<span class="file-icon">
<i class="fas fa-upload"></i>
</span>
<span class="file-label"> Choose a file… </span>
</span>
</label>
<div class="control file has-name is-right">
<a class="button">
<input id="file-input" class="file-input" type="file" name="upload-file" style="cursor: pointer;" />
<span> <i class="fas fa-upload"></i>&nbsp;Upload list</span>
</a>
</div>
</div>
</div>

@@ -206,17 +201,17 @@ <h1 class="title is-4">DICOM Query</h1>

</main>
<script>

$('#accession').on('input', function (e) {
if (e.target.value.indexOf("\n") < 0) {
$(e.target).outerHeight(36).outerHeight(this.scrollHeight + 1);
}
$(e.target).outerHeight(36).outerHeight(e.target.scrollHeight + 1);
});

function clearErrors() {
$('#backend-error').addClass("is-hidden");
$('#backend-error').text('');
$(':input').removeClass("is-danger");
}

function handleForceRuleInput() {
if ($("#destination-input").val() == "") {
$("#force-rule-input").prop('disabled', false);
7 changes: 6 additions & 1 deletion webinterface/templates/queue.html
@@ -810,7 +810,12 @@ <h5 class="title is-5 configtitle" style="margin-top: 60px;"><i
$('#jobs_fail').DataTable().on( 'select deselect', function () {
var selectedRows = $('#jobs_fail').DataTable().rows( { selected: true } ).count();
$('#jobs_fail').DataTable().button(2).enable( selectedRows > 0 );
$('#jobs_fail').DataTable().button(3).enable( selectedRows > 0 );
if (selectedRows == 1) {
let failStage = $('#jobs_fail').DataTable().rows( { selected: true } ).data()[0][3];
$('#jobs_fail').DataTable().button(3).enable( failStage == "Dispatching" );
} else {
$('#jobs_fail').DataTable().button(3).enable( false );
}
} );

$('#jobs_archive').DataTable( {
