From a1b2dd4b08752547b22dd3f0a85723f9e59e349c Mon Sep 17 00:00:00 2001 From: Roy Wiggins Date: Fri, 31 May 2024 15:58:36 +0000 Subject: [PATCH 01/42] initial work on dicom query/pull interface --- common/types.py | 12 +- webgui.py | 19 +- webinterface/common.py | 11 +- webinterface/dashboards.py | 59 ----- webinterface/dashboards/__init__.py | 7 + webinterface/dashboards/common.py | 7 + webinterface/dashboards/query.py | 77 ++++++ webinterface/dashboards/simple.py | 32 +++ webinterface/modules.py | 6 +- webinterface/query.py | 236 ++++++++++++++++++ webinterface/queue.py | 2 - webinterface/rules.py | 4 - webinterface/targets.py | 2 - .../templates/dashboards/dashboards.html | 1 + webinterface/templates/dashboards/query.html | 117 +++++++++ webinterface/users.py | 5 +- 16 files changed, 508 insertions(+), 89 deletions(-) delete mode 100644 webinterface/dashboards.py create mode 100644 webinterface/dashboards/__init__.py create mode 100644 webinterface/dashboards/common.py create mode 100644 webinterface/dashboards/query.py create mode 100644 webinterface/dashboards/simple.py create mode 100644 webinterface/query.py create mode 100644 webinterface/templates/dashboards/query.html diff --git a/common/types.py b/common/types.py index 33e89511..9558db6c 100755 --- a/common/types.py +++ b/common/types.py @@ -178,6 +178,16 @@ class DicomReceiverConfig(BaseModel): additional_tags: Dict[str,str] = {} +class DicomNode(BaseModel): + name: str + ip: str + port: int + aet_target: str + aet_source: Optional[str] = "" + +class DicomRetrieveConfig(BaseModel): + dicom_nodes: List[DicomNode] = [] + class Config(BaseModel, Compat): appliance_name: str port: int @@ -220,7 +230,7 @@ class Config(BaseModel, Compat): email_notification_from: str = "mercure@mercure.mercure" support_root_modules: Optional[bool] = False webhook_certificate_location: Optional[str] = None - + dicom_retrieve: DicomRetrieveConfig = DicomRetrieveConfig() class TaskInfo(BaseModel, Compat): action: Literal["route", "both", "process", "discard", "notification"] diff --git a/webgui.py b/webgui.py index 911df1de..41ff981e 100755 --- a/webgui.py +++ b/webgui.py @@ -295,7 +295,7 @@ async def show_log(request) -> Response: template = "logs.html" context = { "request": request, - "mercure_version": mercure_defs.VERSION, + "page": "logs", "service_logs": service_logs, "log_id": requested_service, @@ -307,7 +307,6 @@ async def show_log(request) -> Response: "end_time_available": runtime == "systemd", "start_time_available": runtime in ("docker", "systemd"), } - context.update(get_user_information(request)) return templates.TemplateResponse(template, context) @@ -330,14 +329,13 @@ async def configuration(request) -> Response: runtime = helper.get_runner() context = { "request": request, - "mercure_version": mercure_defs.VERSION, + "page": "configuration", "config": config.mercure, "os_string": os_string, "config_edited": config_edited, "runtime": runtime, } - context.update(get_user_information(request)) return templates.TemplateResponse(template, context) @@ -363,11 +361,10 @@ async def configuration_edit(request) -> Response: template = "configuration_edit.html" context = { "request": request, - "mercure_version": mercure_defs.VERSION, + "page": "configuration", "config_content": config_content, } - context.update(get_user_information(request)) return templates.TemplateResponse(template, context) @@ -411,7 +408,7 @@ async def login(request) -> Response: template = "login.html" context = { "request": request, - "mercure_version": 
mercure_defs.VERSION, + "appliance_name": config.mercure.get("appliance_name", "master"), } return templates.TemplateResponse(template, context) @@ -599,7 +596,7 @@ async def login_post(request) -> Response: context = { "request": request, "invalid_password": 1, - "mercure_version": mercure_defs.VERSION, + "appliance_name": config.mercure.get("appliance_name", "mercure Router"), } return templates.TemplateResponse(template, context) @@ -627,14 +624,13 @@ async def settings_edit(request) -> Response: template = "users_edit.html" context = { "request": request, - "mercure_version": mercure_defs.VERSION, + "page": "settings", "edituser": own_name, "edituser_info": users.users_list[own_name], "own_settings": "True", "change_password": users.users_list[own_name].get("change_password", "False"), } - context.update(get_user_information(request)) return templates.TemplateResponse(template, context) @@ -710,7 +706,7 @@ async def homepage(request) -> Response: template = "index.html" context = { "request": request, - "mercure_version": mercure_defs.VERSION, + "page": "homepage", "used_space": used_space, "free_space": free_space, @@ -718,7 +714,6 @@ async def homepage(request) -> Response: "service_status": service_status, "runtime": runtime, } - context.update(get_user_information(request)) return templates.TemplateResponse(template, context) diff --git a/webinterface/common.py b/webinterface/common.py index a870dd35..60e65adb 100755 --- a/webinterface/common.py +++ b/webinterface/common.py @@ -7,14 +7,18 @@ # Standard python includes from typing import Optional, Tuple import asyncio +from redis import Redis +from rq import Queue, Connection # Starlette-related includes from starlette.templating import Jinja2Templates +from common.constants import mercure_defs -templates = Jinja2Templates(directory="webinterface/templates") +worker_queue = Queue(connection=Redis()) + def get_user_information(request) -> dict: """Returns dictionary of values that should always be passed to the templates when the user is logged in.""" return { @@ -23,6 +27,11 @@ def get_user_information(request) -> dict: "is_admin": request.user.is_admin if request.user.is_authenticated else False, } +def get_mercure_version(request) -> dict: + return { "mercure_version": mercure_defs.VERSION } + +templates = Jinja2Templates(directory="webinterface/templates", context_processors=[get_user_information, get_mercure_version]) + async def async_run(cmd, **params) -> Tuple[Optional[int], bytes, bytes]: """Executes the given command in a way compatible with ayncio.""" diff --git a/webinterface/dashboards.py b/webinterface/dashboards.py deleted file mode 100644 index d441f192..00000000 --- a/webinterface/dashboards.py +++ /dev/null @@ -1,59 +0,0 @@ -""" -test.py -======== -Test page for querying the bookkeeper database from the webgui. 
-""" - -# Standard python includes -import daiquiri - -# Starlette-related includes -from starlette.applications import Starlette -from starlette.authentication import requires - -# App-specific includes -from common.constants import mercure_defs -from webinterface.common import get_user_information -from webinterface.common import templates -import common.config as config -from starlette.responses import RedirectResponse -from decoRouter import Router as decoRouter -router = decoRouter() - -logger = config.get_logger() - -################################################################################### -## Test endpoints -################################################################################### - -@router.get("/") -async def index(request): - return RedirectResponse(url="tests") - - -@router.get("/tasks") -@requires("authenticated", redirect="login") -async def tasks(request): - template = "dashboards/tasks.html" - context = { - "request": request, - "mercure_version": mercure_defs.VERSION, - "page": "tasks", - } - context.update(get_user_information(request)) - return templates.TemplateResponse(template, context) - - -@router.get("/tests") -@requires(["authenticated", "admin"], redirect="login") -async def tests(request): - template = "dashboards/tests.html" - context = { - "request": request, - "mercure_version": mercure_defs.VERSION, - "page": "tests", - } - context.update(get_user_information(request)) - return templates.TemplateResponse(template, context) - -dashboards_app = Starlette(routes=router) \ No newline at end of file diff --git a/webinterface/dashboards/__init__.py b/webinterface/dashboards/__init__.py new file mode 100644 index 00000000..7fb55b79 --- /dev/null +++ b/webinterface/dashboards/__init__.py @@ -0,0 +1,7 @@ +from .common import router +from . 
import query, simple +from starlette.applications import Starlette + + + +dashboards_app = Starlette(routes=router) \ No newline at end of file diff --git a/webinterface/dashboards/common.py b/webinterface/dashboards/common.py new file mode 100644 index 00000000..f4eae329 --- /dev/null +++ b/webinterface/dashboards/common.py @@ -0,0 +1,7 @@ +from decoRouter import Router as decoRouter +from starlette.responses import RedirectResponse + +router = decoRouter() +@router.get("/") +async def index(request): + return RedirectResponse(url="tests") \ No newline at end of file diff --git a/webinterface/dashboards/query.py b/webinterface/dashboards/query.py new file mode 100644 index 00000000..e27f0740 --- /dev/null +++ b/webinterface/dashboards/query.py @@ -0,0 +1,77 @@ + +from webinterface.query import SimpleDicomClient +# Standard python includes +from datetime import datetime + +# Starlette-related includes +from starlette.authentication import requires + +# App-specific includes +from common.constants import mercure_defs +from webinterface.common import templates +import common.config as config +from starlette.responses import PlainTextResponse, JSONResponse +from webinterface.common import worker_queue +from rq import get_current_job + +from .common import router +logger = config.get_logger() + +def test_job(*,accession, node): + config.read_config() + c = SimpleDicomClient(node.ip, node.port, node.aet_target, config.mercure.incoming_folder) + for identifier in c.getscu(accession): + job = get_current_job() + job.meta['failed'] = identifier.NumberOfFailedSuboperations + job.meta['remaining'] = identifier.NumberOfRemainingSuboperations + job.meta['completed'] = identifier.NumberOfCompletedSuboperations + job.save_meta() + return "Complete" + +@router.post("/query") +@requires(["authenticated", "admin"], redirect="login") +async def query_post(request): + form = await request.form() + + for n in config.mercure.dicom_retrieve.dicom_nodes: + if n.name == form.get("dicom_node"): + node = n + break + + worker_queue.enqueue_call(test_job, kwargs=dict(accession=form.get("accession"), node=node), timeout='10m', result_ttl=-1) + return PlainTextResponse() + +@router.get("/query/jobs") +@requires(["authenticated", "admin"], redirect="login") +async def query_jobs(request): + registries = [ + worker_queue.started_job_registry, # Returns StartedJobRegistry + worker_queue.deferred_job_registry, # Returns DeferredJobRegistry + worker_queue.finished_job_registry, # Returns FinishedJobRegistry + worker_queue.failed_job_registry, # Returns FailedJobRegistry + worker_queue.scheduled_job_registry, # Returns ScheduledJobRegistry + ] + job_info = [] + for r in registries: + for j_id in r.get_job_ids(): + job = worker_queue.fetch_job(j_id) + job_info.append(dict(id=j_id, + status=job.get_status(), + parameters=dict(accession=job.kwargs.get('accession','')), + enqueued_at=1000*datetime.timestamp(job.enqueued_at), + result=job.result, + meta=job.meta)) + return JSONResponse(dict(data=job_info)) + # return PlainTextResponse(",".join([str(j) for j in all_jobs])) + +@router.get("/query") +@requires(["authenticated", "admin"], redirect="login") +async def query(request): + template = "dashboards/query.html" + context = { + "request": request, + + "dicom_nodes": config.mercure.dicom_retrieve.dicom_nodes, + "page": "query", + } + return templates.TemplateResponse(template, context) \ No newline at end of file diff --git a/webinterface/dashboards/simple.py b/webinterface/dashboards/simple.py new file mode 100644 index 
00000000..c1a2ad47 --- /dev/null +++ b/webinterface/dashboards/simple.py @@ -0,0 +1,32 @@ +from .common import router +from starlette.authentication import requires + +# App-specific includes +from common.constants import mercure_defs +from webinterface.common import get_user_information, templates +import common.config as config +logger = config.get_logger() +from .common import router + +@router.get("/tasks") +@requires("authenticated", redirect="login") +async def tasks(request): + template = "dashboards/tasks.html" + context = { + "request": request, + + "page": "tasks", + } + return templates.TemplateResponse(template, context) + + +@router.get("/tests") +@requires(["authenticated", "admin"], redirect="login") +async def tests(request): + template = "dashboards/tests.html" + context = { + "request": request, + + "page": "tests", + } + return templates.TemplateResponse(template, context) \ No newline at end of file diff --git a/webinterface/modules.py b/webinterface/modules.py index 34047fc8..fdf89644 100755 --- a/webinterface/modules.py +++ b/webinterface/modules.py @@ -101,12 +101,11 @@ async def show_modules(request): template = "modules.html" context = { "request": request, - "mercure_version": mercure_defs.VERSION, + "page": "modules", "modules": config.mercure.modules, "used_modules": used_modules, } - context.update(get_user_information(request)) return templates.TemplateResponse(template, context) @@ -204,7 +203,7 @@ async def edit_module(request): template = "modules_edit.html" context = { "request": request, - "mercure_version": mercure_defs.VERSION, + "page": "modules", "module": config.mercure.modules[module], "module_name": module, @@ -212,7 +211,6 @@ async def edit_module(request): "runtime": runtime, "support_root_modules": config.mercure.support_root_modules, } - context.update(get_user_information(request)) return templates.TemplateResponse(template, context) diff --git a/webinterface/query.py b/webinterface/query.py new file mode 100644 index 00000000..fa1077e2 --- /dev/null +++ b/webinterface/query.py @@ -0,0 +1,236 @@ +import os +import re +from pynetdicom import ( + AE, + QueryRetrievePresentationContexts, BasicWorklistManagementPresentationContexts, UnifiedProcedurePresentationContexts, + build_role, + evt, + StoragePresentationContexts + ) +from pynetdicom.sop_class import StudyRootQueryRetrieveInformationModelFind +from pynetdicom.apps.common import create_dataset +from pynetdicom._globals import DEFAULT_MAX_LENGTH +from pynetdicom.pdu_primitives import SOPClassExtendedNegotiation +from pynetdicom.sop_class import ( + PatientRootQueryRetrieveInformationModelGet, + StudyRootQueryRetrieveInformationModelGet, + PatientStudyOnlyQueryRetrieveInformationModelGet, + EncapsulatedSTLStorage, + EncapsulatedOBJStorage, + EncapsulatedMTLStorage, +) +from pydicom.uid import DeflatedExplicitVRLittleEndian +from pydicom import Dataset +import sys +import subprocess + + +class DicomClientCouldNotAssociate(Exception): + pass + +class DicomClientCouldNotFind(Exception): + pass + +class DicomClientBadStatus(Exception): + pass + +SOP_CLASS_PREFIXES = { + "1.2.840.10008.5.1.4.1.1.2": ("CT", "CT Image Storage"), + "1.2.840.10008.5.1.4.1.1.2.1": ("CTE", "Enhanced CT Image Storage"), + "1.2.840.10008.5.1.4.1.1.4": ("MR", "MR Image Storage"), + "1.2.840.10008.5.1.4.1.1.4.1": ("MRE", "Enhanced MR Image Storage"), + "1.2.840.10008.5.1.4.1.1.128": ("PT", "Positron Emission Tomography Image Storage"), + "1.2.840.10008.5.1.4.1.1.130": ("PTE", "Enhanced PET Image Storage"), + 
"1.2.840.10008.5.1.4.1.1.481.1": ("RI", "RT Image Storage"), + "1.2.840.10008.5.1.4.1.1.481.2": ("RD", "RT Dose Storage"), + "1.2.840.10008.5.1.4.1.1.481.5": ("RP", "RT Plan Storage"), + "1.2.840.10008.5.1.4.1.1.481.3": ("RS", "RT Structure Set Storage"), + "1.2.840.10008.5.1.4.1.1.1": ("CR", "Computed Radiography Image Storage"), + "1.2.840.10008.5.1.4.1.1.6.1": ("US", "Ultrasound Image Storage"), + "1.2.840.10008.5.1.4.1.1.6.2": ("USE", "Enhanced US Volume Storage"), + "1.2.840.10008.5.1.4.1.1.12.1": ("XA", "X-Ray Angiographic Image Storage"), + "1.2.840.10008.5.1.4.1.1.12.1.1": ("XAE", "Enhanced XA Image Storage"), + "1.2.840.10008.5.1.4.1.1.20": ("NM", "Nuclear Medicine Image Storage"), + "1.2.840.10008.5.1.4.1.1.7": ("SC", "Secondary Capture Image Storage"), +} +class SimpleDicomClient(): + host: str + port: int + called_aet: str + output_dir: str + def __init__(self, host, port, called_aet, out_dir): + self.host = host + self.port = port + self.called_aet = called_aet + self.output_dir = out_dir + + def handle_store(self, event): + try: + ds = event.dataset + # Remove any Group 0x0002 elements that may have been included + ds = ds[0x00030000:] + except Exception as exc: + print(exc) + return 0x210 + try: + sop_class = ds.SOPClassUID + # sanitize filename by replacing all illegal characters with underscores + sop_instance = re.sub(r"[^\d.]", "_", ds.SOPInstanceUID) + except Exception as exc: + print( + "Unable to decode the received dataset or missing 'SOP Class " + "UID' and/or 'SOP Instance UID' elements" + ) + print(exc) + # Unable to decode dataset + return 0xC210 + + try: + # Get the elements we need + mode_prefix = SOP_CLASS_PREFIXES[sop_class][0] + except KeyError: + mode_prefix = "UN" + + filename = f"{self.output_dir}/{mode_prefix}.{sop_instance}" + print(f"Storing DICOM file: {filename}") + + status_ds = Dataset() + status_ds.Status = 0x0000 + try: + if event.context.transfer_syntax == DeflatedExplicitVRLittleEndian: + # Workaround for pydicom issue #1086 + with open(filename, "wb") as f: + f.write(event.encoded_dataset()) + else: + # We use `write_like_original=False` to ensure that a compliant + # File Meta Information Header is written + ds.save_as(filename, write_like_original=False) + + status_ds.Status = 0x0000 # Success + except OSError as exc: + print("Could not write file to specified directory:") + print(f" {os.path.dirname(filename)}") + print(exc) + # Failed - Out of Resources - OSError + status_ds.Status = 0xA700 + except Exception as exc: + print("Could not write file to specified directory:") + print(f" {os.path.dirname(filename)}") + print(exc) + # Failed - Out of Resources - Miscellaneous error + status_ds.Status = 0xA701 + + subprocess.run(["/opt/mercure/app/bin/ubuntu22.04/getdcmtags", filename, self.called_aet, "MERCURE"]) + return status_ds + + + def getscu(self, accession_number): + # Exclude these SOP Classes + _exclusion = [ + EncapsulatedSTLStorage, + EncapsulatedOBJStorage, + EncapsulatedMTLStorage, + ] + store_contexts = [ + cx for cx in StoragePresentationContexts if cx.abstract_syntax not in _exclusion + ] + ae = AE(ae_title="MERCURE") + # Create application entity + # Binding to port 0 lets the OS pick an available port + ae.acse_timeout = 30 + ae.dimse_timeout = 30 + ae.network_timeout = 30 + ae.add_requested_context(PatientRootQueryRetrieveInformationModelGet) + ae.add_requested_context(StudyRootQueryRetrieveInformationModelGet) + ae.add_requested_context(PatientStudyOnlyQueryRetrieveInformationModelGet) + ext_neg = [] + for cx in 
store_contexts: + ae.add_requested_context(cx.abstract_syntax) + # Add SCP/SCU Role Selection Negotiation to the extended negotiation + # We want to act as a Storage SCP + ext_neg.append(build_role(cx.abstract_syntax, scp_role=True)) + query_model = StudyRootQueryRetrieveInformationModelGet + assoc = ae.associate( + self.host, self.port, + ae_title=self.called_aet, + ext_neg=ext_neg, + evt_handlers=[(evt.EVT_C_STORE, self.handle_store, [])], + max_pdu=0, + ) + if not assoc.is_established: + raise DicomClientCouldNotAssociate() + # Send query + + ds = Dataset() + ds.QueryRetrieveLevel = 'STUDY' + ds.AccessionNumber = accession_number + + responses = assoc.send_c_get(ds, query_model) + success = False + for status, rsp_identifier in responses: + # If `status.Status` is one of the 'Pending' statuses then + # `rsp_identifier` is the C-GET response's Identifier dataset + if not status: + raise DicomClientBadStatus() + + if status.Status in [0xFF00, 0xFF01]: + yield status + success = True + if not success: + raise DicomClientCouldNotFind() + + assoc.release() + + def findscu(self,accession_number): + # Create application entity + ae = AE(ae_title=calling_aet) + + # Add a requested presentation context + # ae.add_requested_context(StudyRootQueryRetrieveInformationModelFind) + ae.requested_contexts = QueryRetrievePresentationContexts + # + BasicWorklistManagementPresentationContexts + # + UnifiedProcedurePresentationContexts ) + + # Associate with the peer AE + assoc = ae.associate(self.host, self.port, ae_title=self.called_aet, max_pdu=0, ext_neg=[]) + + ds = Dataset() + ds.QueryRetrieveLevel = 'STUDY' + ds.AccessionNumber = accession_number + if not assoc.is_established: + raise DicomClientCouldNotAssociate() + + try: + responses = assoc.send_c_find( + ds, + StudyRootQueryRetrieveInformationModelFind + ) + + for (status, identifier) in responses: + if not status: + print('Connection timed out, was aborted or received invalid response') + break + + if status.Status in [0xFF00, 0xFF01]: + # print('C-FIND query status: 0x{0:04x}'.format(status.Status)) + return identifier + # elif status.Status == 0x0000: + # print("Success") + # break + raise DicomClientCouldNotFind() + finally: + assoc.release() + +if __name__ == "__main__": + # Replace these variables with your actual values + remote_host = sys.argv[1] + remote_port = int(sys.argv[2]) + calling_aet = sys.argv[3] + called_aet = sys.argv[4] + accession_number = sys.argv[5] + + print(f"{remote_host=} {remote_port=} {calling_aet=} {called_aet=} {accession_number=}") + c = SimpleDicomClient(remote_host, remote_port, called_aet, "/tmp/test-move") + # study_uid = c.get_study_uid(accession_number) + # print(study_uid) + c.getscu(accession_number) \ No newline at end of file diff --git a/webinterface/queue.py b/webinterface/queue.py index b104134c..de92bcd1 100755 --- a/webinterface/queue.py +++ b/webinterface/queue.py @@ -57,12 +57,10 @@ async def show_queues(request): template = "queue.html" context = { "request": request, - "mercure_version": mercure_defs.VERSION, "page": "queue", "processing_suspended": processing_suspended, "routing_suspended": routing_suspended, } - context.update(get_user_information(request)) return templates.TemplateResponse(template, context) diff --git a/webinterface/rules.py b/webinterface/rules.py index 2d76bbfd..39c011b5 100755 --- a/webinterface/rules.py +++ b/webinterface/rules.py @@ -47,11 +47,9 @@ async def show_rules(request) -> Response: template = "rules.html" context = { "request": request, - "mercure_version": 
mercure_defs.VERSION, "page": "rules", "rules": config.mercure.rules, } - context.update(get_user_information(request)) return templates.TemplateResponse(template, context) @@ -112,7 +110,6 @@ async def rules_edit(request) -> Response: context = { "request": request, - "mercure_version": mercure_defs.VERSION, "page": "rules", "rules": config.mercure.rules, "targets": config.mercure.targets, @@ -123,7 +120,6 @@ async def rules_edit(request) -> Response: "processing_settings": settings_string, "process_runner": config.mercure.process_runner } - context.update(get_user_information(request)) template = "rules_edit.html" return templates.TemplateResponse(template, context) diff --git a/webinterface/targets.py b/webinterface/targets.py index bf78ceb6..9dddb525 100755 --- a/webinterface/targets.py +++ b/webinterface/targets.py @@ -50,7 +50,6 @@ async def show_targets(request) -> Response: template = "targets.html" context = { "request": request, - "mercure_version": mercure_defs.VERSION, "page": "targets", "targets": config.mercure.targets, "used_targets": used_targets, @@ -104,7 +103,6 @@ async def targets_edit(request) -> Response: template = "targets_edit.html" context = { "request": request, - "mercure_version": mercure_defs.VERSION, "page": "targets", "targets": config.mercure.targets, "edittarget": edittarget, diff --git a/webinterface/templates/dashboards/dashboards.html b/webinterface/templates/dashboards/dashboards.html index 2cb4c137..afe990b0 100644 --- a/webinterface/templates/dashboards/dashboards.html +++ b/webinterface/templates/dashboards/dashboards.html @@ -7,6 +7,7 @@ diff --git a/webinterface/templates/dashboards/query.html b/webinterface/templates/dashboards/query.html new file mode 100644 index 00000000..48756c04 --- /dev/null +++ b/webinterface/templates/dashboards/query.html @@ -0,0 +1,117 @@ +{% extends "dashboards/dashboards.html" %} + +{% block title %}Query{% endblock %} + +{% block extra_head %} + + +{% endblock %} + +{% block dashboard_content %} +
[query.html, dashboard body: a "DICOM Query" panel with a submission form (DICOM node selector, accession number field, Query button posting to the query endpoint) and a jobs table with columns id, status, enqueued_at, accession, result, progress, driven by a DataTables script that polls query/jobs — markup not preserved in this excerpt]
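The DICOM node selector in this form is filled from the new config.mercure.dicom_retrieve section that common/types.py introduces above. A rough sketch of that configuration built with the new pydantic models — the node name, address, and AE titles below are placeholders, not shipped defaults:

from common.types import DicomNode, DicomRetrieveConfig

# Hypothetical PACS entry; substitute the node you actually want to query.
retrieve_config = DicomRetrieveConfig(
    dicom_nodes=[
        DicomNode(name="main-pacs", ip="10.0.0.5", port=104,
                  aet_target="PACS", aet_source="MERCURE"),
    ]
)

# The same information as it would appear in mercure.json:
# "dicom_retrieve": {
#     "dicom_nodes": [
#         {"name": "main-pacs", "ip": "10.0.0.5", "port": 104,
#          "aet_target": "PACS", "aet_source": "MERCURE"}
#     ]
# }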
+ +
+ +{% endblock %} \ No newline at end of file diff --git a/webinterface/users.py b/webinterface/users.py index 1d70f57f..f7b7276b 100755 --- a/webinterface/users.py +++ b/webinterface/users.py @@ -187,8 +187,7 @@ async def show_users(request) -> Response: return PlainTextResponse("Configuration is being updated. Try again in a minute.") template = "users.html" - context = {"request": request, "mercure_version": mercure_defs.VERSION, "page": "users", "users": users_list} - context.update(get_user_information(request)) + context = {"request": request, "page": "users", "users": users_list} return templates.TemplateResponse(template, context) @@ -237,12 +236,10 @@ async def users_edit(request) -> Response: template = "users_edit.html" context = { "request": request, - "mercure_version": mercure_defs.VERSION, "page": "users", "edituser": edituser, "edituser_info": users_list[edituser], } - context.update(get_user_information(request)) return templates.TemplateResponse(template, context) From 3928935b6812a8d3dc3037362dff4a1eb15c6770 Mon Sep 17 00:00:00 2001 From: Roy Wiggins Date: Fri, 31 May 2024 22:41:35 +0000 Subject: [PATCH 02/42] beginner support for batch process --- requirements.in | 10 +- requirements.txt | 24 ++- webinterface/common.py | 7 +- webinterface/dashboards/query.py | 146 ++++++++++++++++-- webinterface/templates/dashboards/query.html | 36 ++++- .../dashboards/query_job_fragment.html | 29 ++++ 6 files changed, 227 insertions(+), 25 deletions(-) create mode 100644 webinterface/templates/dashboards/query_job_fragment.html diff --git a/requirements.in b/requirements.in index bbfe31af..ca789b3e 100644 --- a/requirements.in +++ b/requirements.in @@ -57,8 +57,10 @@ pillow >= 10.0.1 dicomweb-client boto3 freezegun -certifi>=2023.07.22 -future>=0.18.3 -lxml>=4.9.1 +certifi >= 2023.07.22 +future >= 0.18.3 +lxml >= 4.9.1 pyxnat -urllib3>=1.26.18 \ No newline at end of file +urllib3 >= 1.26.18 +rq ~= 1.16.2 +rq-scheduler ~= 0.13.1 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index c1c269d9..66a94595 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ # This file is autogenerated by pip-compile with Python 3.10 # by the following command: # -# pip-compile requirements.in +# pip-compile # aiofiles==23.2.1 # via -r requirements.in @@ -25,6 +25,7 @@ async-timeout==4.0.3 # aiohttp # aiopg # asyncpg + # redis asyncpg==0.29.0 # via -r requirements.in attrs==23.2.0 @@ -40,11 +41,14 @@ botocore==1.34.40 certifi==2024.2.2 # via # -r requirements.in + # influxdb-client # requests charset-normalizer==3.3.2 # via requests click==8.1.7 - # via uvicorn + # via + # rq + # uvicorn daiquiri==3.2.5.1 # via -r requirements.in databases[aiopg]==0.8.0 @@ -86,10 +90,10 @@ idna==3.6 # yarl imagesize==1.4.1 # via sphinx +influxdb-client==1.38.0 + # via -r requirements.in iniconfig==2.0.0 # via pytest -influxdb_client==1.38.0 - # via -r requirements.in itsdangerous==2.1.2 # via -r requirements.in jinja2==3.1.3 @@ -164,6 +168,7 @@ python-dateutil==2.8.2 # via # botocore # freezegun + # influxdb-client python-json-logger==2.0.7 # via daiquiri python-multipart==0.0.9 @@ -172,6 +177,10 @@ python-nomad==2.0.0 # via -r requirements.in pyxnat==1.6.2 # via -r requirements.in +reactivex==4.0.4 + # via influxdb-client +redis==5.0.4 + # via rq requests==2.31.0 # via # dicomweb-client @@ -181,6 +190,8 @@ requests==2.31.0 # sphinx retrying==1.3.4 # via dicomweb-client +rq==1.16.2 + # via -r requirements.in s3transfer==0.10.0 # via boto3 six==1.16.0 @@ -244,11 +255,13 @@ 
typing-extensions==4.9.0 # asgiref # mypy # pydantic + # reactivex urllib3==2.0.7 # via # -r requirements.in # botocore # docker + # influxdb-client # requests # types-requests uvicorn==0.16.0 @@ -259,3 +272,6 @@ wheel==0.42.0 # via -r requirements.in yarl==1.9.4 # via aiohttp + +# The following packages are considered to be unsafe in a requirements file: +# setuptools diff --git a/webinterface/common.py b/webinterface/common.py index 60e65adb..25a35efd 100755 --- a/webinterface/common.py +++ b/webinterface/common.py @@ -14,10 +14,11 @@ from starlette.templating import Jinja2Templates from common.constants import mercure_defs +from rq_scheduler import Scheduler - - -worker_queue = Queue(connection=Redis()) +redis = Redis() +worker_queue = Queue(connection=redis) +worker_scheduler = Scheduler(queue=worker_queue, connection=worker_queue.connection) def get_user_information(request) -> dict: """Returns dictionary of values that should always be passed to the templates when the user is logged in.""" diff --git a/webinterface/dashboards/query.py b/webinterface/dashboards/query.py index e27f0740..8dd692de 100644 --- a/webinterface/dashboards/query.py +++ b/webinterface/dashboards/query.py @@ -2,7 +2,7 @@ from webinterface.query import SimpleDicomClient # Standard python includes from datetime import datetime - +import time, random # Starlette-related includes from starlette.authentication import requires @@ -11,34 +11,139 @@ from webinterface.common import templates import common.config as config from starlette.responses import PlainTextResponse, JSONResponse -from webinterface.common import worker_queue +from webinterface.common import worker_queue, redis +from rq import Connection from rq import get_current_job - +from rq.job import Job from .common import router logger = config.get_logger() -def test_job(*,accession, node): +def query_job(*,accession, node): config.read_config() c = SimpleDicomClient(node.ip, node.port, node.aet_target, config.mercure.incoming_folder) + job = get_current_job() + job.meta["started"] = 1 + job.save_meta() for identifier in c.getscu(accession): - job = get_current_job() job.meta['failed'] = identifier.NumberOfFailedSuboperations job.meta['remaining'] = identifier.NumberOfRemainingSuboperations job.meta['completed'] = identifier.NumberOfCompletedSuboperations + if not job.meta.get('total', False): + job.meta['total'] = identifier.NumberOfCompletedSuboperations + identifier.NumberOfRemainingSuboperations + job.meta["started"] += 1 job.save_meta() return "Complete" +def dummy_job(*,accession, node): + total_time = 3 # Total time for the job in seconds (1 minute) + update_interval = 1 # Interval between updates in seconds + + start_time = time.monotonic() + job = get_current_job() + if job.meta.get('parent'): + job_parent = worker_queue.fetch_job(job.meta['parent']) + else: + job_parent = None + # failed = 0 + remaining = total_time // update_interval + completed = 0 + print(accession) + if job_parent: + job_parent.meta['started'] = job_parent.meta.get('started',0) + 1 + job_parent.save_meta() + + job.meta['started'] = 1 + job.meta['total'] = remaining + job.meta['progress'] = f"0 / {job.meta['total']}" + job.save_meta() + while (time.monotonic() - start_time) < total_time: + time.sleep(update_interval) # Sleep for the interval duration + remaining -= 1 + completed += 1 + + # job.meta['failed'] = failed + job.meta['remaining'] = remaining + job.meta['completed'] = completed + job.meta['progress'] = f"{completed} / {job.meta['total']}" + print(job.meta['progress']) + 
job.save_meta() # Save the updated meta data to the job + + if job_parent: + job_parent.meta['completed'] += 1 + job_parent.meta['progress'] = f"{job_parent.meta['completed'] } / {job_parent.meta['total']}" + + job_parent.save_meta() + return "Job complete" + +def batch_job(*, accessions, subjobs): + print(subjobs) + return "Batch complete" + +def monitor_job(): + print("monitoring") + +@router.get("/query/job_info") +@requires(["authenticated", "admin"], redirect="login") +async def get_job_info(request): + job_id = request.query_params['id'] + job = worker_queue.fetch_job(job_id) + if not job: + return JSONResponse({'error': 'Job not found'}, status_code=404) + + subjob_info = [] + for job_id in job.kwargs.get('subjobs',[]): + subjob = worker_queue.fetch_job(job_id) + if subjob: + subjob_info.append({'id': subjob.get_id(), + 'ended_at': subjob.ended_at.isoformat().split('.')[0] if subjob.ended_at else "", + 'created_at_dt':subjob.created_at, + 'accession': subjob.kwargs['accession'], + 'progress': subjob.meta.get('progress'), + 'status': subjob.get_status()}) + subjob_info = sorted(subjob_info, key=lambda x:x['created_at_dt']) + return templates.TemplateResponse("dashboards/query_job_fragment.html", {"request":request,"subjob_info":subjob_info}) + @router.post("/query") @requires(["authenticated", "admin"], redirect="login") -async def query_post(request): +async def query_post_batch(request): + """ + Starts a new query job for the given accession number and DICOM node. + """ form = await request.form() + for n in config.mercure.dicom_retrieve.dicom_nodes: + if n.name == form.get("dicom_node"): + node = n + break + random_accessions = ["".join(random.choices([str(i) for i in range(10)], k=10)) for _ in range(5)] + jobs = [] + with Connection(redis): + for accession in random_accessions: + job = Job.create(dummy_job, kwargs=dict(accession=accession, node=node), timeout='30m', result_ttl=-1, meta=dict(type="get_accession_batch",parent=None)) + jobs.append(job) + full_job = Job.create(batch_job, kwargs=dict(accessions=random_accessions, subjobs=[j.id for j in jobs]), timeout=-1, result_ttl=-1, meta=dict(type="batch", started=0, completed=0, total=len(jobs)), depends_on=[j.id for j in jobs]) + for j in jobs: + j.meta["parent"] = full_job.id + + for j in jobs: + worker_queue.enqueue_job(j) + worker_queue.enqueue_job(full_job) + # worker_scheduler.schedule(scheduled_time=datetime.utcnow(), func=monitor_job, interval=10, repeat=10, result_ttl=-1) + return PlainTextResponse() + +@router.post("/query_single") +@requires(["authenticated", "admin"], redirect="login") +async def query_post(request): + """ + Starts a new query job for the given accession number and DICOM node. 
+ """ + form = await request.form() for n in config.mercure.dicom_retrieve.dicom_nodes: if n.name == form.get("dicom_node"): node = n break - worker_queue.enqueue_call(test_job, kwargs=dict(accession=form.get("accession"), node=node), timeout='10m', result_ttl=-1) + worker_queue.enqueue_call(query_job, kwargs=dict(accession=form.get("accession"), node=node), timeout='30m', result_ttl=-1, meta=dict(type="get_accession_single")) return PlainTextResponse() @router.get("/query/jobs") @@ -55,12 +160,33 @@ async def query_jobs(request): for r in registries: for j_id in r.get_job_ids(): job = worker_queue.fetch_job(j_id) - job_info.append(dict(id=j_id, + if job.meta.get('type') != 'batch': + continue + job_dict = dict(id=j_id, status=job.get_status(), parameters=dict(accession=job.kwargs.get('accession','')), - enqueued_at=1000*datetime.timestamp(job.enqueued_at), + created_at=1000*datetime.timestamp(job.created_at) if job.created_at else "", + enqueued_at=1000*datetime.timestamp(job.enqueued_at) if job.enqueued_at else "", result=job.result, - meta=job.meta)) + meta=job.meta, + progress="") + # if job.meta.get('completed') and job.meta.get('remaining'): + # job_dict["progress"] = f"{job.meta.get('completed')} / {job.meta.get('completed') + job.meta.get('remaining')}" + # if job.meta.get('type',None) == "batch": + n_started = job.meta.get('started',0) + n_completed = job.meta.get('completed',0) + n_total = job.meta.get('total',0) + + if job_dict["status"] == "finished": + job_dict["progress"] = f"{n_total} / {n_total}" + elif job_dict["status"] in ("deferred","started"): + job_dict["progress"] = f"{n_completed} / {n_total}" + if 0 < n_started < n_total: + job_dict["status"] = "running" + elif n_completed == n_total: + job_dict["status"] = "finishing" + + job_info.append(job_dict) return JSONResponse(dict(data=job_info)) # return PlainTextResponse(",".join([str(j) for j in all_jobs])) diff --git a/webinterface/templates/dashboards/query.html b/webinterface/templates/dashboards/query.html index 48756c04..f595df46 100644 --- a/webinterface/templates/dashboards/query.html +++ b/webinterface/templates/dashboards/query.html @@ -68,9 +68,11 @@

DICOM Query

+ + - + @@ -93,14 +95,22 @@

DICOM Query

ajax: "query/jobs", deferRender: true, columns: [ + { + className: 'dt-control', + orderable: false, + data: null, + defaultContent: '' + }, { data: 'id' }, + { data: 'meta', render: data => data["type"]? data["type"]:"" }, { data: 'status' }, - { data: 'enqueued_at', render: data => `${data}`+new Date(data).toLocaleString("en-US")}, + { data: 'created_at', render: data => `${data || Date.now() }`+(data? new Date(data).toLocaleString("en-US"):"")}, { data: 'parameters', render: data => data["accession"] || ""}, { data: 'result' }, - { data: 'meta', render: data => data["completed"]?`${data["completed"]}/${data["remaining"]+data["completed"]}`:"" }, + { data: 'progress'}, ], select: { + selector:'td:not(:first-child)', style: 'os' }, language: { @@ -108,10 +118,28 @@

DICOM Query

}, // filter: true, buttons: [], - order: [[2, 'desc']], + order: [[4, 'desc']], initComplete: function() { } }); + window.datatable.on('click', 'td.dt-control', function (e) { + let tr = e.target.closest('tr'); + let row = window.datatable.row(tr); + + if (row.child.isShown()) { + // This row is already open - close it + row.child.hide(); + } + else { + // Open this row + fetch("query/job_info?id="+row.data().id, {method: "GET"} + ).then( + response => response.text() + ).then(data=>{ + row.child(data).show(); + }); + } + }); }); {% endblock %} \ No newline at end of file diff --git a/webinterface/templates/dashboards/query_job_fragment.html b/webinterface/templates/dashboards/query_job_fragment.html new file mode 100644 index 00000000..36314c81 --- /dev/null +++ b/webinterface/templates/dashboards/query_job_fragment.html @@ -0,0 +1,29 @@ +
id type status enqueued_at created_at accession result progress
+ + + + + + + + + + {% for job in subjob_info %} + + + + + + + + {% endfor %} +
+ + Accession + + Ended at + + Status + + Progress +
{{job['accession']}}{{job['ended_at'] or ''}}{{job['status']}}{{job['progress'] or ''}}
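The fragment above renders the per-accession sub-jobs of one batch. The batching itself rests on RQ job dependencies: each sub-job bumps a counter in its parent's meta, and the parent job only runs once every sub-job has finished. A condensed sketch of that pattern, assuming a local Redis and rq ~= 1.16 (fetch_accession and collect_batch are illustrative names, not mercure functions):

from redis import Redis
from rq import Queue, get_current_job
from rq.job import Job

redis = Redis()
queue = Queue(connection=redis)

def fetch_accession(*, accession):
    # Sub-job: do the retrieval work, then bump the parent's completion counter.
    job = get_current_job()
    parent = queue.fetch_job(job.meta["parent"]) if job.meta.get("parent") else None
    # ... perform the C-GET for `accession` here ...
    if parent:
        parent.get_meta()               # refresh from Redis before updating
        parent.meta["completed"] += 1
        parent.save_meta()
    return f"{accession} done"

def collect_batch(*, accessions, subjobs):
    # Parent job: only runs after all sub-jobs finished, thanks to depends_on below.
    return f"collected {len(accessions)} accessions"

def enqueue_batch(accessions):
    subjobs = [Job.create(fetch_accession, kwargs=dict(accession=a), connection=redis,
                          result_ttl=-1, meta=dict(parent=None))
               for a in accessions]
    parent = Job.create(collect_batch, connection=redis, result_ttl=-1,
                        kwargs=dict(accessions=accessions, subjobs=[j.id for j in subjobs]),
                        meta=dict(completed=0, total=len(subjobs)),
                        depends_on=[j.id for j in subjobs])
    for j in subjobs:
        j.meta["parent"] = parent.id    # link child -> parent before enqueueing
        queue.enqueue_job(j)
    queue.enqueue_job(parent)
    return parent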
\ No newline at end of file From 0f753e0db5feac2c0354d9ab27016d63835b6185 Mon Sep 17 00:00:00 2001 From: Roy Wiggins Date: Fri, 31 May 2024 23:23:29 +0000 Subject: [PATCH 03/42] create dummy files --- webinterface/dashboards/query.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/webinterface/dashboards/query.py b/webinterface/dashboards/query.py index 8dd692de..81170039 100644 --- a/webinterface/dashboards/query.py +++ b/webinterface/dashboards/query.py @@ -1,4 +1,6 @@ +from pathlib import Path +import shutil from webinterface.query import SimpleDicomClient # Standard python includes from datetime import datetime @@ -34,7 +36,8 @@ def query_job(*,accession, node): job.save_meta() return "Complete" -def dummy_job(*,accession, node): +def dummy_job(*,accession, node, path): + Path(path).mkdir(parents=True, exist_ok=True) total_time = 3 # Total time for the job in seconds (1 minute) update_interval = 1 # Interval between updates in seconds @@ -58,6 +61,7 @@ def dummy_job(*,accession, node): job.save_meta() while (time.monotonic() - start_time) < total_time: time.sleep(update_interval) # Sleep for the interval duration + (Path(path) / f"dummy{completed}_{job.id}.dcm").touch() remaining -= 1 completed += 1 @@ -75,8 +79,10 @@ def dummy_job(*,accession, node): job_parent.save_meta() return "Job complete" -def batch_job(*, accessions, subjobs): - print(subjobs) +def batch_job(*, accessions, subjobs, path): + for p in Path(path).glob("**/*.dcm"): + shutil.move(p, "/opt/mercure/data/incoming") + shutil.rmtree(path) return "Batch complete" def monitor_job(): @@ -123,6 +129,9 @@ async def query_post_batch(request): full_job = Job.create(batch_job, kwargs=dict(accessions=random_accessions, subjobs=[j.id for j in jobs]), timeout=-1, result_ttl=-1, meta=dict(type="batch", started=0, completed=0, total=len(jobs)), depends_on=[j.id for j in jobs]) for j in jobs: j.meta["parent"] = full_job.id + j.kwargs["path"] = f"/opt/mercure/data/query/job_dirs/{full_job.id}/{j.kwargs['accession']}" + full_job.kwargs["path"] = Path(f"/opt/mercure/data/query/job_dirs/{full_job.id}") + for j in jobs: worker_queue.enqueue_job(j) From e3a86eda69392f3c00c0a471d53603c65c4978d3 Mon Sep 17 00:00:00 2001 From: Roy Wiggins Date: Fri, 7 Jun 2024 22:54:17 +0000 Subject: [PATCH 04/42] - pause/resume jobs - redirect results to folder --- common/types.py | 7 +- webinterface/common.py | 3 + webinterface/dashboards/query.py | 200 ++++++++++++++----- webinterface/templates/dashboards/query.html | 95 ++++++++- 4 files changed, 248 insertions(+), 57 deletions(-) diff --git a/common/types.py b/common/types.py index 9558db6c..aaba7738 100755 --- a/common/types.py +++ b/common/types.py @@ -185,9 +185,14 @@ class DicomNode(BaseModel): aet_target: str aet_source: Optional[str] = "" +class DicomDestination(BaseModel): + name: str + path: str + class DicomRetrieveConfig(BaseModel): dicom_nodes: List[DicomNode] = [] - + destination_folders: List[DicomDestination] = [] + class Config(BaseModel, Compat): appliance_name: str port: int diff --git a/webinterface/common.py b/webinterface/common.py index 25a35efd..d0692a89 100755 --- a/webinterface/common.py +++ b/webinterface/common.py @@ -17,6 +17,9 @@ from rq_scheduler import Scheduler redis = Redis() + + + worker_queue = Queue(connection=redis) worker_scheduler = Scheduler(queue=worker_queue, connection=worker_queue.connection) diff --git a/webinterface/dashboards/query.py b/webinterface/dashboards/query.py index 81170039..eca892da 100644 --- 
a/webinterface/dashboards/query.py +++ b/webinterface/dashboards/query.py @@ -20,6 +20,8 @@ from .common import router logger = config.get_logger() + + def query_job(*,accession, node): config.read_config() c = SimpleDicomClient(node.ip, node.port, node.aet_target, config.mercure.incoming_folder) @@ -36,9 +38,10 @@ def query_job(*,accession, node): job.save_meta() return "Complete" + def dummy_job(*,accession, node, path): Path(path).mkdir(parents=True, exist_ok=True) - total_time = 3 # Total time for the job in seconds (1 minute) + total_time = 10 # Total time for the job in seconds (1 minute) update_interval = 1 # Interval between updates in seconds start_time = time.monotonic() @@ -61,7 +64,10 @@ def dummy_job(*,accession, node, path): job.save_meta() while (time.monotonic() - start_time) < total_time: time.sleep(update_interval) # Sleep for the interval duration - (Path(path) / f"dummy{completed}_{job.id}.dcm").touch() + out_file = (Path(path) / f"dummy{completed}_{job.id}.dcm") + if out_file.exists(): + raise Exception(f"{out_file} exists already") + out_file.touch() remaining -= 1 completed += 1 @@ -73,21 +79,82 @@ def dummy_job(*,accession, node, path): job.save_meta() # Save the updated meta data to the job if job_parent: + job_parent.get_meta() # there is technically a race condition here... job_parent.meta['completed'] += 1 - job_parent.meta['progress'] = f"{job_parent.meta['completed'] } / {job_parent.meta['total']}" + job_parent.meta['progress'] = f"{job_parent.meta['started'] } / {job_parent.meta['completed'] } / {job_parent.meta['total']}" job_parent.save_meta() return "Job complete" -def batch_job(*, accessions, subjobs, path): - for p in Path(path).glob("**/*.dcm"): - shutil.move(p, "/opt/mercure/data/incoming") +def batch_job(*, accessions, subjobs, path, destination): + job = get_current_job() + job.save_meta() + logger.info(f"Job completing {job.id}") + logger.info(path) + if destination is None: + for p in Path(path).glob("**/*"): + if p.is_file(): + shutil.move(p, config.mercure.incoming_folder) + else: + dest_folder: Path = Path(destination) / job.id + dest_folder.mkdir() + for p in Path(path).iterdir(): + if p.is_dir(): + logger.info(f"moving {p} to {dest_folder}") + shutil.move(p, dest_folder) + shutil.rmtree(path) - return "Batch complete" + return "Job complete" + + def monitor_job(): print("monitoring") +@router.post("/query/pause_job") +@requires(["authenticated", "admin"], redirect="login") +async def pause_job(request): + job = worker_queue.fetch_job(request.query_params['id']) + if not job: + return JSONResponse({'error': 'Job not found'}, status_code=404) + if job.is_finished or job.is_failed: + return JSONResponse({'error': 'Job is already finished'}, status_code=400) + + for job_id in job.kwargs.get('subjobs',[]): + subjob = worker_queue.fetch_job(job_id) + if subjob and (subjob.is_deferred or subjob.is_queued): + subjob.meta['paused'] = True + subjob.save_meta() + subjob.cancel() + job.meta['paused'] = True + job.save_meta() + return JSONResponse({'status': 'success'}, status_code=200) + +@router.post("/query/resume_job") +@requires(["authenticated", "admin"], redirect="login") +async def resume_job(request): + job = worker_queue.fetch_job(request.query_params['id']) + if not job: + return JSONResponse({'error': 'Job not found'}, status_code=404) + if job.is_finished or job.is_failed: + return JSONResponse({'error': 'Job is already finished'}, status_code=400) + # if not job.meta.get('paused', False): + # return JSONResponse({'error': 'Job is not 
paused'}, status_code=400) + + for subjob_id in job.kwargs.get('subjobs',[]): + subjob = worker_queue.fetch_job(subjob_id) + if subjob and subjob.meta.get('paused', None): + subjob.meta['paused'] = False + subjob.save_meta() + worker_queue.canceled_job_registry.requeue(subjob_id) + # worker_queue.canceled_job_registry.remove(subjob_id) + job.get_meta() + job.meta['paused'] = False + job.save_meta() + # worker_queue.canceled_job_registry.requeue(job.id) + # worker_queue.canceled_job_registry.remove(job.id) + return JSONResponse({'status': 'success'}, status_code=200) + @router.get("/query/job_info") @requires(["authenticated", "admin"], redirect="login") async def get_job_info(request): @@ -97,17 +164,21 @@ async def get_job_info(request): return JSONResponse({'error': 'Job not found'}, status_code=404) subjob_info = [] - for job_id in job.kwargs.get('subjobs',[]): - subjob = worker_queue.fetch_job(job_id) - if subjob: - subjob_info.append({'id': subjob.get_id(), - 'ended_at': subjob.ended_at.isoformat().split('.')[0] if subjob.ended_at else "", - 'created_at_dt':subjob.created_at, - 'accession': subjob.kwargs['accession'], - 'progress': subjob.meta.get('progress'), - 'status': subjob.get_status()}) + subjobs = (worker_queue.fetch_job(job) for job in job.kwargs.get('subjobs', [])) + for subjob in subjobs: + info = {'id': subjob.get_id(), + 'ended_at': subjob.ended_at.isoformat().split('.')[0] if subjob.ended_at else "", + 'created_at_dt':subjob.created_at, + 'accession': subjob.kwargs['accession'], + 'progress': subjob.meta.get('progress'), + 'paused': subjob.meta.get('paused',False), + 'status': subjob.get_status() + } + if info['status'] == 'canceled' and info['paused']: + info['status'] = 'paused' + subjob_info.append(info) subjob_info = sorted(subjob_info, key=lambda x:x['created_at_dt']) - return templates.TemplateResponse("dashboards/query_job_fragment.html", {"request":request,"subjob_info":subjob_info}) + return templates.TemplateResponse("dashboards/query_job_fragment.html", {"request":request,"job":job,"subjob_info":subjob_info}) @router.post("/query") @requires(["authenticated", "admin"], redirect="login") @@ -120,18 +191,22 @@ async def query_post_batch(request): if n.name == form.get("dicom_node"): node = n break - random_accessions = ["".join(random.choices([str(i) for i in range(10)], k=10)) for _ in range(5)] + destination = form.get("destination") + for d in config.mercure.dicom_retrieve.destination_folders: + if d.name == destination: + destination_path = d.path + random_accessions = ["".join(random.choices([str(i) for i in range(10)], k=5)) for _ in range(3)] jobs = [] with Connection(redis): for accession in random_accessions: - job = Job.create(dummy_job, kwargs=dict(accession=accession, node=node), timeout='30m', result_ttl=-1, meta=dict(type="get_accession_batch",parent=None)) + job = Job.create(dummy_job, kwargs=dict(accession=accession, node=node), timeout='30m', result_ttl=-1, meta=dict(type="get_accession_batch",parent=None, paused=False)) jobs.append(job) - full_job = Job.create(batch_job, kwargs=dict(accessions=random_accessions, subjobs=[j.id for j in jobs]), timeout=-1, result_ttl=-1, meta=dict(type="batch", started=0, completed=0, total=len(jobs)), depends_on=[j.id for j in jobs]) + full_job = Job.create(batch_job, kwargs=dict(accessions=random_accessions, subjobs=[j.id for j in jobs], destination=destination_path), timeout=-1, result_ttl=-1, meta=dict(type="batch", started=0, paused=False,completed=0, total=len(jobs)), depends_on=[j.id for j in jobs]) for j 
in jobs: j.meta["parent"] = full_job.id j.kwargs["path"] = f"/opt/mercure/data/query/job_dirs/{full_job.id}/{j.kwargs['accession']}" full_job.kwargs["path"] = Path(f"/opt/mercure/data/query/job_dirs/{full_job.id}") - + full_job.kwargs["path"].mkdir(parents=True) for j in jobs: worker_queue.enqueue_job(j) @@ -164,38 +239,59 @@ async def query_jobs(request): worker_queue.finished_job_registry, # Returns FinishedJobRegistry worker_queue.failed_job_registry, # Returns FailedJobRegistry worker_queue.scheduled_job_registry, # Returns ScheduledJobRegistry + worker_queue.canceled_job_registry, # Returns CanceledJobRegistry ] job_info = [] - for r in registries: - for j_id in r.get_job_ids(): - job = worker_queue.fetch_job(j_id) - if job.meta.get('type') != 'batch': - continue - job_dict = dict(id=j_id, - status=job.get_status(), - parameters=dict(accession=job.kwargs.get('accession','')), - created_at=1000*datetime.timestamp(job.created_at) if job.created_at else "", - enqueued_at=1000*datetime.timestamp(job.enqueued_at) if job.enqueued_at else "", - result=job.result, - meta=job.meta, - progress="") - # if job.meta.get('completed') and job.meta.get('remaining'): - # job_dict["progress"] = f"{job.meta.get('completed')} / {job.meta.get('completed') + job.meta.get('remaining')}" - # if job.meta.get('type',None) == "batch": - n_started = job.meta.get('started',0) - n_completed = job.meta.get('completed',0) - n_total = job.meta.get('total',0) - - if job_dict["status"] == "finished": - job_dict["progress"] = f"{n_total} / {n_total}" - elif job_dict["status"] in ("deferred","started"): - job_dict["progress"] = f"{n_completed} / {n_total}" - if 0 < n_started < n_total: - job_dict["status"] = "running" - elif n_completed == n_total: - job_dict["status"] = "finishing" - - job_info.append(job_dict) + # logger.info(worker_queue.job_ids) + # for registry in registries: + job_ids = set() + for registry in registries: + for j_id in registry.get_job_ids(): + job_ids.add(j_id) + for j_id in worker_queue.job_ids: + job_ids.add(j_id) + + for j_id in job_ids: + job = worker_queue.fetch_job(j_id) + job_meta = job.get_meta() + if job_meta.get('type') != 'batch': + continue + job_dict = dict(id=j_id, + status=job.get_status(), + parameters=dict(accession=job.kwargs.get('accession','')), + created_at=1000*datetime.timestamp(job.created_at) if job.created_at else "", + enqueued_at=1000*datetime.timestamp(job.enqueued_at) if job.enqueued_at else "", + result=job.result, + meta=job_meta, + progress="") + # if job.meta.get('completed') and job.meta.get('remaining'): + # job_dict["progress"] = f"{job.meta.get('completed')} / {job.meta.get('completed') + job.meta.get('remaining')}" + # if job.meta.get('type',None) == "batch": + n_started = job_meta.get('started',0) + n_completed = job_meta.get('completed',0) + n_total = job_meta.get('total',0) + + if job_dict["status"] == "finished": + job_dict["progress"] = f"{n_total} / {n_total}" + elif job_dict["status"] in ("deferred","started", "paused", "canceled"): + job_dict["progress"] = f"{n_completed} / {n_total}" + + # if job_dict["status"] == "canceled" and + if job_dict["meta"].get('paused', False): + if n_started < n_completed: # TODO: this does not work + job_dict["status"] = "pausing" + else: + job_dict["status"] = "paused" + + if job_dict["status"] in ("deferred", "started"): + if n_started == 0: + job_dict["status"] = "waiting" + elif n_completed < n_total: + job_dict["status"] = "running" + elif n_completed == n_total: + job_dict["status"] = "finishing" + + 
job_info.append(job_dict) return JSONResponse(dict(data=job_info)) # return PlainTextResponse(",".join([str(j) for j in all_jobs])) @@ -205,7 +301,7 @@ async def query(request): template = "dashboards/query.html" context = { "request": request, - + "destination_folders": config.mercure.dicom_retrieve.destination_folders, "dicom_nodes": config.mercure.dicom_retrieve.dicom_nodes, "page": "query", } diff --git a/webinterface/templates/dashboards/query.html b/webinterface/templates/dashboards/query.html index f595df46..085cc209 100644 --- a/webinterface/templates/dashboards/query.html +++ b/webinterface/templates/dashboards/query.html @@ -44,7 +44,26 @@

DICOM Query

- +
+
+ +
+
+
+
+
+ +
+
+
+
+
@@ -59,7 +78,6 @@

DICOM Query

-
@@ -84,12 +102,45 @@

DICOM Query

+ + + +{% endblock %} + +{% block dashboard_content %} +
+

DICOM Query

+
+
+
+
+ +
+
+
+
+
+ +
+
+
+
+
+
+
+ +
+
+
+
+
+ +
+
+
+
+
+ +
+
+ +
+
+
+
+ +
+
+
+
+
+ +
+
+ +
+ + + + + + + + + + + + + +
idstatusenqueued_ataccessionresultprogress
+
+
+ +
+ +{% endblock %} \ No newline at end of file diff --git a/webinterface/users.py b/webinterface/users.py index 1d70f57f..f7b7276b 100755 --- a/webinterface/users.py +++ b/webinterface/users.py @@ -187,8 +187,7 @@ async def show_users(request) -> Response: return PlainTextResponse("Configuration is being updated. Try again in a minute.") template = "users.html" - context = {"request": request, "mercure_version": mercure_defs.VERSION, "page": "users", "users": users_list} - context.update(get_user_information(request)) + context = {"request": request, "page": "users", "users": users_list} return templates.TemplateResponse(template, context) @@ -237,12 +236,10 @@ async def users_edit(request) -> Response: template = "users_edit.html" context = { "request": request, - "mercure_version": mercure_defs.VERSION, "page": "users", "edituser": edituser, "edituser_info": users_list[edituser], } - context.update(get_user_information(request)) return templates.TemplateResponse(template, context) From 6a6eabad23e1e7a27c8c767c31a9224d351c8ba4 Mon Sep 17 00:00:00 2001 From: Roy Wiggins Date: Fri, 28 Jun 2024 17:57:21 +0000 Subject: [PATCH 13/42] beginner support for batch process --- requirements.in | 10 +- requirements.txt | 18 +-- webinterface/common.py | 7 +- webinterface/dashboards/query.py | 146 ++++++++++++++++-- webinterface/templates/dashboards/query.html | 36 ++++- .../dashboards/query_job_fragment.html | 29 ++++ 6 files changed, 216 insertions(+), 30 deletions(-) create mode 100644 webinterface/templates/dashboards/query_job_fragment.html diff --git a/requirements.in b/requirements.in index c01e860c..62c8df45 100644 --- a/requirements.in +++ b/requirements.in @@ -57,8 +57,10 @@ pillow >= 10.0.1 dicomweb-client boto3 freezegun -certifi>=2023.07.22 -future>=0.18.3 -lxml>=4.9.1 +certifi >= 2023.07.22 +future >= 0.18.3 +lxml >= 4.9.1 pyxnat -urllib3>=1.26.18 \ No newline at end of file +urllib3 >= 1.26.18 +rq ~= 1.16.2 +rq-scheduler ~= 0.13.1 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 041369e5..a2f6d411 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ # This file is autogenerated by pip-compile with Python 3.10 # by the following command: # -# pip-compile requirements.in +# pip-compile # aiofiles==23.2.1 # via -r requirements.in @@ -25,6 +25,7 @@ async-timeout==4.0.3 # aiohttp # aiopg # asyncpg + # redis asyncpg==0.29.0 # via -r requirements.in attrs==23.2.0 @@ -45,7 +46,9 @@ certifi==2024.6.2 charset-normalizer==3.3.2 # via requests click==8.1.7 - # via uvicorn + # via + # rq + # uvicorn daiquiri==3.2.5.1 # via -r requirements.in databases[aiopg]==0.8.0 @@ -176,16 +179,13 @@ pyxnat==1.6.2 reactivex==4.0.4 # via influxdb-client requests==2.32.3 - # via - # dicomweb-client - # docker - # python-nomad - # pyxnat - # sphinx +redis==5.0.4 + # via rq retrying==1.3.4 # via dicomweb-client s3transfer==0.10.1 - # via boto3 +rq==1.16.2 + # via -r requirements.in six==1.16.0 # via # python-dateutil diff --git a/webinterface/common.py b/webinterface/common.py index 60e65adb..25a35efd 100755 --- a/webinterface/common.py +++ b/webinterface/common.py @@ -14,10 +14,11 @@ from starlette.templating import Jinja2Templates from common.constants import mercure_defs +from rq_scheduler import Scheduler - - -worker_queue = Queue(connection=Redis()) +redis = Redis() +worker_queue = Queue(connection=redis) +worker_scheduler = Scheduler(queue=worker_queue, connection=worker_queue.connection) def get_user_information(request) -> dict: """Returns dictionary of values 
that should always be passed to the templates when the user is logged in.""" diff --git a/webinterface/dashboards/query.py b/webinterface/dashboards/query.py index e27f0740..8dd692de 100644 --- a/webinterface/dashboards/query.py +++ b/webinterface/dashboards/query.py @@ -2,7 +2,7 @@ from webinterface.query import SimpleDicomClient # Standard python includes from datetime import datetime - +import time, random # Starlette-related includes from starlette.authentication import requires @@ -11,34 +11,139 @@ from webinterface.common import templates import common.config as config from starlette.responses import PlainTextResponse, JSONResponse -from webinterface.common import worker_queue +from webinterface.common import worker_queue, redis +from rq import Connection from rq import get_current_job - +from rq.job import Job from .common import router logger = config.get_logger() -def test_job(*,accession, node): +def query_job(*,accession, node): config.read_config() c = SimpleDicomClient(node.ip, node.port, node.aet_target, config.mercure.incoming_folder) + job = get_current_job() + job.meta["started"] = 1 + job.save_meta() for identifier in c.getscu(accession): - job = get_current_job() job.meta['failed'] = identifier.NumberOfFailedSuboperations job.meta['remaining'] = identifier.NumberOfRemainingSuboperations job.meta['completed'] = identifier.NumberOfCompletedSuboperations + if not job.meta.get('total', False): + job.meta['total'] = identifier.NumberOfCompletedSuboperations + identifier.NumberOfRemainingSuboperations + job.meta["started"] += 1 job.save_meta() return "Complete" +def dummy_job(*,accession, node): + total_time = 3 # Total time for the job in seconds (1 minute) + update_interval = 1 # Interval between updates in seconds + + start_time = time.monotonic() + job = get_current_job() + if job.meta.get('parent'): + job_parent = worker_queue.fetch_job(job.meta['parent']) + else: + job_parent = None + # failed = 0 + remaining = total_time // update_interval + completed = 0 + print(accession) + if job_parent: + job_parent.meta['started'] = job_parent.meta.get('started',0) + 1 + job_parent.save_meta() + + job.meta['started'] = 1 + job.meta['total'] = remaining + job.meta['progress'] = f"0 / {job.meta['total']}" + job.save_meta() + while (time.monotonic() - start_time) < total_time: + time.sleep(update_interval) # Sleep for the interval duration + remaining -= 1 + completed += 1 + + # job.meta['failed'] = failed + job.meta['remaining'] = remaining + job.meta['completed'] = completed + job.meta['progress'] = f"{completed} / {job.meta['total']}" + print(job.meta['progress']) + job.save_meta() # Save the updated meta data to the job + + if job_parent: + job_parent.meta['completed'] += 1 + job_parent.meta['progress'] = f"{job_parent.meta['completed'] } / {job_parent.meta['total']}" + + job_parent.save_meta() + return "Job complete" + +def batch_job(*, accessions, subjobs): + print(subjobs) + return "Batch complete" + +def monitor_job(): + print("monitoring") + +@router.get("/query/job_info") +@requires(["authenticated", "admin"], redirect="login") +async def get_job_info(request): + job_id = request.query_params['id'] + job = worker_queue.fetch_job(job_id) + if not job: + return JSONResponse({'error': 'Job not found'}, status_code=404) + + subjob_info = [] + for job_id in job.kwargs.get('subjobs',[]): + subjob = worker_queue.fetch_job(job_id) + if subjob: + subjob_info.append({'id': subjob.get_id(), + 'ended_at': subjob.ended_at.isoformat().split('.')[0] if subjob.ended_at else "", + 
'created_at_dt':subjob.created_at, + 'accession': subjob.kwargs['accession'], + 'progress': subjob.meta.get('progress'), + 'status': subjob.get_status()}) + subjob_info = sorted(subjob_info, key=lambda x:x['created_at_dt']) + return templates.TemplateResponse("dashboards/query_job_fragment.html", {"request":request,"subjob_info":subjob_info}) + @router.post("/query") @requires(["authenticated", "admin"], redirect="login") -async def query_post(request): +async def query_post_batch(request): + """ + Starts a new query job for the given accession number and DICOM node. + """ form = await request.form() + for n in config.mercure.dicom_retrieve.dicom_nodes: + if n.name == form.get("dicom_node"): + node = n + break + random_accessions = ["".join(random.choices([str(i) for i in range(10)], k=10)) for _ in range(5)] + jobs = [] + with Connection(redis): + for accession in random_accessions: + job = Job.create(dummy_job, kwargs=dict(accession=accession, node=node), timeout='30m', result_ttl=-1, meta=dict(type="get_accession_batch",parent=None)) + jobs.append(job) + full_job = Job.create(batch_job, kwargs=dict(accessions=random_accessions, subjobs=[j.id for j in jobs]), timeout=-1, result_ttl=-1, meta=dict(type="batch", started=0, completed=0, total=len(jobs)), depends_on=[j.id for j in jobs]) + for j in jobs: + j.meta["parent"] = full_job.id + + for j in jobs: + worker_queue.enqueue_job(j) + worker_queue.enqueue_job(full_job) + # worker_scheduler.schedule(scheduled_time=datetime.utcnow(), func=monitor_job, interval=10, repeat=10, result_ttl=-1) + return PlainTextResponse() + +@router.post("/query_single") +@requires(["authenticated", "admin"], redirect="login") +async def query_post(request): + """ + Starts a new query job for the given accession number and DICOM node. 
+ """ + form = await request.form() for n in config.mercure.dicom_retrieve.dicom_nodes: if n.name == form.get("dicom_node"): node = n break - worker_queue.enqueue_call(test_job, kwargs=dict(accession=form.get("accession"), node=node), timeout='10m', result_ttl=-1) + worker_queue.enqueue_call(query_job, kwargs=dict(accession=form.get("accession"), node=node), timeout='30m', result_ttl=-1, meta=dict(type="get_accession_single")) return PlainTextResponse() @router.get("/query/jobs") @@ -55,12 +160,33 @@ async def query_jobs(request): for r in registries: for j_id in r.get_job_ids(): job = worker_queue.fetch_job(j_id) - job_info.append(dict(id=j_id, + if job.meta.get('type') != 'batch': + continue + job_dict = dict(id=j_id, status=job.get_status(), parameters=dict(accession=job.kwargs.get('accession','')), - enqueued_at=1000*datetime.timestamp(job.enqueued_at), + created_at=1000*datetime.timestamp(job.created_at) if job.created_at else "", + enqueued_at=1000*datetime.timestamp(job.enqueued_at) if job.enqueued_at else "", result=job.result, - meta=job.meta)) + meta=job.meta, + progress="") + # if job.meta.get('completed') and job.meta.get('remaining'): + # job_dict["progress"] = f"{job.meta.get('completed')} / {job.meta.get('completed') + job.meta.get('remaining')}" + # if job.meta.get('type',None) == "batch": + n_started = job.meta.get('started',0) + n_completed = job.meta.get('completed',0) + n_total = job.meta.get('total',0) + + if job_dict["status"] == "finished": + job_dict["progress"] = f"{n_total} / {n_total}" + elif job_dict["status"] in ("deferred","started"): + job_dict["progress"] = f"{n_completed} / {n_total}" + if 0 < n_started < n_total: + job_dict["status"] = "running" + elif n_completed == n_total: + job_dict["status"] = "finishing" + + job_info.append(job_dict) return JSONResponse(dict(data=job_info)) # return PlainTextResponse(",".join([str(j) for j in all_jobs])) diff --git a/webinterface/templates/dashboards/query.html b/webinterface/templates/dashboards/query.html index 48756c04..f595df46 100644 --- a/webinterface/templates/dashboards/query.html +++ b/webinterface/templates/dashboards/query.html @@ -68,9 +68,11 @@

DICOM Query

                        <tr>
+                            <th></th>
                             <th>id</th>
+                            <th>type</th>
                             <th>status</th>
-                            <th>enqueued_at</th>
+                            <th>created_at</th>
                             <th>accession</th>
                             <th>result</th>
                             <th>progress</th>
                         </tr>
@@ -93,14 +95,22 @@

DICOM Query

                ajax: "query/jobs",
                 deferRender: true,
                 columns: [
+                    {
+                        className: 'dt-control',
+                        orderable: false,
+                        data: null,
+                        defaultContent: ''
+                    },
                     { data: 'id' },
+                    { data: 'meta', render: data => data["type"]? data["type"]:"" },
                     { data: 'status' },
-                    { data: 'enqueued_at', render: data => `${data}`+new Date(data).toLocaleString("en-US")},
+                    { data: 'created_at', render: data => `${data || Date.now() }`+(data? new Date(data).toLocaleString("en-US"):"")},
                     { data: 'parameters', render: data => data["accession"] || ""},
                     { data: 'result' },
-                    { data: 'meta', render: data => data["completed"]?`${data["completed"]}/${data["remaining"]+data["completed"]}`:"" },
+                    { data: 'progress'},
                 ],
                 select: {
+                    selector:'td:not(:first-child)',
                     style: 'os'
                 },
                 language: {
@@ -108,10 +118,28 @@

DICOM Query

                },
                 // filter: true,
                 buttons: [],
-                order: [[2, 'desc']],
+                order: [[4, 'desc']],
                 initComplete: function() {
                 }
             });
+            window.datatable.on('click', 'td.dt-control', function (e) {
+                let tr = e.target.closest('tr');
+                let row = window.datatable.row(tr);
+
+                if (row.child.isShown()) {
+                    // This row is already open - close it
+                    row.child.hide();
+                }
+                else {
+                    // Open this row
+                    fetch("query/job_info?id="+row.data().id, {method: "GET"}
+                    ).then(
+                        response => response.text()
+                    ).then(data=>{
+                        row.child(data).show();
+                    });
+                }
+            });
         });
 {% endblock %}
\ No newline at end of file
diff --git a/webinterface/templates/dashboards/query_job_fragment.html b/webinterface/templates/dashboards/query_job_fragment.html
new file mode 100644
index 00000000..36314c81
--- /dev/null
+++ b/webinterface/templates/dashboards/query_job_fragment.html
@@ -0,0 +1,29 @@
+<table class="table is-fullwidth">
+    <thead>
+        <tr>
+            <th>
+                Accession
+            </th>
+            <th>
+                Ended at
+            </th>
+            <th>
+                Status
+            </th>
+            <th>
+                Progress
+            </th>
+        </tr>
+    </thead>
+    <tbody>
+        {% for job in subjob_info %}
+        <tr>
+            <td>{{job['accession']}}</td>
+            <td>{{job['ended_at'] or ''}}</td>
+            <td>{{job['status']}}</td>
+            <td>{{job['progress'] or ''}}</td>
+        </tr>
+        {% endfor %}
+    </tbody>
+</table>
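The batching introduced in this commit hangs together as follows: each accession becomes its own rq job, a parent job depends on all of them, and the subjobs report progress back into the parent's meta. A condensed, self-contained sketch of that pattern, assuming a local Redis on the default port; the helper names (fetch_accession, collect_results, enqueue_batch) are illustrative and not part of the patch:

    from redis import Redis
    from rq import Queue, get_current_job
    from rq.job import Job

    redis = Redis()               # assumption: Redis reachable on localhost:6379
    queue = Queue(connection=redis)

    def fetch_accession(*, accession):
        """Stand-in for the per-accession job; runs inside the rq worker."""
        job = get_current_job()
        parent_id = job.meta.get("parent")
        # ... perform the actual C-GET for `accession` here ...
        if parent_id:
            parent = Job.fetch(parent_id, connection=redis)
            parent.get_meta()                      # refresh before touching shared counters
            parent.meta["completed"] = parent.meta.get("completed", 0) + 1
            parent.save_meta()
        return "ok"

    def collect_results(*, accessions, subjobs):
        """Stand-in for the batch job that runs once every subjob has finished."""
        return f"collected {len(accessions)} accessions"

    def enqueue_batch(accessions):
        subjobs = [queue.enqueue(fetch_accession, kwargs=dict(accession=a),
                                 result_ttl=-1, meta=dict(parent=None))
                   for a in accessions]
        parent = queue.enqueue(collect_results,
                               kwargs=dict(accessions=accessions,
                                           subjobs=[j.id for j in subjobs]),
                               result_ttl=-1,
                               depends_on=subjobs,   # deferred until all subjobs succeed
                               meta=dict(type="batch", completed=0, total=len(subjobs)))
        for j in subjobs:                            # let each subjob find its parent
            j.meta["parent"] = parent.id
            j.save_meta()
        return parent

The patch itself builds the Job objects first (Job.create) and only enqueues them after the parent id has been written into each subjob's meta, so no subjob can start before it knows where to report progress.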
\ No newline at end of file From 48205eb862a925e08389a55651ecd78294423556 Mon Sep 17 00:00:00 2001 From: Roy Wiggins Date: Fri, 28 Jun 2024 17:57:21 +0000 Subject: [PATCH 14/42] create dummy files --- webinterface/dashboards/query.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/webinterface/dashboards/query.py b/webinterface/dashboards/query.py index 8dd692de..81170039 100644 --- a/webinterface/dashboards/query.py +++ b/webinterface/dashboards/query.py @@ -1,4 +1,6 @@ +from pathlib import Path +import shutil from webinterface.query import SimpleDicomClient # Standard python includes from datetime import datetime @@ -34,7 +36,8 @@ def query_job(*,accession, node): job.save_meta() return "Complete" -def dummy_job(*,accession, node): +def dummy_job(*,accession, node, path): + Path(path).mkdir(parents=True, exist_ok=True) total_time = 3 # Total time for the job in seconds (1 minute) update_interval = 1 # Interval between updates in seconds @@ -58,6 +61,7 @@ def dummy_job(*,accession, node): job.save_meta() while (time.monotonic() - start_time) < total_time: time.sleep(update_interval) # Sleep for the interval duration + (Path(path) / f"dummy{completed}_{job.id}.dcm").touch() remaining -= 1 completed += 1 @@ -75,8 +79,10 @@ def dummy_job(*,accession, node): job_parent.save_meta() return "Job complete" -def batch_job(*, accessions, subjobs): - print(subjobs) +def batch_job(*, accessions, subjobs, path): + for p in Path(path).glob("**/*.dcm"): + shutil.move(p, "/opt/mercure/data/incoming") + shutil.rmtree(path) return "Batch complete" def monitor_job(): @@ -123,6 +129,9 @@ async def query_post_batch(request): full_job = Job.create(batch_job, kwargs=dict(accessions=random_accessions, subjobs=[j.id for j in jobs]), timeout=-1, result_ttl=-1, meta=dict(type="batch", started=0, completed=0, total=len(jobs)), depends_on=[j.id for j in jobs]) for j in jobs: j.meta["parent"] = full_job.id + j.kwargs["path"] = f"/opt/mercure/data/query/job_dirs/{full_job.id}/{j.kwargs['accession']}" + full_job.kwargs["path"] = Path(f"/opt/mercure/data/query/job_dirs/{full_job.id}") + for j in jobs: worker_queue.enqueue_job(j) From e12ac9c6c3e8d0ce7e49ea79f8e87ded4d8e4748 Mon Sep 17 00:00:00 2001 From: Roy Wiggins Date: Fri, 28 Jun 2024 17:57:21 +0000 Subject: [PATCH 15/42] - pause/resume jobs - redirect results to folder --- common/types.py | 7 +- webinterface/common.py | 3 + webinterface/dashboards/query.py | 200 ++++++++++++++----- webinterface/templates/dashboards/query.html | 95 ++++++++- 4 files changed, 248 insertions(+), 57 deletions(-) diff --git a/common/types.py b/common/types.py index 9558db6c..aaba7738 100755 --- a/common/types.py +++ b/common/types.py @@ -185,9 +185,14 @@ class DicomNode(BaseModel): aet_target: str aet_source: Optional[str] = "" +class DicomDestination(BaseModel): + name: str + path: str + class DicomRetrieveConfig(BaseModel): dicom_nodes: List[DicomNode] = [] - + destination_folders: List[DicomDestination] = [] + class Config(BaseModel, Compat): appliance_name: str port: int diff --git a/webinterface/common.py b/webinterface/common.py index 25a35efd..d0692a89 100755 --- a/webinterface/common.py +++ b/webinterface/common.py @@ -17,6 +17,9 @@ from rq_scheduler import Scheduler redis = Redis() + + + worker_queue = Queue(connection=redis) worker_scheduler = Scheduler(queue=worker_queue, connection=worker_queue.connection) diff --git a/webinterface/dashboards/query.py b/webinterface/dashboards/query.py index 81170039..eca892da 100644 --- 
a/webinterface/dashboards/query.py +++ b/webinterface/dashboards/query.py @@ -20,6 +20,8 @@ from .common import router logger = config.get_logger() + + def query_job(*,accession, node): config.read_config() c = SimpleDicomClient(node.ip, node.port, node.aet_target, config.mercure.incoming_folder) @@ -36,9 +38,10 @@ def query_job(*,accession, node): job.save_meta() return "Complete" + def dummy_job(*,accession, node, path): Path(path).mkdir(parents=True, exist_ok=True) - total_time = 3 # Total time for the job in seconds (1 minute) + total_time = 10 # Total time for the job in seconds (1 minute) update_interval = 1 # Interval between updates in seconds start_time = time.monotonic() @@ -61,7 +64,10 @@ def dummy_job(*,accession, node, path): job.save_meta() while (time.monotonic() - start_time) < total_time: time.sleep(update_interval) # Sleep for the interval duration - (Path(path) / f"dummy{completed}_{job.id}.dcm").touch() + out_file = (Path(path) / f"dummy{completed}_{job.id}.dcm") + if out_file.exists(): + raise Exception(f"{out_file} exists already") + out_file.touch() remaining -= 1 completed += 1 @@ -73,21 +79,82 @@ def dummy_job(*,accession, node, path): job.save_meta() # Save the updated meta data to the job if job_parent: + job_parent.get_meta() # there is technically a race condition here... job_parent.meta['completed'] += 1 - job_parent.meta['progress'] = f"{job_parent.meta['completed'] } / {job_parent.meta['total']}" + job_parent.meta['progress'] = f"{job_parent.meta['started'] } / {job_parent.meta['completed'] } / {job_parent.meta['total']}" job_parent.save_meta() return "Job complete" -def batch_job(*, accessions, subjobs, path): - for p in Path(path).glob("**/*.dcm"): - shutil.move(p, "/opt/mercure/data/incoming") +def batch_job(*, accessions, subjobs, path, destination): + job = get_current_job() + job.save_meta() + logger.info(f"Job completing {job.id}") + logger.info(path) + if destination is None: + for p in Path(path).glob("**/*"): + if p.is_file(): + shutil.move(p, config.mercure.incoming_folder) + else: + dest_folder: Path = Path(destination) / job.id + dest_folder.mkdir() + for p in Path(path).iterdir(): + if p.is_dir(): + logger.info(f"moving {p} to {dest_folder}") + shutil.move(p, dest_folder) + shutil.rmtree(path) - return "Batch complete" + return "Job complete" + + def monitor_job(): print("monitoring") +@router.post("/query/pause_job") +@requires(["authenticated", "admin"], redirect="login") +async def pause_job(request): + job = worker_queue.fetch_job(request.query_params['id']) + if not job: + return JSONResponse({'error': 'Job not found'}, status_code=404) + if job.is_finished or job.is_failed: + return JSONResponse({'error': 'Job is already finished'}, status_code=400) + + for job_id in job.kwargs.get('subjobs',[]): + subjob = worker_queue.fetch_job(job_id) + if subjob and (subjob.is_deferred or subjob.is_queued): + subjob.meta['paused'] = True + subjob.save_meta() + subjob.cancel() + job.meta['paused'] = True + job.save_meta() + return JSONResponse({'status': 'success'}, status_code=200) + +@router.post("/query/resume_job") +@requires(["authenticated", "admin"], redirect="login") +async def resume_job(request): + job = worker_queue.fetch_job(request.query_params['id']) + if not job: + return JSONResponse({'error': 'Job not found'}, status_code=404) + if job.is_finished or job.is_failed: + return JSONResponse({'error': 'Job is already finished'}, status_code=400) + # if not job.meta.get('paused', False): + # return JSONResponse({'error': 'Job is not 
paused'}, status_code=400) + + for subjob_id in job.kwargs.get('subjobs',[]): + subjob = worker_queue.fetch_job(subjob_id) + if subjob and subjob.meta.get('paused', None): + subjob.meta['paused'] = False + subjob.save_meta() + worker_queue.canceled_job_registry.requeue(subjob_id) + # worker_queue.canceled_job_registry.remove(subjob_id) + job.get_meta() + job.meta['paused'] = False + job.save_meta() + # worker_queue.canceled_job_registry.requeue(job.id) + # worker_queue.canceled_job_registry.remove(job.id) + return JSONResponse({'status': 'success'}, status_code=200) + @router.get("/query/job_info") @requires(["authenticated", "admin"], redirect="login") async def get_job_info(request): @@ -97,17 +164,21 @@ async def get_job_info(request): return JSONResponse({'error': 'Job not found'}, status_code=404) subjob_info = [] - for job_id in job.kwargs.get('subjobs',[]): - subjob = worker_queue.fetch_job(job_id) - if subjob: - subjob_info.append({'id': subjob.get_id(), - 'ended_at': subjob.ended_at.isoformat().split('.')[0] if subjob.ended_at else "", - 'created_at_dt':subjob.created_at, - 'accession': subjob.kwargs['accession'], - 'progress': subjob.meta.get('progress'), - 'status': subjob.get_status()}) + subjobs = (worker_queue.fetch_job(job) for job in job.kwargs.get('subjobs', [])) + for subjob in subjobs: + info = {'id': subjob.get_id(), + 'ended_at': subjob.ended_at.isoformat().split('.')[0] if subjob.ended_at else "", + 'created_at_dt':subjob.created_at, + 'accession': subjob.kwargs['accession'], + 'progress': subjob.meta.get('progress'), + 'paused': subjob.meta.get('paused',False), + 'status': subjob.get_status() + } + if info['status'] == 'canceled' and info['paused']: + info['status'] = 'paused' + subjob_info.append(info) subjob_info = sorted(subjob_info, key=lambda x:x['created_at_dt']) - return templates.TemplateResponse("dashboards/query_job_fragment.html", {"request":request,"subjob_info":subjob_info}) + return templates.TemplateResponse("dashboards/query_job_fragment.html", {"request":request,"job":job,"subjob_info":subjob_info}) @router.post("/query") @requires(["authenticated", "admin"], redirect="login") @@ -120,18 +191,22 @@ async def query_post_batch(request): if n.name == form.get("dicom_node"): node = n break - random_accessions = ["".join(random.choices([str(i) for i in range(10)], k=10)) for _ in range(5)] + destination = form.get("destination") + for d in config.mercure.dicom_retrieve.destination_folders: + if d.name == destination: + destination_path = d.path + random_accessions = ["".join(random.choices([str(i) for i in range(10)], k=5)) for _ in range(3)] jobs = [] with Connection(redis): for accession in random_accessions: - job = Job.create(dummy_job, kwargs=dict(accession=accession, node=node), timeout='30m', result_ttl=-1, meta=dict(type="get_accession_batch",parent=None)) + job = Job.create(dummy_job, kwargs=dict(accession=accession, node=node), timeout='30m', result_ttl=-1, meta=dict(type="get_accession_batch",parent=None, paused=False)) jobs.append(job) - full_job = Job.create(batch_job, kwargs=dict(accessions=random_accessions, subjobs=[j.id for j in jobs]), timeout=-1, result_ttl=-1, meta=dict(type="batch", started=0, completed=0, total=len(jobs)), depends_on=[j.id for j in jobs]) + full_job = Job.create(batch_job, kwargs=dict(accessions=random_accessions, subjobs=[j.id for j in jobs], destination=destination_path), timeout=-1, result_ttl=-1, meta=dict(type="batch", started=0, paused=False,completed=0, total=len(jobs)), depends_on=[j.id for j in jobs]) for j 
in jobs: j.meta["parent"] = full_job.id j.kwargs["path"] = f"/opt/mercure/data/query/job_dirs/{full_job.id}/{j.kwargs['accession']}" full_job.kwargs["path"] = Path(f"/opt/mercure/data/query/job_dirs/{full_job.id}") - + full_job.kwargs["path"].mkdir(parents=True) for j in jobs: worker_queue.enqueue_job(j) @@ -164,38 +239,59 @@ async def query_jobs(request): worker_queue.finished_job_registry, # Returns FinishedJobRegistry worker_queue.failed_job_registry, # Returns FailedJobRegistry worker_queue.scheduled_job_registry, # Returns ScheduledJobRegistry + worker_queue.canceled_job_registry, # Returns CanceledJobRegistry ] job_info = [] - for r in registries: - for j_id in r.get_job_ids(): - job = worker_queue.fetch_job(j_id) - if job.meta.get('type') != 'batch': - continue - job_dict = dict(id=j_id, - status=job.get_status(), - parameters=dict(accession=job.kwargs.get('accession','')), - created_at=1000*datetime.timestamp(job.created_at) if job.created_at else "", - enqueued_at=1000*datetime.timestamp(job.enqueued_at) if job.enqueued_at else "", - result=job.result, - meta=job.meta, - progress="") - # if job.meta.get('completed') and job.meta.get('remaining'): - # job_dict["progress"] = f"{job.meta.get('completed')} / {job.meta.get('completed') + job.meta.get('remaining')}" - # if job.meta.get('type',None) == "batch": - n_started = job.meta.get('started',0) - n_completed = job.meta.get('completed',0) - n_total = job.meta.get('total',0) - - if job_dict["status"] == "finished": - job_dict["progress"] = f"{n_total} / {n_total}" - elif job_dict["status"] in ("deferred","started"): - job_dict["progress"] = f"{n_completed} / {n_total}" - if 0 < n_started < n_total: - job_dict["status"] = "running" - elif n_completed == n_total: - job_dict["status"] = "finishing" - - job_info.append(job_dict) + # logger.info(worker_queue.job_ids) + # for registry in registries: + job_ids = set() + for registry in registries: + for j_id in registry.get_job_ids(): + job_ids.add(j_id) + for j_id in worker_queue.job_ids: + job_ids.add(j_id) + + for j_id in job_ids: + job = worker_queue.fetch_job(j_id) + job_meta = job.get_meta() + if job_meta.get('type') != 'batch': + continue + job_dict = dict(id=j_id, + status=job.get_status(), + parameters=dict(accession=job.kwargs.get('accession','')), + created_at=1000*datetime.timestamp(job.created_at) if job.created_at else "", + enqueued_at=1000*datetime.timestamp(job.enqueued_at) if job.enqueued_at else "", + result=job.result, + meta=job_meta, + progress="") + # if job.meta.get('completed') and job.meta.get('remaining'): + # job_dict["progress"] = f"{job.meta.get('completed')} / {job.meta.get('completed') + job.meta.get('remaining')}" + # if job.meta.get('type',None) == "batch": + n_started = job_meta.get('started',0) + n_completed = job_meta.get('completed',0) + n_total = job_meta.get('total',0) + + if job_dict["status"] == "finished": + job_dict["progress"] = f"{n_total} / {n_total}" + elif job_dict["status"] in ("deferred","started", "paused", "canceled"): + job_dict["progress"] = f"{n_completed} / {n_total}" + + # if job_dict["status"] == "canceled" and + if job_dict["meta"].get('paused', False): + if n_started < n_completed: # TODO: this does not work + job_dict["status"] = "pausing" + else: + job_dict["status"] = "paused" + + if job_dict["status"] in ("deferred", "started"): + if n_started == 0: + job_dict["status"] = "waiting" + elif n_completed < n_total: + job_dict["status"] = "running" + elif n_completed == n_total: + job_dict["status"] = "finishing" + + 
job_info.append(job_dict) return JSONResponse(dict(data=job_info)) # return PlainTextResponse(",".join([str(j) for j in all_jobs])) @@ -205,7 +301,7 @@ async def query(request): template = "dashboards/query.html" context = { "request": request, - + "destination_folders": config.mercure.dicom_retrieve.destination_folders, "dicom_nodes": config.mercure.dicom_retrieve.dicom_nodes, "page": "query", } diff --git a/webinterface/templates/dashboards/query.html b/webinterface/templates/dashboards/query.html index f595df46..085cc209 100644 --- a/webinterface/templates/dashboards/query.html +++ b/webinterface/templates/dashboards/query.html @@ -44,7 +44,26 @@

DICOM Query


DICOM Query

-
@@ -84,12 +102,45 @@

DICOM Query

+
+
+
 {% endblock %}
 {% block dashboard_content %}
@@ -16,7 +17,7 @@

DICOM Query

 Redis connection not available, so this dashboard will not function.
 {% endif %}
-
+
@@ -89,7 +90,7 @@

DICOM Query
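The pause/resume handlers added in this commit boil down to: pausing cancels every subjob that has not started yet (which parks it in the CanceledJobRegistry) and marks it as paused, and resuming requeues exactly those subjobs. A minimal sketch of that round trip, assuming an rq version whose registries expose requeue() as the handlers rely on; the function names are illustrative and not part of the patch:

    from redis import Redis
    from rq import Queue
    from rq.job import Job

    redis = Redis()
    queue = Queue(connection=redis)

    def pause_batch(parent_id: str) -> None:
        parent = Job.fetch(parent_id, connection=redis)
        for sub_id in parent.kwargs.get("subjobs", []):
            sub = Job.fetch(sub_id, connection=redis)
            if sub.is_deferred or sub.is_queued:          # only jobs that have not started
                sub.meta["paused"] = True
                sub.save_meta()
                sub.cancel()                              # lands in the CanceledJobRegistry
        parent.meta["paused"] = True
        parent.save_meta()

    def resume_batch(parent_id: str) -> None:
        parent = Job.fetch(parent_id, connection=redis)
        for sub_id in parent.kwargs.get("subjobs", []):
            sub = Job.fetch(sub_id, connection=redis)
            if sub.meta.get("paused"):
                sub.meta["paused"] = False
                sub.save_meta()
                queue.canceled_job_registry.requeue(sub_id)   # put it back on the queue
        parent.meta["paused"] = False
        parent.save_meta()

Running subjobs are left to finish; only queued or deferred ones are parked, which is why the jobs table distinguishes the "pausing" and "paused" states.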