diff --git a/build-docker.sh b/build-docker.sh
index 75a7d189..4bae6070 100755
--- a/build-docker.sh
+++ b/build-docker.sh
@@ -84,7 +84,7 @@ build_component () {
docker build $CACHE -t $PREFIX/mercure-base:$TAG -t $PREFIX/mercure-base:latest -f docker/base/Dockerfile .
-for component in ui bookkeeper receiver router processor dispatcher cleaner
+for component in ui bookkeeper receiver router processor dispatcher cleaner worker
do
build_component $component
done
diff --git a/common/config.py b/common/config.py
index f2619b9d..cc2ecd39 100755
--- a/common/config.py
+++ b/common/config.py
@@ -40,6 +40,7 @@
"error_folder": "/opt/mercure/data/error",
"discard_folder": "/opt/mercure/data/discard",
"processing_folder": "/opt/mercure/data/processing",
+ "jobs_folder": "/opt/mercure/data/jobs",
"router_scan_interval": 1, # in seconds
"dispatcher_scan_interval": 1, # in seconds
"cleaner_scan_interval": 60, # in seconds
diff --git a/common/helper.py b/common/helper.py
index 62baaf75..09a4a9c0 100755
--- a/common/helper.py
+++ b/common/helper.py
@@ -5,15 +5,13 @@
"""
# Standard python includes
import asyncio
-from contextlib import suppress
from datetime import datetime
from datetime import time as _time
import inspect
from pathlib import Path
import threading
-from typing import Callable, Optional
+from typing import Callable, Optional, Tuple
import graphyte
-import aiohttp
import os
import common.influxdb
@@ -23,6 +21,16 @@
loop = asyncio.get_event_loop()
+def validate_folders(config) -> Tuple[bool, str]:
+ for folder in ( config.incoming_folder, config.studies_folder, config.outgoing_folder,
+ config.success_folder, config.error_folder, config.discard_folder,
+ config.processing_folder, config.jobs_folder ):
+ if not Path(folder).is_dir():
+ return False, f"Folder {folder} does not exist."
+ if not os.access( folder, os.R_OK | os.W_OK ):
+ return False, f"No read/write access to {folder}"
+ return True, ""
+
def get_now_str() -> str:
"""Returns the current time as string with mercure-wide formatting"""
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
@@ -116,6 +124,8 @@ async def _run(self) -> None:
def run_until_complete(self, loop=None) -> None:
self.start()
+ if not self._task:
+ raise Exception("Unexpected error: AsyncTimer._task is None")
loop = loop or asyncio.get_event_loop()
loop.run_until_complete(self._task)
diff --git a/common/rule_evaluation.py b/common/rule_evaluation.py
index 16c42768..71b36958 100755
--- a/common/rule_evaluation.py
+++ b/common/rule_evaluation.py
@@ -48,13 +48,13 @@ def replace_tags(rule: str, tags: Dict[str, str]) -> Any:
safe_eval_cmds = {"float": float, "int": int, "str": str, "len": len, "bool": bool, "sum": sum, "round": round, "max": max, "min": min, "abs": abs, "pow": pow, "chr": chr, "ord": ord}
-def eval_rule(rule: str, tags: Dict[str, str]) -> Any:
+def eval_rule(rule: str, tags_dict: Dict[str, str]) -> Any:
"""Parses the given rule, replaces all tag variables with values from the given tags dictionary, and
evaluates the rule. If the rule is invalid, an exception will be raised."""
logger.info(f"Rule: {rule}")
- rule = replace_tags(rule, tags)
+ rule = replace_tags(rule, tags_dict)
logger.info(f"Evaluated: {rule}")
- tags_obj = Tags(tags)
+ tags_obj = Tags(tags_dict)
try:
result = eval(rule, {"__builtins__": {}}, {**safe_eval_cmds,"tags":tags_obj})
except SyntaxError as e:
@@ -63,7 +63,7 @@ def eval_rule(rule: str, tags: Dict[str, str]) -> Any:
if opening >-1 and closing>1:
raise TagNotFoundException(f"No such tag '{rule[opening+1:closing]}' in tags list.")
raise
- logger.info(", ".join([f"{tag} = \"{tags[tag]}\"" for tag in tags_obj.tags_accessed()]))
+ logger.info(", ".join([f"{tag} = \"{tags_dict[tag]}\"" for tag in tags_obj.tags_accessed()]))
logger.info(f"Result: {result}")
return result, tags_obj.tags_accessed()
diff --git a/common/types.py b/common/types.py
index 33b400c2..4051a938 100755
--- a/common/types.py
+++ b/common/types.py
@@ -21,10 +21,11 @@ def get(self, item, els=None) -> Any:
class EmptyDict(TypedDict):
pass
-
class Target(BaseModel, Compat):
+ target_type: Any
contact: Optional[str] = ""
comment: str = ""
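+    # Whether data is pushed to this target ("push"), pulled/queried from it ("pull"), or both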
+ direction: Optional[Literal["pull", "push", "both"]] = "push"
@property
def short_description(self) -> str:
@@ -120,12 +121,16 @@ def short_description(self) -> str:
class DicomWebTarget(Target):
target_type: Literal["dicomweb"] = "dicomweb"
url: str
- qido_url_prefix: Optional[str]
- wado_url_prefix: Optional[str]
- stow_url_prefix: Optional[str]
- access_token: Optional[str]
- http_user: Optional[str]
- http_password: Optional[str]
+ qido_url_prefix: Optional[str] = None
+ wado_url_prefix: Optional[str] = None
+ stow_url_prefix: Optional[str] = None
+ access_token: Optional[str] = None
+ http_user: Optional[str] = None
+ http_password: Optional[str] = None
+
+ @property
+ def short_description(self) -> str:
+ return self.url
@property
def short_description(self) -> str:
@@ -210,8 +215,41 @@ class ProcessingLogsConfig(BaseModel):
class DicomReceiverConfig(BaseModel):
additional_tags: Dict[str,str] = {}
-
+
+class DicomNodeBase(BaseModel):
+ name: str
+
+ @classmethod
+ def __get_validators__(cls):
+ # one or more validators may be yielded which will be called in the
+ # order to validate the input, each validator will receive as an input
+ # the value returned from the previous validator
+ yield cls.validate
+
+ @classmethod
+ def validate(cls, v):
+ """Parse the target as any of the known target types."""
+ subclass_dict: typing.Dict[str, Type[DicomNodeBase]] = {sbc.__name__: sbc for sbc in cls.__subclasses__()}
+ for k in subclass_dict:
+ try:
+ return subclass_dict[k](**v)
+            except Exception:
+ pass
+ raise ValueError("Couldn't validate dicom node as any of", list(subclass_dict.keys()))
+
+ @classmethod
+ def get_name(cls) -> str:
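+        # construct() builds the model without validation, so the subclass's default node_type value can be read here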
+ return cls.construct().node_type # type: ignore
+
+class DicomDestination(BaseModel):
+ name: str
+ path: str
+
+class DicomRetrieveConfig(BaseModel):
+ dicom_nodes: List[DicomNodeBase] = []
+ destination_folders: List[DicomDestination] = []
+
class Config(BaseModel, Compat):
appliance_name: str
appliance_color: str = "#FFF"
@@ -224,6 +262,7 @@ class Config(BaseModel, Compat):
error_folder: str
discard_folder: str
processing_folder: str
+ jobs_folder: str
router_scan_interval: int # in seconds
dispatcher_scan_interval: int # in seconds
cleaner_scan_interval: int # in seconds
@@ -258,6 +297,7 @@ class Config(BaseModel, Compat):
phi_notifications: Optional[bool] = False
server_time: str = "UTC"
local_time: str = "UTC"
+ dicom_retrieve: DicomRetrieveConfig = DicomRetrieveConfig()
class TaskInfo(BaseModel, Compat):
diff --git a/configuration/default_mercure.json b/configuration/default_mercure.json
index f2a608da..d8f962fb 100755
--- a/configuration/default_mercure.json
+++ b/configuration/default_mercure.json
@@ -11,6 +11,7 @@
"error_folder" : "/opt/mercure/data/error",
"discard_folder" : "/opt/mercure/data/discard",
"processing_folder" : "/opt/mercure/data/processing",
+ "jobs_folder" : "/opt/mercure/data/jobs",
"bookkeeper" : "0.0.0.0:8080",
"bookkeeper_api_key" : "BOOKKEEPER_TOKEN_PLACEHOLDER",
"graphite_ip" : "",
diff --git a/configuration/default_services.json b/configuration/default_services.json
index 8e6458f6..c6390c80 100755
--- a/configuration/default_services.json
+++ b/configuration/default_services.json
@@ -28,5 +28,20 @@
"name": "Bookkeeper",
"systemd_service": "mercure_bookkeeper.service",
"docker_service": "mercure_bookkeeper_1"
+ },
+ "workers": {
+ "name": "Workers",
+ "systemd_service": ["mercure_worker_fast@1.service", "mercure_worker_fast@2.service", "mercure_worker_slow@1.service", "mercure_worker_slow@2.service"],
+ "docker_service": ["mercure_worker_fast_1", "mercure_worker_fast_2", "mercure_worker_slow_1", "mercure_worker_slow_2"]
+ },
+ "redis": {
+ "name": "Redis",
+ "systemd_service": "mercure_redis.service",
+ "docker_service": "mercure_redis_1"
+ },
+ "ui": {
+ "name": "UI",
+ "systemd_service": "mercure_ui.service",
+ "docker_service": "mercure_ui_1"
}
}
diff --git a/dispatch/target_types/base.py b/dispatch/target_types/base.py
index 51c2f070..39f7ad73 100644
--- a/dispatch/target_types/base.py
+++ b/dispatch/target_types/base.py
@@ -1,8 +1,12 @@
+from dataclasses import dataclass
+
+from pydicom import Dataset
from common.types import Task, TaskDispatch, TaskInfo, Rule, Target
import common.config as config
from subprocess import CalledProcessError, check_output
from starlette.responses import JSONResponse
-from typing import Any, TypeVar, Generic, cast
+from typing import Any, Dict, Generator, List, TypeVar, Generic, cast
+from pydicom.datadict import dictionary_VR, keyword_for_tag, tag_for_keyword
from pathlib import Path
import subprocess
@@ -12,9 +16,15 @@
TargetTypeVar = TypeVar("TargetTypeVar")
+@dataclass
+class ProgressInfo():
+ completed: int = 0
+ remaining: int = 0
+ progress: str = ""
+
class TargetHandler(Generic[TargetTypeVar]):
test_template = "targets/base-test.html"
-
+ can_pull = False
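+    # Handlers that support querying/retrieving from their target (find_from_target/get_from_target) set this to True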
def __init__(self):
pass
@@ -32,6 +42,18 @@ def send_to_target(
) -> str:
return ""
+    class NoSuchTagException(Exception):
+        pass
+
+    def get_from_target(self, target: TargetTypeVar, accession: str, search_filters: Dict[str,List[str]], path:str) -> Generator[ProgressInfo, None, None]:
+        raise Exception("get_from_target is not implemented for this target type")
+
+ def find_from_target(self, target: TargetTypeVar, accession: str, search_filters:Dict[str,List[str]]) -> List[Dataset]:
+        if type(self) is TargetHandler:
+            raise Exception("This method should be overridden by a subclass")
+ for t in search_filters.keys():
+ if not tag_for_keyword(t):
+ raise TargetHandler.NoSuchTagException(f"Invalid search filter: no such tag '{t}'")
+ return []
def handle_error(self, e, command) -> None:
pass
@@ -73,6 +95,7 @@ def send_to_target(
raise
return result
+
def handle_error(self, e: CalledProcessError, command) -> None:
logger.error(e.output)
logger.error(f"Failed. Command exited with value {e.returncode}: \n {command}")
diff --git a/dispatch/target_types/builtin.py b/dispatch/target_types/builtin.py
index 398aec70..199120da 100644
--- a/dispatch/target_types/builtin.py
+++ b/dispatch/target_types/builtin.py
@@ -1,3 +1,6 @@
+from typing import Any, Dict, Generator, List, Optional
+
+from pydicom import Dataset
from common.types import DicomTarget, DicomTLSTarget, SftpTarget, DummyTarget, Task
import common.config as config
from common.constants import mercure_names
@@ -10,8 +13,10 @@
from starlette.responses import JSONResponse
+from webinterface.dicom_client import DicomClientCouldNotFind, SimpleDicomClient
+
from .registry import handler_for
-from .base import SubprocessTargetHandler, TargetHandler
+from .base import ProgressInfo, SubprocessTargetHandler, TargetHandler
DCMSEND_ERROR_CODES = {
1: "EXITCODE_COMMANDLINE_SYNTAX_ERROR",
@@ -34,6 +39,7 @@ class DicomTargetHandler(SubprocessTargetHandler[DicomTarget]):
test_template = "targets/dicom-test.html"
icon = "fa-database"
display_name = "DICOM"
+ can_pull = True
def _create_command(self, target: DicomTarget, source_folder: Path, task: Task):
target_ip = target.ip
@@ -54,6 +60,21 @@ def _create_command(self, target: DicomTarget, source_folder: Path, task: Task):
)
return command, {}
+ def find_from_target(self, target: DicomTarget, accession: str, search_filters:Dict[str,List[str]]) -> List[Dataset]:
+ c = SimpleDicomClient(target.ip, target.port, target.aet_target, None)
+ try:
+ return c.findscu(accession, search_filters)
+ except DicomClientCouldNotFind as e:
+ return []
+
+ def get_from_target(self, target: DicomTarget, accession:str, search_filters:Dict[str,List[str]], path) -> Generator[ProgressInfo, None, None]:
+ config.read_config()
+ c = SimpleDicomClient(target.ip, target.port, target.aet_target, path)
+ for identifier in c.getscu(accession, search_filters):
+            completed, remaining = identifier.NumberOfCompletedSuboperations, identifier.NumberOfRemainingSuboperations
+ progress = f"{ completed } / { completed + remaining }"
+ yield ProgressInfo(completed, remaining, progress)
+
def handle_error(self, e, command):
dcmsend_error_message = DCMSEND_ERROR_CODES.get(e.returncode, None)
logger.exception(f"Failed command:\n {command} \nbecause of {dcmsend_error_message}")
diff --git a/dispatch/target_types/dicomweb.py b/dispatch/target_types/dicomweb.py
index 2053e7e9..0f85a1ee 100644
--- a/dispatch/target_types/dicomweb.py
+++ b/dispatch/target_types/dicomweb.py
@@ -1,9 +1,13 @@
+import os
from pathlib import Path
+import sqlite3
+from typing import Any, Dict, Generator, List, Union
+from dicomweb_client import DICOMfileClient
from requests.exceptions import HTTPError
-
+import time
import pydicom
from common.types import DicomWebTarget, TaskDispatch, Task
-from .base import TargetHandler
+from .base import ProgressInfo, TargetHandler
from .registry import handler_for
from dicomweb_client.api import DICOMwebClient
@@ -22,10 +26,20 @@ class DicomWebTargetHandler(TargetHandler[DicomWebTarget]):
# test_template = "targets/dicomweb-test.html"
icon = "fa-share-alt"
display_name = "DICOMweb"
-
- def create_client(self, target: DicomWebTarget):
+ can_pull = True
+
+ def create_client(self, target: DicomWebTarget) -> Union[DICOMfileClient, DICOMwebClient]:
session = None
headers = None
+ if target.url.startswith("file://"):
+ try:
+ return DICOMfileClient(url=target.url, in_memory=False, update_db=True)
+ except sqlite3.OperationalError as e:
+ # if sqlite3.OperationalError, try in-memory database
+ # Todo: store the db elsewhere if we don't have write access to this folder
+ # This also makes it possible to run tests under pyfakefs since it can't patch sqlite3
+ return DICOMfileClient(url=target.url, in_memory=True, update_db=True)
+
if target.http_user and target.http_password:
session = create_session_from_user_pass(username=target.http_user, password=target.http_password)
elif target.access_token:
@@ -39,8 +53,58 @@ def create_client(self, target: DicomWebTarget):
session=session,
headers=headers,
)
+ client.set_http_retry_params(retry=False, max_attempts=2, wait_exponential_multiplier=100)
+ logger.info(client)
return client
+ def find_from_target(self, target: DicomWebTarget, accession: str, search_filters: Dict[str,List[str]]={}) -> List[pydicom.Dataset]:
+ super().find_from_target(target, accession, search_filters)
+ client = self.create_client(target)
+ use_filters = {'AccessionNumber': accession}
+
+        # If there is more than one value for any filter, just get metadata for the entire accession and filter it afterwards.
+ # Some DICOM servers do actually support filtering on lists, but DICOMwebClient does not seem to support this.
+ # See: https://dicom.nema.org/medical/dicom/current/output/html/part18.html#sect_8.3.4.6
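+        # (for/else: the else branch runs only if the loop never hits "break", i.e. every filter has exactly one value)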
+ for filter_values in search_filters.values():
+ if len(filter_values) > 1:
+ break
+ else:
+ use_filters.update({k: v[0] for k,v in search_filters.items()})
+
+ metadata = client.search_for_series(search_filters=use_filters, get_remaining=True, fields=['StudyInstanceUID', 'SeriesInstanceUID', 'NumberOfSeriesRelatedInstances', 'StudyDescription', 'SeriesDescription'] + list(search_filters.keys()))
+ meta_datasets = [pydicom.Dataset.from_json(ds) for ds in metadata]
+ result = []
+
+ # In case the server didn't filter as strictly as we expected it to, filter again
+ for d in meta_datasets:
+ for filter in search_filters:
+ if d.get(filter) not in search_filters[filter]:
+ break
+ else:
+ result.append(d)
+ logger.debug(result)
+ return result
+
+ def get_from_target(self, target: DicomWebTarget, accession, search_filters, path) -> Generator[ProgressInfo, None, None]:
+ series = self.find_from_target(target, accession, search_filters=search_filters)
+ if not series:
+ raise ValueError("No series found with accession number {}".format(accession))
+ n = 0
+ remaining = sum([int(x.NumberOfSeriesRelatedInstances) for x in series])
+ client = self.create_client(target)
+ for s in series:
+ instances = client.retrieve_series(s.StudyInstanceUID, s.SeriesInstanceUID)
+ # remaining += len(instances)
+ for instance in instances:
+ sop_instance_uid = instance.get('SOPInstanceUID')
+ filename = f"{path}/{sop_instance_uid}.dcm"
+ instance.save_as(filename)
+ n += 1
+ remaining -= 1
+ time.sleep(0.5)
+ yield ProgressInfo(n, remaining, f'{n} / {n + remaining}')
+ time.sleep(0.5)
+
def send_to_target(
self, task_id: str, target: DicomWebTarget, dispatch_info: TaskDispatch, source_folder: Path, task: Task
) -> str:
@@ -71,20 +135,31 @@ def from_form(self, form: dict, factory, current_target) -> DicomWebTarget:
async def test_connection(self, target: DicomWebTarget, target_name: str):
client = self.create_client(target)
- results = {}
- try:
- result = client._http_get(target.url)
- results["authentication"] = True
- except HTTPError as e:
- if e.errno == 401:
- results["authentication"] = False
+ results: Dict[str, Union[bool, str, None]] = {}
+ results["Authentication"] = None
+ if isinstance(client, DICOMwebClient):
+ try:
+ result = client._http_get(target.url)
+ results["Authentication"] = True
+ except HTTPError as e:
+ if e.errno == 401:
+ results["Authentication"] = False
+ else:
+ results["Authentication"] = True
+ elif isinstance(client, DICOMfileClient):
+ folder = Path(target.url[7:])
+ if not folder.exists() or not folder.is_dir():
+ results["Authentication"] = f"No such folder {folder}"
+ elif target.direction in ("pull", "both") and not os.access(folder, os.R_OK):
+ results["Authentication"] = f"No read access to folder {folder}"
+ elif target.direction in ("push", "both") and not os.access(folder, os.W_OK):
+ results["Authentication"] = f"No write access to folder {folder}"
else:
- results["authentication"] = True
-
+ results["Authentication"] = True
try:
client.search_for_studies(limit=1)
- results["QIDO_query"] = True
+ results["QIDO query"] = True
except HTTPError as e:
- results["QIDO_query"] = False
+ results["QIDO query"] = False
return results
diff --git a/docker/base/Dockerfile b/docker/base/Dockerfile
index d3882eef..5ca6ec27 100755
--- a/docker/base/Dockerfile
+++ b/docker/base/Dockerfile
@@ -31,7 +31,8 @@ RUN chmod -R o+rx /opt/mercure/app && \
/opt/mercure/data/success \
/opt/mercure/data/error \
/opt/mercure/data/discard \
- /opt/mercure/data/processing && \
+ /opt/mercure/data/processing \
+ /opt/mercure/data/jobs && \
chown -R mercure /opt/mercure/data
# Export the configuration and data folder as a volume, as multiple scripts will have to read/write there
VOLUME /opt/mercure/config
diff --git a/docker/docker-compose.override.yml b/docker/docker-compose.override.yml
index 185637a2..50298139 100755
--- a/docker/docker-compose.override.yml
+++ b/docker/docker-compose.override.yml
@@ -4,6 +4,7 @@ x-env: &env
MERCURE_RUNNER: docker
MERCURE_ENV: DEV
MERCURE_CONFIG_FOLDER: /opt/mercure/config
+      REDIS_URL: redis://redis:6379/0
x-volumes: &volumes
volumes:
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 1e6163cf..a5ee2fce 100755
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -2,7 +2,7 @@ version: "3.9"
x-env: &env
environment:
MERCURE_RUNNER: docker
-
+ REDIS_URL: redis://redis:6379/0
x-volumes: &volumes
volumes:
- config:/opt/mercure/config
@@ -24,6 +24,13 @@ services:
depends_on:
db:
condition: service_healthy
+ redis:
+ condition: service_started
+ worker_fast:
+ condition: service_started
+ worker_slow:
+ condition: service_started
+
image: mercureimaging/mercure-ui${IMAGE_TAG}
restart: always
ports:
@@ -107,6 +114,37 @@ services:
condition: service_healthy
<<: *volumes
<<: *env
+
+ redis:
+ <<: *user
+ image: redis
+ restart: always
+
+ worker_fast:
+ <<: *user
+ image: mercureimaging/mercure-worker${IMAGE_TAG}
+ restart: always
+ depends_on:
+ - redis
+ deploy:
+ replicas: 2
+ environment:
+ WORKER_QUEUE: mercure_fast
+ REDIS_URL: redis://redis:6379/0
+ <<: *volumes
+
+ worker_slow:
+ <<: *user
+ image: mercureimaging/mercure-worker${IMAGE_TAG}
+ restart: always
+ depends_on:
+ - redis
+ deploy:
+ replicas: 2
+ environment:
+ WORKER_QUEUE: mercure_slow
+ REDIS_URL: redis://redis:6379/0
+ <<: *volumes
volumes:
db-data:
diff --git a/docker/worker/Dockerfile b/docker/worker/Dockerfile
new file mode 100644
index 00000000..7961aae7
--- /dev/null
+++ b/docker/worker/Dockerfile
@@ -0,0 +1,4 @@
+ARG VERSION_TAG=latest
+ARG IMAGE_NAME=mercureimaging/mercure-base
+FROM $IMAGE_NAME:$VERSION_TAG
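+# WORKER_QUEUE and REDIS_URL are expected in the container environment (docker-compose.yml sets them for the worker services)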
+CMD /opt/mercure/env/bin/rq worker $WORKER_QUEUE --url $REDIS_URL --with-scheduler
diff --git a/install.sh b/install.sh
index b75dda8f..eda5901d 100755
--- a/install.sh
+++ b/install.sh
@@ -97,7 +97,7 @@ create_folders () {
echo "## Creating $DATA_PATH..."
sudo mkdir "$DATA_PATH"
sudo mkdir "$DATA_PATH"/incoming "$DATA_PATH"/studies "$DATA_PATH"/outgoing "$DATA_PATH"/success
- sudo mkdir "$DATA_PATH"/error "$DATA_PATH"/discard "$DATA_PATH"/processing
+ sudo mkdir "$DATA_PATH"/error "$DATA_PATH"/discard "$DATA_PATH"/processing "$DATA_PATH"/jobs
sudo chown -R $OWNER:$OWNER $DATA_PATH
sudo chmod a+x $DATA_PATH
else
@@ -349,7 +349,7 @@ install_app_files() {
install_packages() {
echo "## Installing Linux packages..."
sudo apt-get update
- sudo apt-get install -y build-essential wget git dcmtk jq inetutils-ping sshpass rsync postgresql postgresql-contrib libpq-dev git-lfs python3-wheel python3-dev python3 python3-venv sendmail libqt5core5a
+ sudo apt-get install -y build-essential wget git dcmtk jq inetutils-ping sshpass rsync postgresql postgresql-contrib libpq-dev git-lfs python3-wheel python3-dev python3 python3-venv sendmail libqt5core5a redis
}
install_dependencies() {
@@ -380,6 +380,9 @@ install_services() {
sudo cp "$MERCURE_SRC"/installation/*.service /etc/systemd/system
sudo systemctl enable mercure_bookkeeper.service mercure_cleaner.service mercure_dispatcher.service mercure_receiver.service mercure_router.service mercure_ui.service mercure_processor.service
sudo systemctl start mercure_bookkeeper.service mercure_cleaner.service mercure_dispatcher.service mercure_receiver.service mercure_router.service mercure_ui.service mercure_processor.service
+
+ sudo systemctl enable mercure_worker_fast@1.service mercure_worker_fast@2.service mercure_worker_slow@1.service mercure_worker_slow@2.service
+ sudo systemctl start mercure_worker_fast@1.service mercure_worker_fast@2.service mercure_worker_slow@1.service mercure_worker_slow@2.service
}
systemd_install () {
diff --git a/installation/mercure-sudoer b/installation/mercure-sudoer
index 7cae2299..63a291c9 100644
--- a/installation/mercure-sudoer
+++ b/installation/mercure-sudoer
@@ -5,3 +5,8 @@ mercure ALL=(ALL) NOPASSWD: /bin/journalctl -n 1000 -u mercure_receiver.service,
mercure ALL=(ALL) NOPASSWD: /bin/journalctl -n 1000 -u mercure_router.service, /bin/journalctl -n 1000 -u mercure_router.service *,/bin/systemctl start mercure_router.service, /bin/systemctl stop mercure_router.service, /bin/systemctl restart mercure_router.service, /bin/systemctl kill mercure_router
mercure ALL=(ALL) NOPASSWD: /bin/journalctl -n 1000 -u mercure_ui.service, /bin/journalctl -n 1000 -u mercure_ui.service *,/bin/systemctl start mercure_ui.service, /bin/systemctl stop mercure_ui.service, /bin/systemctl restart mercure_ui.service, /bin/systemctl kill mercure_ui
mercure ALL=(ALL) NOPASSWD: /bin/journalctl -n 1000 -u mercure_processor.service, /bin/journalctl -n 1000 -u mercure_processor.service *,/bin/systemctl start mercure_processor.service, /bin/systemctl stop mercure_processor.service, /bin/systemctl restart mercure_processor.service, /bin/systemctl kill mercure_processor
+
+mercure ALL=(ALL) NOPASSWD: /bin/journalctl -n 1000 -u mercure_worker_slow@?.service, /bin/journalctl -n 1000 -u mercure_worker_slow@?.service *,/bin/systemctl start mercure_worker_slow@?.service, /bin/systemctl stop mercure_worker_slow@?.service, /bin/systemctl restart mercure_worker_slow@?.service, /bin/systemctl kill mercure_worker_slow@?.service
+mercure ALL=(ALL) NOPASSWD: /bin/journalctl -n 1000 -u mercure_worker_slow@??.service, /bin/journalctl -n 1000 -u mercure_worker_slow@??.service *,/bin/systemctl start mercure_worker_slow@??.service, /bin/systemctl stop mercure_worker_slow@??.service, /bin/systemctl restart mercure_worker_slow@??.service, /bin/systemctl kill mercure_worker_slow@??.service
+mercure ALL=(ALL) NOPASSWD: /bin/journalctl -n 1000 -u mercure_worker_fast@?.service, /bin/journalctl -n 1000 -u mercure_worker_fast@?.service *,/bin/systemctl start mercure_worker_fast@?.service, /bin/systemctl stop mercure_worker_fast@?.service, /bin/systemctl restart mercure_worker_fast@?.service, /bin/systemctl kill mercure_worker_fast@?.service
+mercure ALL=(ALL) NOPASSWD: /bin/journalctl -n 1000 -u mercure_worker_fast@??.service, /bin/journalctl -n 1000 -u mercure_worker_fast@??.service *,/bin/systemctl start mercure_worker_fast@??.service, /bin/systemctl stop mercure_worker_fast@??.service, /bin/systemctl restart mercure_worker_fast@??.service, /bin/systemctl kill mercure_worker_fast@??.service
diff --git a/installation/mercure_worker_fast@.service b/installation/mercure_worker_fast@.service
new file mode 100644
index 00000000..b2153587
--- /dev/null
+++ b/installation/mercure_worker_fast@.service
@@ -0,0 +1,17 @@
+[Unit]
+Description=Mercure RQ Worker (Fast) %i
+Documentation=https://mercure-imaging.org/docs
+After=network.target mercure_ui.service
+
+[Service]
+Type=simple
+WorkingDirectory=/opt/mercure/app
+ExecStart=/opt/mercure/env/bin/rq worker mercure_fast
+ExecReload=/bin/kill -s HUP $MAINPID
+ExecStop=/bin/kill -s TERM $MAINPID
+User=mercure
+PrivateTmp=true
+Restart=always
+
+[Install]
+WantedBy=multi-user.target
\ No newline at end of file
diff --git a/installation/mercure_worker_slow@.service b/installation/mercure_worker_slow@.service
new file mode 100644
index 00000000..68ef360d
--- /dev/null
+++ b/installation/mercure_worker_slow@.service
@@ -0,0 +1,17 @@
+[Unit]
+Description=Mercure RQ Worker (Slow) %i
+Documentation=https://mercure-imaging.org/docs
+After=network.target mercure_ui.service
+
+[Service]
+Type=simple
+WorkingDirectory=/opt/mercure/app
+ExecStart=/opt/mercure/env/bin/rq worker mercure_slow
+ExecReload=/bin/kill -s HUP $MAINPID
+ExecStop=/bin/kill -s TERM $MAINPID
+User=mercure
+PrivateTmp=true
+Restart=always
+
+[Install]
+WantedBy=multi-user.target
\ No newline at end of file
diff --git a/requirements.in b/requirements.in
index 35330f6a..8bb97656 100644
--- a/requirements.in
+++ b/requirements.in
@@ -31,9 +31,9 @@ aiofiles
jinja2
types-Jinja2
# 0.15.0 breaks session management; the next commit after 0.18 will include the fix
-starlette ~= 0.28;
+starlette ~= 0.38.5
starlette-auth-toolkit
-uvicorn == 0.16.0
+uvicorn ~= 0.30.6
itsdangerous
python-multipart
distro
@@ -52,6 +52,7 @@ python-nomad
# other
pydicom ~= 2.4.1
+pynetdicom ~= 2.0.2
pydantic >= 1.10.9, <2.0.0
pillow >= 10.0.1
dicomweb-client
@@ -61,6 +62,8 @@ certifi>=2024.07.04
future>=0.18.3
lxml>=4.9.1
pyxnat
-urllib3>=2.2.2
+urllib3 >= 2.2.2
aiosqlite~=0.20.0
+rq ~= 1.16.2
+rq-scheduler ~= 0.13.1
httpx
diff --git a/requirements.txt b/requirements.txt
index 8fd7ab9f..f2f70d06 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -31,6 +31,7 @@ async-timeout==4.0.3
# aiohttp
# aiopg
# asyncpg
+ # redis
asyncpg==0.29.0
# via -r requirements.in
attrs==23.2.0
@@ -53,7 +54,11 @@ certifi==2024.7.4
charset-normalizer==3.3.2
# via requests
click==8.1.7
- # via uvicorn
+ # via
+ # rq
+ # uvicorn
+crontab==1.0.1
+ # via rq-scheduler
daiquiri==3.2.5.1
# via -r requirements.in
databases[aiopg]==0.8.0
@@ -72,8 +77,10 @@ exceptiongroup==1.2.1
# via
# anyio
# pytest
-freezegun==1.5.1
- # via -r requirements.in
+freezegun==1.4.0
+ # via
+ # -r requirements.in
+ # rq-scheduler
frozenlist==1.4.1
# via
# aiohttp
@@ -162,10 +169,13 @@ pydicom==2.4.4
# via
# -r requirements.in
# dicomweb-client
+ # pynetdicom
pyfakefs==4.5.6
# via -r requirements.in
pygments==2.18.0
# via sphinx
+pynetdicom==2.0.2
+ # via -r requirements.in
pytest==8.2.2
# via
# -r requirements.in
@@ -180,6 +190,7 @@ python-dateutil==2.9.0.post0
# botocore
# freezegun
# influxdb-client
+ # rq-scheduler
python-json-logger==2.0.7
# via daiquiri
python-multipart==0.0.9
@@ -190,7 +201,9 @@ pyxnat==1.6.2
# via -r requirements.in
reactivex==4.0.4
# via influxdb-client
-requests==2.32.3
+redis==5.0.4
+ # via rq
+requests==2.31.0
# via
# dicomweb-client
# docker
@@ -199,7 +212,13 @@ requests==2.32.3
# sphinx
retrying==1.3.4
# via dicomweb-client
-s3transfer==0.10.1
+rq==1.16.2
+ # via
+ # -r requirements.in
+ # rq-scheduler
+rq-scheduler==0.13.1
+ # via -r requirements.in
+s3transfer==0.10.0
# via boto3
six==1.16.0
# via
@@ -237,7 +256,7 @@ sqlalchemy==1.4.52
# -r requirements.in
# alembic
# databases
-starlette==0.37.2
+starlette==0.38.5
# via
# -r requirements.in
# starlette-auth-toolkit
@@ -262,10 +281,10 @@ typing-extensions==4.12.2
# aiosqlite
# alembic
# anyio
- # asgiref
# mypy
# pydantic
# reactivex
+ # uvicorn
urllib3==2.2.2
# via
# -r requirements.in
@@ -274,8 +293,8 @@ urllib3==2.2.2
# influxdb-client
# requests
# types-requests
-uvicorn==0.16.0
+uvicorn==0.30.6
# via -r requirements.in
watchdog==4.0.1
# via -r requirements.in
wheel==0.43.0
diff --git a/test.py b/test.py
index b91ff002..85d513ad 100755
--- a/test.py
+++ b/test.py
@@ -16,7 +16,7 @@ def run_test() -> None:
config.save_config()
client = TestClient(app)
- startup()
+ startup(app)
form_data = {
"username": "admin",
"password": "router"
diff --git a/tests/data/test_config.json b/tests/data/test_config.json
index d5a2dc5f..cd23cbc3 100755
--- a/tests/data/test_config.json
+++ b/tests/data/test_config.json
@@ -8,6 +8,7 @@
"error_folder": "/var/error",
"discard_folder": "/var/discard",
"processing_folder": "/var/processing",
+ "jobs_folder": "/var/jobs",
"bookkeeper": "0.0.0.0:8080",
"graphite_ip": "",
"graphite_port": 2003,
diff --git a/tests/generate_dicoms.py b/tests/generate_dicoms.py
new file mode 100644
index 00000000..70e074c7
--- /dev/null
+++ b/tests/generate_dicoms.py
@@ -0,0 +1,79 @@
+import itertools
+import os
+from pathlib import Path
+
+from pydicom.dataset import FileMetaDataset, Dataset
+from pydicom.uid import generate_uid, ExplicitVRLittleEndian, CTImageStorage
+
+def generate_dicom_files(accession_number, destination_folder:Path, num_files=10, num_studies=2, num_series=2):
+ """
+ Generate a folder of DICOM files with the given accession number.
+
+ :param accession_number: The accession number to use for the DICOM files
+ :param destination_folder: The parent folder where the accession subfolder will be created
+    :param num_files: The number of DICOM files to generate per series (default: 10)
+    :param num_studies: The number of studies to generate (default: 2)
+    :param num_series: The number of series per study (default: 2)
+    """
+ # Create the accession subfolder
+
+    for (study_n,series_n,file_n) in itertools.product(range(num_studies), range(num_series), range(num_files)):
+        # A new series starts whenever file_n wraps back to 0; a new study starts when series_n is 0 as well
+        if file_n == 0:
+            series_uid = generate_uid()
+            if series_n == 0:
+                study_uid = generate_uid()
+
+ ds = Dataset()
+ ds.PatientName = "Test^Patient"
+ ds.PatientID = "12345"
+ ds.StudyDate = "20210101"
+ ds.AccessionNumber = accession_number
+ ds.is_little_endian = True
+ ds.is_implicit_VR = False
+
+ ds.SOPClassUID = CTImageStorage # CT Image Storage
+ ds.SOPInstanceUID = generate_uid()
+ ds.InstanceNumber = file_n + 1
+
+ ds.StudyInstanceUID = study_uid
+ ds.StudyDescription = f"study_{study_n + 1}"
+ ds.SeriesInstanceUID = series_uid
+ ds.SeriesNumber = series_n + 1
+ ds.SeriesDescription = f"series_{series_n + 1}"
+
+ ds.PixelData = b"\x00" * (100 * 100 * 2)
+ ds.NumberOfFrames = "1"
+ ds.Rows = 100
+ ds.Columns = 100
+ ds.PixelSpacing = [2, 2]
+ ds.BitsAllocated = 16
+ ds.BitsStored = 16
+ ds.HighBit = 15
+ ds.PixelRepresentation = 0
+ ds.SamplesPerPixel = 1
+ ds.file_meta = FileMetaDataset()
+ ds.file_meta.TransferSyntaxUID = ExplicitVRLittleEndian
+
+ # Save the DICOM file in the accession subfolder
+ dir = destination_folder / str(accession_number) / ds.StudyDescription / ds.SeriesDescription
+ dir.mkdir(parents=True, exist_ok=True)
+ filename = f"dicom_file_{file_n+1}.dcm"
+
+ ds.save_as(dir / filename, write_like_original=False)
+
+ print(f"Generated {num_files * num_series * num_studies} DICOM files with accession number {accession_number} in { destination_folder / accession_number}")
+
+if __name__ == "__main__":
+ import argparse
+
+ def dir_path(string) -> Path:
+ if (p:=Path(string)).is_dir():
+ return p
+ raise NotADirectoryError(string)
+
+ parser = argparse.ArgumentParser(description="Generate DICOM files with a specific accession number")
+ parser.add_argument("accession_number", help="Accession number for the DICOM files")
+ parser.add_argument("destination_folder", type=dir_path, help="Parent folder where the accession subfolder will be created")
+ parser.add_argument("--num_files", type=int, default=10, help="Number of DICOM files to generate (default: 10)")
+
+ args = parser.parse_args()
+
+ generate_dicom_files(args.accession_number, args.destination_folder, args.num_files)
diff --git a/tests/test_integration.py b/tests/test_integration.py
index 97cab986..874eb2c6 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -1,329 +1,12 @@
from dataclasses import dataclass
-import functools
-import json
-import multiprocessing
import os
from pathlib import Path
-import subprocess
-import sys
-import threading
import time
-from typing import Any, Callable, Generator, Optional
import pytest
-import requests
-from supervisor.supervisord import Supervisor
-from supervisor.states import RUNNING_STATES
-from supervisor.options import ServerOptions
-from supervisor.xmlrpc import SupervisorTransport
-import xmlrpc.client
-import tempfile
-from common.config import mercure_defaults
from common.types import FolderTarget, Module, Rule, Target
from tests.testing_common import create_minimal_dicom
-import pydicom
-import logging
-import socket
-import tempfile
-
-# current workding directory
-here = os.path.abspath(os.getcwd())
-
-def send_dicom(ds, dest_host, dest_port) -> None:
- with tempfile.NamedTemporaryFile('w') as ds_temp:
- ds.save_as(ds_temp.name)
- subprocess.run(["dcmsend", dest_host, str(dest_port), ds_temp.name],check=True)
-
-
-class SupervisorManager:
- process: Optional[multiprocessing.Process] = None
- config_path: Optional[Path] = None
- def __init__(self, mercure_base: Path) -> None:
- self.mercure_base = mercure_base
- self.socket = mercure_base / "supervisor.sock"
-
- def create_config(self, services) -> None:
- self.config_path = self.mercure_base / 'supervisord.conf'
- log_path = self.mercure_base / 'supervisord.logs'
- pidfile = self.mercure_base / 'supervisord.pid'
- self.config_path.touch()
-
- with self.config_path.open('w') as f:
- f.write(f"""
-[supervisord]
-nodaemon=true
-identifier=supervisor
-directory=/tmp
-loglevel=info
-pidfile={pidfile}
-sockfile={self.socket}
-logfile={log_path}
-[rpcinterface:supervisor]
-supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
-[unix_http_server]
-file={self.socket}
-[supervisorctl]
-serverurl=unix://{self.socket}
-""")
- for service in services:
- f.write(f"""
-[program:{service.name}]
-command={service.command}
-process_name=%(program_name)s{'_%(process_num)d' if service.numprocs>1 else ''}
-directory={os.getcwd()}
-autostart=false
-autorestart=false
-redirect_stderr=true
-startsecs={service.startsecs}
-stopasgroup={str(service.stopasgroup).lower()}
-numprocs={service.numprocs}
-environment=MERCURE_CONFIG_FOLDER="{self.mercure_base}/config"
-""")
-
- def run(self) -> None:
- args = ['-c', str(self.config_path)]
- options = ServerOptions()
- options.realize(args)
-
- s = Supervisor(options)
- options.first = True
- options.test = False
- try:
- s.main()
- except Exception as e:
- print(e)
-
- def start(self, services) -> None:
- self.create_config(services)
- self.process = multiprocessing.Process(target=self.run)
- self.process.start()
- self.wait_for_start()
- self.transport = SupervisorTransport(None, None, f'unix://{self.socket}')
- self.rpc = xmlrpc.client.ServerProxy('http://localhost', transport=self.transport)
-
- def start_service(self, name) -> None:
- self.rpc.supervisor.startProcess(name)
-
- def stop_service(self, name)-> None:
- self.rpc.supervisor.stopProcess(name)
-
- def all_services(self) -> Any:
- return self.rpc.supervisor.getAllProcessInfo() # type: ignore
-
- def get_service_log(self, name, offset=0, length=10000) -> Any:
- return self.rpc.supervisor.readProcessStdoutLog(name, offset, length) # type: ignore
-
- def stream_service_logs(self, name, timeout=1) -> None:
- offset = 0
- while True:
- log_data, offset, overflow = self.rpc.supervisor.tailProcessStdoutLog(name, offset, 1024) # type: ignore
- if log_data:
- print(log_data, end='', flush=True)
- if overflow:
- print(f"Warning: Log overflow detected for {name}. Some log entries may have been missed.")
- time.sleep(timeout)
-
- def stream_service_logs_threaded(self, name, timeout=1) -> threading.Thread:
- thread = threading.Thread(target=self.stream_service_logs, args=(name, timeout))
- thread.start()
- return thread
- def wait_for_start(self) -> None:
- while True:
- if Path(self.socket).exists():
- break
- else:
- time.sleep(0.1)
- def stop(self) -> None:
- if not self.process:
- return
- try:
- self.transport.close()
- self.process.terminate()
- self.process.join()
- except Exception as e:
- print(e)
- pass
-
-@dataclass
-class MercureService:
- name: str
- command: str
- numprocs: int = 1
- stopasgroup: bool = False
- startsecs: int = 0
-
-def is_dicoms_received(mercure_base, dicoms) -> None:
- dicoms_recieved = set()
- for series_folder in (mercure_base / 'data' / 'incoming').glob('*/'):
- for dicom in series_folder.glob('*.dcm'):
- ds_ = pydicom.dcmread(dicom)
- assert ds_.SeriesInstanceUID == series_folder.name
- assert ds_.SOPInstanceUID not in dicoms_recieved
- dicoms_recieved.add(ds_.SOPInstanceUID)
-
- assert dicoms_recieved == set(ds.SOPInstanceUID for ds in dicoms)
- print(f"Received {len(dicoms)} dicoms as expected")
-
-def is_dicoms_in_folder(folder, dicoms) -> None:
- uids_found = set()
- print(f"Looking for dicoms in {folder}")
- dicoms_found = []
- for f in folder.rglob('*'):
- if not f.is_file():
- continue
- if f.suffix == '.dcm':
- dicoms_found.append(f)
- if f.suffix not in ('.error','.tags'):
- dicoms_found.append(f)
- print("Dicoms", dicoms_found)
- for dicom in dicoms_found:
-
- try:
- uid = pydicom.dcmread(dicom).SOPInstanceUID
- uids_found.add(uid)
- except Exception as e:
- pass
- try:
- assert uids_found == set(ds.SOPInstanceUID for ds in dicoms), f"Dicoms missing from {folder}"
- except:
- print("Expected dicoms not found")
- for dicom in folder.glob('**/*.dcm'):
- print(dicom)
- raise
- print(f"Found {len(dicoms)} dicoms in {folder.name} as expected")
-
-def is_series_registered(bookkeeper_port, dicoms) -> None:
- result = requests.get(f"http://localhost:{bookkeeper_port}/query/series",
- headers={"Authorization": f"Token test"})
- assert result.status_code == 200
- result_json = result.json()
- assert set([r['series_uid'] for r in result_json]) == set([d.SeriesInstanceUID for d in dicoms])
-
-@pytest.fixture(scope="function")
-def supervisord(mercure_base):
- supervisor: Optional[SupervisorManager] = None
- def starter(services=[]):
- nonlocal supervisor
- if not supervisor:
- supervisor = SupervisorManager(mercure_base)
- supervisor.start(services)
- return supervisor
- return supervisor
- yield starter
- if supervisor is not None:
- supervisor.stop()
-
-
-def stop_mercure(supervisor: SupervisorManager):
- logs = {}
- for service in supervisor.all_services():
- if service['state'] in RUNNING_STATES:
- try:
- supervisor.stop_service(service['name'])
- except xmlrpc.client.Fault as e:
- if e.faultCode == 10:
- supervisor.stop_service(service['group']+":*")
- # log = get_service_log(service['name'])
- # if log:
- log = Path(service['stdout_logfile']).read_text()
- if log:
- logs[service['name']] = log
- return logs
-
-@pytest.fixture(scope="session")
-def python_bin():
- if os.environ.get("CLEAN_VENV"):
- with tempfile.TemporaryDirectory(prefix="mercure_venv") as venvdir:
- subprocess.run([sys.executable, "-m", "venv", venvdir], check=True)
- subprocess.run([os.path.join(venvdir, "bin", "pip"), "install", "-r", f"{here}/requirements.txt"], check=True)
- yield venvdir+"/bin/python"
- else:
- yield sys.executable
-
-@pytest.fixture(scope="function")
-def mercure(mercure_base, supervisord: Callable[[Any], SupervisorManager], python_bin) -> Generator[Callable[[Any],SupervisorManager], None, None]:
- def py_service(service, **kwargs) -> MercureService:
- return MercureService(service,f"{python_bin} {here}/{service}.py", **kwargs)
- services = [
- py_service("bookkeeper",startsecs=6),
- py_service("router", numprocs=5),
- py_service("processor", numprocs=2),
- py_service("dispatcher", numprocs=5),
- ]
- services += [MercureService(f"receiver", f"{here}/receiver.sh --inject-errors", stopasgroup=True)]
- supervisor = supervisord(services)
- def do_start(services_to_start=["bookkeeper", "reciever", "router", "processor", "dispatcher"]) -> SupervisorManager:
- for service in services_to_start:
- supervisor.start_service(service)
- return supervisor
- yield do_start
- logs = stop_mercure(supervisor)
- for l in logs:
- print(f"====== {l} ======")
- print(logs[l])
- print("=============")
-
-@pytest.fixture(scope="function")
-def mercure_base() -> Generator[Path, None, None]:
- with tempfile.TemporaryDirectory(prefix='mercure_') as temp_dir:
- temp_path = Path(temp_dir)
- for d in ['config','data']:
- (temp_path / d).mkdir()
- for k in ["incoming", "studies", "outgoing", "success", "error", "discard", "processing"]:
- (temp_path / 'data' / k).mkdir()
- yield temp_path
-
-def random_port() -> int:
- """
- Generate a free port number to use as an ephemeral endpoint.
- """
- s = socket.socket()
- s.bind(('',0)) # bind to any available port
- port = s.getsockname()[1] # get the port number
- s.close()
- return int(port)
-
-
-@pytest.fixture(scope="module")
-def receiver_port():
- return random_port()
-
-@pytest.fixture(scope="module")
-def bookkeeper_port():
- return random_port()
-
-
-@pytest.fixture(scope="function")
-def mercure_config(mercure_base, receiver_port, bookkeeper_port):
- mercure_config = { k: v for k, v in mercure_defaults.items()}
- for folder in (mercure_base / 'data').iterdir():
- mercure_config[f"{folder.name}_folder"] = str(folder)
-
- mercure_config["series_complete_trigger"] = 1
- mercure_config["study_complete_trigger"] = 2
- mercure_config["bookkeeper_api_key"] = "test"
- mercure_config["port"] = receiver_port
- mercure_config["bookkeeper"] = f"localhost:{bookkeeper_port}"
- with (mercure_base / 'config' / 'mercure.json').open('w') as fp:
- json.dump(mercure_config, fp)
-
- bookkeeper_config = f"""
-PORT={bookkeeper_port}
-HOST=0.0.0.0
-DATABASE_URL=sqlite:///{mercure_base}/data/bookkeeper.sqlite3
-DEBUG=True
-"""
- with (mercure_base / 'config' / 'bookkeeper.env').open('w') as fp:
- fp.write(bookkeeper_config)
-
- def update_config(config):
- with (mercure_base / 'config' / 'mercure.json').open('r+') as fp:
- data = json.load(fp)
- data.update(config)
- fp.seek(0)
- json.dump(data, fp)
- fp.truncate()
- return update_config
+from testing_integration_common import *
@pytest.mark.parametrize("n_series",(2,))
@pytest.mark.skipif("os.getenv('TEST_FAST',False)")
diff --git a/tests/test_notifications.py b/tests/test_notifications.py
index 89a62011..231ad2c4 100644
--- a/tests/test_notifications.py
+++ b/tests/test_notifications.py
@@ -18,6 +18,7 @@
import unittest.mock
import itertools
from typing import Iterator, Callable
+import pytest
logger = config.get_logger()
diff --git a/tests/test_processor.py b/tests/test_processor.py
index ce50d1d6..a93e22cd 100755
--- a/tests/test_processor.py
+++ b/tests/test_processor.py
@@ -12,21 +12,19 @@
import common
from common.monitor import task_event
-import process.process_series
import router
-import daiquiri
import processor
from itertools import permutations
from common.constants import mercure_version, mercure_names
import json
-from pprint import pprint
from common.types import *
import routing
import routing.generate_taskfile
from pathlib import Path
from testing_common import *
+from testing_common import mock_task_ids
from docker.models.containers import ContainerCollection
from docker.models.images import ImageCollection
@@ -36,6 +34,7 @@
import socket
from typing import Callable
+import pytest
logger = config.get_logger()
diff --git a/tests/test_query.py b/tests/test_query.py
new file mode 100644
index 00000000..3bd41fb7
--- /dev/null
+++ b/tests/test_query.py
@@ -0,0 +1,275 @@
+import os
+from pathlib import Path
+import tempfile
+from typing import Dict, Optional, Tuple
+import pydicom
+import pytest
+from pynetdicom import AE, evt, StoragePresentationContexts, build_role
+from pynetdicom.sop_class import Verification, StudyRootQueryRetrieveInformationModelFind, StudyRootQueryRetrieveInformationModelGet,PatientRootQueryRetrieveInformationModelGet, CTImageStorage # type: ignore
+from pynetdicom.status import Status
+from pydicom.uid import generate_uid
+from pydicom.dataset import Dataset, FileMetaDataset
+from rq import Worker
+from webinterface.dashboards.query.jobs import GetAccessionTask, QueryPipeline
+from webinterface.dicom_client import SimpleDicomClient
+
+from common.types import DicomTarget, DicomWebTarget
+from webinterface.common import redis
+from pydicom.uid import ExplicitVRLittleEndian, ImplicitVRLittleEndian
+from testing_common import receiver_port, mercure_config
+from logging import getLogger
+from rq import SimpleWorker, Queue, Connection
+from fakeredis import FakeStrictRedis
+
+getLogger('pynetdicom').setLevel('WARNING')
+# Mock data for testing
+MOCK_ACCESSIONS = ["1","2","3"]
+
+
+@pytest.fixture(scope="module", autouse=True)
+def rq_connection():
+ with Connection(redis):
+ yield redis
+
+@pytest.fixture(scope="module")
+def mock_node(receiver_port):
+ return DicomTarget(ip="127.0.0.1", port=str(receiver_port), aet_target="TEST")
+
+class DummyDICOMServer:
+    """A simple DICOM server for testing purposes."""
+    remaining_allowed_accessions: Optional[int] = None
+ def __init__(self, port:int, datasets: Dict[str,Dataset]):
+ assert isinstance(port, int), "Port must be an integer"
+ for ds in datasets.values():
+ assert isinstance(ds, Dataset), "Dataset must be a pydicom Dataset"
+ self.ae = AE()
+ # Add support for DICOM verification
+ self.ae.add_supported_context(Verification)
+ self.datasets = datasets
+ # Define handler for C-FIND requests
+ def handle_find(event):
+ ds = event.identifier
+
+ # Create a dummy response
+ # Check if the request matches our dummy data
+ if 'AccessionNumber' in ds and ds.AccessionNumber in MOCK_ACCESSIONS:
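+            # 0xFF00 is the DICOM "Pending" status; a matching identifier follows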
+ yield (0xFF00, self.datasets[ds.AccessionNumber])
+ else:
+ yield (0x0000, None) # Status 'Success', but no match
+
+ # Define handler for C-GET requests
+ def handle_get(event):
+ ds = event.identifier
+ # yield 1
+ # Check if the request matches our dummy data
+ if 'AccessionNumber' in ds and ds.AccessionNumber in MOCK_ACCESSIONS \
+ and ( self.remaining_allowed_accessions is None or self.remaining_allowed_accessions > 0 ):
+            # A pynetdicom C-GET handler must first yield the number of matching sub-operations
+            yield 1
+
+ dummy_ds = self.datasets[ds.AccessionNumber].copy()
+ dummy_ds.SOPClassUID = CTImageStorage # CT Image Storage
+ dummy_ds.SOPInstanceUID = generate_uid()
+ dummy_ds.file_meta = FileMetaDataset()
+ dummy_ds.file_meta.TransferSyntaxUID = ExplicitVRLittleEndian
+
+ # Yield the dataset
+ if self.remaining_allowed_accessions:
+ self.remaining_allowed_accessions = self.remaining_allowed_accessions - 1
+ yield (0xFF00, dummy_ds)
+ else:
+ yield 0
+ yield (0x0000, None) # Status 'Success', but no match
+ # Bind the C-FIND handler
+
+
+ # Add the supported presentation contexts (Storage SCU)
+ self.ae.supported_contexts = StoragePresentationContexts
+
+ for cx in self.ae.supported_contexts:
+ cx.scp_role = True
+ cx.scu_role = False
+
+ # Add a supported presentation context (QR Get SCP)
+ self.ae.add_supported_context(PatientRootQueryRetrieveInformationModelGet)
+ self.ae.add_supported_context(StudyRootQueryRetrieveInformationModelGet)
+ self.ae.add_supported_context(StudyRootQueryRetrieveInformationModelFind)
+
+ self.ae.start_server(("127.0.0.1", port), block=False, evt_handlers=[(evt.EVT_C_FIND, handle_find), (evt.EVT_C_GET, handle_get)])
+
+ def stop(self)->None:
+ """Stop the DICOM server."""
+ self.ae.shutdown()
+
+@pytest.fixture(scope="function")
+def dummy_datasets():
+ dss = {}
+ for acc in MOCK_ACCESSIONS:
+ ds = Dataset()
+ ds.PatientName = "Test^Patient"
+ ds.PatientID = "12345"
+ ds.StudyDescription = "Test Study"
+ ds.StudyDate = "20210101"
+ ds.StudyInstanceUID = generate_uid()
+ ds.SeriesInstanceUID = generate_uid()
+ ds.AccessionNumber = acc
+ ds.is_little_endian = True
+ ds.is_implicit_VR = False
+ dss[acc] = ds
+ return dss
+
+@pytest.fixture(scope="function")
+def dicom_server(mock_node, dummy_datasets):
+ """
+ Pytest fixture to start a DICOM server before tests and stop it after.
+ This fixture has module scope, so the server will be started once for all tests in the module.
+ """
+ server = DummyDICOMServer(int(mock_node.port), dummy_datasets)
+ yield mock_node
+ server.stop()
+
+@pytest.fixture(scope="function")
+def dicom_server_2(mock_node, dummy_datasets):
+ """
+ Pytest fixture to start a DICOM server before tests and stop it after.
+ This fixture has module scope, so the server will be started once for all tests in the module.
+ """
+ server = DummyDICOMServer(int(mock_node.port), dummy_datasets)
+ yield mock_node, server
+ server.stop()
+
+
+@pytest.fixture(scope="function")
+def dicomweb_server(dummy_datasets, tempdir):
+ (tempdir / "dicomweb").mkdir()
+
+ for dummy_dataset in dummy_datasets.values():
+ ds = dummy_dataset.copy()
+ ds.SOPClassUID = CTImageStorage # CT Image Storage
+ ds.SOPInstanceUID = generate_uid()
+ ds.StudyInstanceUID = generate_uid()
+ ds.file_meta = FileMetaDataset()
+ ds.file_meta.TransferSyntaxUID = ExplicitVRLittleEndian
+ ds.save_as(tempdir / "dicomweb" / ds.SOPInstanceUID, write_like_original=False)
+
+ yield DicomWebTarget(url=f"file://{tempdir}/dicomweb")
+
+def test_simple_dicom_client(dicom_server):
+ """Test the SimpleDicomClient can connect to and query the DICOM server."""
+ client = SimpleDicomClient(dicom_server.ip, dicom_server.port, dicom_server.aet_target, None)
+
+ result = client.findscu(MOCK_ACCESSIONS[0])
+ assert result is not None # We expect some result, even if it's an empty dataset
+ assert result[0].AccessionNumber == MOCK_ACCESSIONS[0] # Check if the accession number matches
+
+@pytest.fixture(scope="function")
+def tempdir():
+ with tempfile.TemporaryDirectory(prefix="mercure_temp") as d:
+ yield Path(d)
+
+def test_get_accession_job(dicom_server, dicomweb_server, mercure_config):
+ """Test the get_accession_job function."""
+ config = mercure_config()
+ job_id = "test_job"
+ print(config.jobs_folder)
+    assert Path(config.jobs_folder).exists()
+ (Path(config.jobs_folder) / "foo/").touch()
+ for server in (dicom_server, dicomweb_server):
+
+ generator = GetAccessionTask.get_accession(job_id, MOCK_ACCESSIONS[0], server, search_filters={}, path=config.jobs_folder)
+ results = list(generator)
+ # Check that we got some results
+ assert len(results) > 0
+ assert results[0].remaining == 0
+ assert pydicom.dcmread(next(k for k in Path(config.jobs_folder).rglob("*.dcm"))).AccessionNumber == MOCK_ACCESSIONS[0]
+
+def test_query_job(dicom_server, tempdir, rq_connection):
+ """
+ Test the create_job function.
+ We use mocker to mock the queue and avoid actually creating jobs.
+ """
+ job = QueryPipeline.create([MOCK_ACCESSIONS[0]], {}, dicom_server, str(tempdir))
+ w = SimpleWorker(["mercure_fast", "mercure_slow"], connection=rq_connection)
+ w.work(burst=True)
+ # assert len(list(Path(config.mercure.jobs_folder).iterdir())) == 1
+ print([k for k in Path(tempdir).rglob('*')])
+ assert pydicom.dcmread(next(k for k in Path(tempdir).rglob("*.dcm"))).AccessionNumber == MOCK_ACCESSIONS[0]
+
+def tree(path, prefix='', level=0) -> None:
+ if level==0:
+ print(path)
+    entries = list(os.scandir(path))  # scandir yields DirEntry objects, which provide .name, .is_file() and .path below
+    entries = sorted(entries, key=lambda e: (e.is_file(), e.name))
+ if not entries and level==0:
+ print(prefix + "[[ empty ]]")
+ for i, entry in enumerate(entries):
+ conn = '└── ' if i == len(entries) - 1 else '├── '
+ print(f'{prefix}{conn}{entry.name}')
+ if entry.is_dir():
+ tree(entry.path, prefix + (' ' if i == len(entries) - 1 else '│ '), level+1)
+
+def test_query_dicomweb(dicomweb_server, tempdir, dummy_datasets, fs):
+ (tempdir / "outdir").mkdir()
+ ds = list(dummy_datasets.values())[0]
+ task = QueryPipeline.create([ds.AccessionNumber], {}, dicomweb_server, (tempdir / "outdir"))
+ assert task
+ w = SimpleWorker(["mercure_fast", "mercure_slow"], connection=redis)
+ w.work(burst=True)
+ # tree(tempdir / "outdir")
+ outfile = (tempdir / "outdir" / task.id / ds.AccessionNumber / f"{ds.SOPInstanceUID}.dcm")
+ assert outfile.exists(), f"Expected output file {outfile} does not exist."
+ task.get_meta()
+ assert task.meta['completed'] == 1
+ assert task.meta['total'] == 1
+
+def test_query_operations(dicomweb_server, tempdir, dummy_datasets, fs, rq_connection):
+ (tempdir / "outdir").mkdir()
+ task = QueryPipeline.create([ds.AccessionNumber for ds in dummy_datasets.values()], {}, dicomweb_server, (tempdir / "outdir"))
+ assert task
+ assert task.meta['total'] == len(dummy_datasets)
+ assert task.meta['completed'] == 0
+ task.pause()
+ for job in (jobs:=task.get_subjobs()):
+ assert job.meta.get("paused")
+ assert job.get_status() == "canceled"
+ assert jobs
+
+ w = SimpleWorker(["mercure_fast", "mercure_slow"], connection=redis)
+ w.work(burst=True)
+ outfile = (tempdir / "outdir" / task.id)
+ task.get_meta()
+ assert task.meta['completed'] == 0
+ assert not outfile.exists()
+ task.resume()
+
+ for job in task.get_subjobs():
+ assert not job.meta.get("paused")
+ assert job.get_status() == "queued"
+
+ w.work(burst=True)
+ for ds in dummy_datasets.values():
+ outfile = (tempdir / "outdir" / task.id / ds.AccessionNumber / f"{ds.SOPInstanceUID}.dcm")
+ assert outfile.exists(), f"Expected output file {outfile} does not exist."
+ task.get_meta()
+ assert task.meta['completed'] == len(dummy_datasets)
+ assert task.meta['total'] == len(dummy_datasets)
+
+def test_query_retry(dicom_server_2: Tuple[DicomTarget,DummyDICOMServer], tempdir, dummy_datasets, fs, rq_connection):
+ (tempdir / "outdir").mkdir()
+ target, server = dicom_server_2
+ task = QueryPipeline.create([ds.AccessionNumber for ds in dummy_datasets.values()], {}, target, (tempdir / "outdir"))
+
+ server.remaining_allowed_accessions = 1 # Only one accession is allowed to be retrieved
+ w = SimpleWorker(["mercure_fast", "mercure_slow"], connection=redis)
+ w.work(burst=True)
+ task.get_meta()
+ assert task.meta['completed'] == 1
+ assert task.meta['total'] == len(dummy_datasets)
+ assert "Failure during retrieval" in task.meta['failed_reason']
+ # Retry the query
+ server.remaining_allowed_accessions = None
+ task.retry()
+ w.work(burst=True)
+ task.get_meta()
+ assert task.meta['completed'] == len(dummy_datasets)
+ assert task.meta['failed_reason'] is None
\ No newline at end of file
diff --git a/tests/test_router.py b/tests/test_router.py
index fea89276..6a766bdd 100755
--- a/tests/test_router.py
+++ b/tests/test_router.py
@@ -26,6 +26,8 @@
from testing_common import *
+from testing_common import mock_task_ids
+
# import common.config as config
rules = {
diff --git a/tests/testing_common.py b/tests/testing_common.py
index 9293eaa3..09f247a7 100755
--- a/tests/testing_common.py
+++ b/tests/testing_common.py
@@ -6,6 +6,7 @@
import os
from pathlib import Path
import shutil
+import socket
from typing import Callable, Dict, Any, Iterator, Optional, Tuple
import uuid
@@ -101,7 +102,7 @@ def mercure_config(fs) -> Callable[[Dict], Config]:
config_path = os.path.realpath(os.path.dirname(os.path.realpath(__file__)) + "/data/test_config.json")
fs.add_real_file(config_path, target_path=config.configuration_filename, read_only=False)
- for k in ["incoming", "studies", "outgoing", "success", "error", "discard", "processing"]:
+ for k in ["incoming", "studies", "outgoing", "success", "error", "discard", "processing", "jobs"]:
fs.create_dir(f"/var/{k}")
def set_config(extra: Dict[Any, Any] = {}) -> Config:
@@ -235,4 +236,23 @@ def mock_incoming_uid(config, fs, series_uid, tags={}, name="bar", force_tags_ou
# ( incoming / "receiver_info").mkdir(exist_ok=True)
# ( incoming / "receiver_info" / (series_uid+".received")).touch()
- return str(dcm_file), tags_f
\ No newline at end of file
+ return str(dcm_file), tags_f
+
+def random_port() -> int:
+ """
+ Generate a free port number to use as an ephemeral endpoint.
+ """
+ s = socket.socket()
+ s.bind(('',0)) # bind to any available port
+ port = s.getsockname()[1] # get the port number
+ s.close()
+ return int(port)
+
+
+@pytest.fixture(scope="module")
+def receiver_port():
+ return random_port()
+
+@pytest.fixture(scope="module")
+def bookkeeper_port():
+ return random_port()
diff --git a/tests/testing_integration_common.py b/tests/testing_integration_common.py
new file mode 100644
index 00000000..2c2e94c3
--- /dev/null
+++ b/tests/testing_integration_common.py
@@ -0,0 +1,334 @@
+from dataclasses import dataclass
+import json
+import multiprocessing
+import os
+from pathlib import Path
+import subprocess
+import sys
+import threading
+import time
+from typing import Any, Callable, Dict, Generator, Optional
+import pytest
+import requests
+from supervisor.supervisord import Supervisor
+from supervisor.states import RUNNING_STATES
+from supervisor.options import ServerOptions
+from supervisor.xmlrpc import SupervisorTransport
+import xmlrpc.client
+import tempfile
+from common.config import mercure_defaults
+import pydicom
+import socket
+
+
+# current working directory
+here = os.path.abspath(os.getcwd())
+
+def send_dicom(ds, dest_host, dest_port) -> None:
+ with tempfile.NamedTemporaryFile('w') as ds_temp:
+ ds.save_as(ds_temp.name)
+ subprocess.run(["dcmsend", dest_host, str(dest_port), ds_temp.name],check=True)
+
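+# Usage sketch (illustrative): tests forward a pydicom Dataset to the receiver
+# started by the mercure fixture, e.g. send_dicom(ds, "localhost", receiver_port),
+# round-tripping the dataset through a temporary file and dcmsend.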
+
+class SupervisorManager:
+ process: Optional[multiprocessing.Process] = None
+ config_path: Optional[Path] = None
+ def __init__(self, mercure_base: Path) -> None:
+ self.mercure_base = mercure_base
+ self.socket = mercure_base / "supervisor.sock"
+
+ def create_config(self, services) -> None:
+ self.config_path = self.mercure_base / 'supervisord.conf'
+ log_path = self.mercure_base / 'supervisord.logs'
+ pidfile = self.mercure_base / 'supervisord.pid'
+ self.config_path.touch()
+
+ with self.config_path.open('w') as f:
+ f.write(f"""
+[supervisord]
+nodaemon=true
+identifier=supervisor
+directory=/tmp
+loglevel=info
+pidfile={pidfile}
+sockfile={self.socket}
+logfile={log_path}
+[rpcinterface:supervisor]
+supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
+[unix_http_server]
+file={self.socket}
+[supervisorctl]
+serverurl=unix://{self.socket}
+""")
+ for service in services:
+ # environment=""
+ # if service.environment:
+ # environment = ","
+ # for key, value in service.environment.items():
+ # environment += f"{key}=\"{value}\","
+ # if environment[-1] == ',':
+ # environment = environment[:-1]
+ f.write(f"""
+[program:{service.name}]
+command={service.command}
+process_name=%(program_name)s{'_%(process_num)d' if service.numprocs>1 else ''}
+directory={os.getcwd()}
+autostart=false
+autorestart=false
+redirect_stderr=true
+startsecs={service.startsecs}
+stopasgroup={str(service.stopasgroup).lower()}
+numprocs={service.numprocs}
+environment=MERCURE_CONFIG_FOLDER="{self.mercure_base}/config"
+""")
+
+ def run(self) -> None:
+ args = ['-c', str(self.config_path)]
+ options = ServerOptions()
+ options.realize(args)
+
+ s = Supervisor(options)
+ options.first = True
+ options.test = False
+ try:
+ s.main()
+ except Exception as e:
+ print(e)
+
+ def start(self, services) -> None:
+ self.create_config(services)
+ self.process = multiprocessing.Process(target=self.run)
+ self.process.start()
+ self.wait_for_start()
+ self.transport = SupervisorTransport(None, None, f'unix://{self.socket}')
+ self.rpc = xmlrpc.client.ServerProxy('http://localhost', transport=self.transport)
+
+ def start_service(self, name) -> None:
+ self.rpc.supervisor.startProcess(name)
+
+ def stop_service(self, name)-> None:
+ self.rpc.supervisor.stopProcess(name)
+
+ def all_services(self) -> Any:
+ return self.rpc.supervisor.getAllProcessInfo() # type: ignore
+
+ def get_service_log(self, name, offset=0, length=10000) -> Any:
+ return self.rpc.supervisor.readProcessStdoutLog(name, offset, length) # type: ignore
+
+ def stream_service_logs(self, name, timeout=1) -> None:
+ offset = 0
+ while True:
+ log_data, offset, overflow = self.rpc.supervisor.tailProcessStdoutLog(name, offset, 1024) # type: ignore
+ if log_data:
+ print(log_data, end='', flush=True)
+ if overflow:
+ print(f"Warning: Log overflow detected for {name}. Some log entries may have been missed.")
+ time.sleep(timeout)
+
+ def stream_service_logs_threaded(self, name, timeout=1) -> threading.Thread:
+ thread = threading.Thread(target=self.stream_service_logs, args=(name, timeout))
+ thread.start()
+ return thread
+    def wait_for_start(self) -> None:
+        while not Path(self.socket).exists():
+            time.sleep(0.1)
+ def stop(self) -> None:
+ if not self.process:
+ return
+ try:
+ self.transport.close()
+ self.process.terminate()
+ self.process.join()
+ except Exception as e:
+ print(e)
+
+@dataclass
+class MercureService:
+ name: str
+ command: str
+ numprocs: int = 1
+ stopasgroup: bool = False
+ startsecs: int = 0
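+
+# Illustrative use: MercureService("router", f"{python_bin} {here}/router.py", numprocs=5)
+# renders a [program:router] supervisord section with five numbered processes.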
+
+def is_dicoms_received(mercure_base, dicoms) -> None:
+    dicoms_received = set()
+    for series_folder in (mercure_base / 'data' / 'incoming').glob('*/'):
+        for dicom in series_folder.glob('*.dcm'):
+            ds_ = pydicom.dcmread(dicom)
+            assert ds_.SeriesInstanceUID == series_folder.name
+            assert ds_.SOPInstanceUID not in dicoms_received
+            dicoms_received.add(ds_.SOPInstanceUID)
+
+    assert dicoms_received == set(ds.SOPInstanceUID for ds in dicoms)
+    print(f"Received {len(dicoms)} dicoms as expected")
+
+def is_dicoms_in_folder(folder, dicoms) -> None:
+ uids_found = set()
+ print(f"Looking for dicoms in {folder}")
+ dicoms_found = []
+    for f in folder.rglob('*'):
+        if not f.is_file():
+            continue
+        if f.suffix == '.dcm' or f.suffix not in ('.error', '.tags'):
+            dicoms_found.append(f)
+    print("Dicoms", dicoms_found)
+    for dicom in dicoms_found:
+        try:
+            uid = pydicom.dcmread(dicom).SOPInstanceUID
+            uids_found.add(uid)
+        except Exception:
+            pass
+ try:
+ assert uids_found == set(ds.SOPInstanceUID for ds in dicoms), f"Dicoms missing from {folder}"
+ except:
+ print("Expected dicoms not found")
+ for dicom in folder.glob('**/*.dcm'):
+ print(dicom)
+ raise
+ print(f"Found {len(dicoms)} dicoms in {folder.name} as expected")
+
+def is_series_registered(bookkeeper_port, dicoms) -> None:
+ result = requests.get(f"http://localhost:{bookkeeper_port}/query/series",
+ headers={"Authorization": f"Token test"})
+ assert result.status_code == 200
+ result_json = result.json()
+ assert set([r['series_uid'] for r in result_json]) == set([d.SeriesInstanceUID for d in dicoms])
+
+@pytest.fixture(scope="function")
+def supervisord(mercure_base):
+    supervisor: Optional[SupervisorManager] = None
+    def starter(services=[]):
+        nonlocal supervisor
+        if not supervisor:
+            supervisor = SupervisorManager(mercure_base)
+            supervisor.start(services)
+        return supervisor
+    yield starter
+    if supervisor is not None:
+        supervisor.stop()
+
+
+def stop_mercure(supervisor: SupervisorManager):
+ logs = {}
+ for service in supervisor.all_services():
+ if service['state'] in RUNNING_STATES:
+ try:
+ supervisor.stop_service(service['name'])
+ except xmlrpc.client.Fault as e:
+ if e.faultCode == 10:
+ supervisor.stop_service(service['group']+":*")
+ # log = get_service_log(service['name'])
+ # if log:
+ log = Path(service['stdout_logfile']).read_text()
+ if log:
+ logs[service['name']] = log
+ return logs
+
+@pytest.fixture(scope="session")
+def python_bin():
+ if os.environ.get("CLEAN_VENV"):
+ with tempfile.TemporaryDirectory(prefix="mercure_venv") as venvdir:
+ subprocess.run([sys.executable, "-m", "venv", venvdir], check=True)
+ subprocess.run([os.path.join(venvdir, "bin", "pip"), "install", "-r", f"{here}/requirements.txt"], check=True)
+ yield venvdir+"/bin/python"
+ else:
+ yield sys.executable
+
+
+@pytest.fixture(scope="function")
+def mercure_base() -> Generator[Path, None, None]:
+ with tempfile.TemporaryDirectory(prefix='mercure_') as temp_dir:
+ temp_path = Path(temp_dir)
+ for d in ['config','data']:
+ (temp_path / d).mkdir()
+ for k in ["incoming", "studies", "outgoing", "success", "error", "discard", "processing", "jobs"]:
+ (temp_path / 'data' / k).mkdir()
+ yield temp_path
+
+@pytest.fixture(scope="function")
+def mercure(supervisord: Callable[[Any], SupervisorManager], python_bin) -> Generator[Callable[[Any],SupervisorManager], None, None]:
+ def py_service(service, **kwargs) -> MercureService:
+ if 'command' not in kwargs:
+ kwargs['command'] = f"{python_bin} {here}/{service}.py"
+ return MercureService(service,**kwargs)
+ services = [
+ py_service("bookkeeper",startsecs=6),
+ py_service("router", numprocs=5),
+ py_service("processor", numprocs=2),
+ py_service("dispatcher", numprocs=5),
+ py_service("worker_fast", command=f"{python_bin} -m rq.cli worker mercure_fast"),
+ py_service("worker_slow", command=f"{python_bin} -m rq.cli worker mercure_slow")
+ ]
+    services += [MercureService("receiver", f"{here}/receiver.sh --inject-errors", stopasgroup=True)]
+    supervisor = supervisord(services)
+    def do_start(services_to_start=["bookkeeper", "receiver", "router", "processor", "dispatcher"]) -> SupervisorManager:
+ for service in services_to_start:
+ supervisor.start_service(service)
+ return supervisor
+ yield do_start
+ logs = stop_mercure(supervisor)
+ for l in logs:
+ print(f"====== {l} ======")
+ print(logs[l])
+ print("=============")
+
+def random_port() -> int:
+ """
+ Generate a free port number to use as an ephemeral endpoint.
+ """
+ s = socket.socket()
+ s.bind(('',0)) # bind to any available port
+ port = s.getsockname()[1] # get the port number
+ s.close()
+ return int(port)
+
+
+@pytest.fixture(scope="module")
+def receiver_port():
+ return random_port()
+
+@pytest.fixture(scope="module")
+def bookkeeper_port():
+ return random_port()
+
+
+@pytest.fixture(scope="function")
+def mercure_config(mercure_base, receiver_port, bookkeeper_port):
+ mercure_config = { k: v for k, v in mercure_defaults.items()}
+ for folder in (mercure_base / 'data').iterdir():
+ mercure_config[f"{folder.name}_folder"] = str(folder)
+
+ mercure_config["series_complete_trigger"] = 1
+ mercure_config["study_complete_trigger"] = 2
+ mercure_config["bookkeeper_api_key"] = "test"
+ mercure_config["port"] = receiver_port
+ mercure_config["bookkeeper"] = f"localhost:{bookkeeper_port}"
+ with (mercure_base / 'config' / 'mercure.json').open('w') as fp:
+ json.dump(mercure_config, fp)
+
+ bookkeeper_config = f"""
+PORT={bookkeeper_port}
+HOST=0.0.0.0
+DATABASE_URL=sqlite:///{mercure_base}/data/bookkeeper.sqlite3
+DEBUG=True
+"""
+ with (mercure_base / 'config' / 'bookkeeper.env').open('w') as fp:
+ fp.write(bookkeeper_config)
+
+ def update_config(config):
+ with (mercure_base / 'config' / 'mercure.json').open('r+') as fp:
+ data = json.load(fp)
+ data.update(config)
+ fp.seek(0)
+ json.dump(data, fp)
+ fp.truncate()
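+    # Tests use the returned update_config() helper to patch mercure.json in place,
+    # e.g. update_config({"series_complete_trigger": 5}) (illustrative value).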
+ return update_config
diff --git a/webgui.py b/webgui.py
index e6e0f43b..1613dd33 100755
--- a/webgui.py
+++ b/webgui.py
@@ -26,7 +26,7 @@
import daiquiri
import html
from pathlib import Path
-from typing import Any, Optional, Union
+from typing import Any, Dict, List, Optional, TypedDict, Union
import docker
import hupper
import nomad
@@ -67,7 +67,7 @@
import webinterface.api as api
import webinterface.dashboards as dashboards
from webinterface.common import *
-
+from webinterface.dashboards.query.jobs import QueryPipeline
from decoRouter import Router as decoRouter
router = decoRouter()
@@ -129,15 +129,39 @@ async def authenticate(self, request):
@contextlib.asynccontextmanager
async def lifespan(app):
- startup()
- yield
+ result = startup(app)
+ yield result
await shutdown()
-def startup() -> None:
+def startup(app: Starlette):
+ state = {"redis_connected": False}
+ try:
+ response = redis.ping()
+ if response:
+ logger.info("Redis connection validated.")
+ state["redis_connected"] = True
+ else:
+ raise Exception("Redis connection failed")
+    except Exception:
+ logger.error("Could not connect to Redis", exc_info=True)
+
+ if state["redis_connected"]:
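+        # Replace any previously scheduled off-peak job with a fresh one that
+        # re-runs QueryPipeline.update_all_offpeak every 60 seconds.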
+ scheduled_jobs = rq_fast_scheduler.get_jobs()
+ for job in scheduled_jobs:
+ if job.meta.get("type") != "offpeak":
+ continue
+ rq_fast_scheduler.cancel(job)
+ rq_fast_scheduler.schedule(
+ scheduled_time=datetime.datetime.utcnow(),
+ func=QueryPipeline.update_all_offpeak,
+ interval=60,
+ meta={"type": "offpeak"},
+ repeat=None
+ )
monitor.configure("webgui", "main", config.mercure.bookkeeper)
monitor.send_event(monitor.m_events.BOOT, monitor.severity.INFO, f"PID = {os.getpid()}")
-
+ return state
async def shutdown() -> None:
monitor.send_event(monitor.m_events.SHUTDOWN, monitor.severity.INFO, "")
@@ -260,6 +284,7 @@ async def show_log(request) -> Response:
return_code = 0
except:
pass
+ sub_services = []
elif runtime == "systemd":
start_date_cmd = ""
end_date_cmd = ""
@@ -268,21 +293,38 @@ async def show_log(request) -> Response:
if end_timestamp:
end_date_cmd = f'--until "{end_timestamp}"'
+ service_name_or_list = services.services_list[requested_service]["systemd_service"]
+ if isinstance(service_name_or_list, list):
+ service_name = request.query_params.get("subservice", service_name_or_list[0])
+ sub_services = service_name_or_list
+ else:
+ service_name = service_name_or_list
+ sub_services = []
run_result = await async_run(
f"sudo journalctl -n 1000 -u "
- f'{services.services_list[requested_service]["systemd_service"]} '
+ f'{service_name} '
f"{start_date_cmd} {end_date_cmd}"
)
return_code = -1 if run_result[0] is None else run_result[0]
raw_logs = run_result[1]
+
elif runtime == "docker":
client = docker.from_env() # type: ignore
try:
- container = client.containers.get(services.services_list[requested_service]["docker_service"])
+ service_name_or_list = services.services_list[requested_service]["docker_service"]
+ if isinstance(service_name_or_list, list):
+ service_name = request.query_params.get("subservice", service_name_or_list[0])
+ sub_services = service_name_or_list
+ else:
+ service_name = service_name_or_list
+ sub_services = []
+
+ container = client.containers.get(service_name)
container.reload()
raw_logs = container.logs(since=start_obj, timestamps=True)
return_code = 0
- except (docker.errors.NotFound, docker.errors.APIError): # type: ignore
+ except (docker.errors.NotFound, docker.errors.APIError) as e: # type: ignore
+ logger.error(e)
return_code = 1
# return_code, raw_logs = (await async_run("/usr/bin/nomad alloc logs -job -stderr -f -tail mercure router"))[:2]
@@ -303,7 +345,7 @@ async def show_log(request) -> Response:
template = "logs.html"
context = {
"request": request,
- "mercure_version": mercure_defs.VERSION,
+
"page": "logs",
"service_logs": service_logs,
"log_id": requested_service,
@@ -314,8 +356,9 @@ async def show_log(request) -> Response:
"end_time": end_time,
"end_time_available": runtime == "systemd",
"start_time_available": runtime in ("docker", "systemd"),
+ "sub_services": sub_services,
+ "subservice": request.query_params.get("subservice", None)
}
- context.update(get_user_information(request))
return templates.TemplateResponse(template, context)
@@ -345,7 +388,6 @@ async def configuration(request) -> Response:
"config_edited": config_edited,
"runtime": runtime,
}
- context.update(get_user_information(request))
return templates.TemplateResponse(template, context)
@@ -375,7 +417,6 @@ async def configuration_edit(request) -> Response:
"page": "homepage",
"config_content": config_content,
}
- context.update(get_user_information(request))
return templates.TemplateResponse(template, context)
@@ -419,7 +460,7 @@ async def login(request) -> Response:
template = "login.html"
context = {
"request": request,
- "mercure_version": mercure_defs.VERSION,
+
"appliance_name": config.mercure.get("appliance_name", "master"),
}
return templates.TemplateResponse(template, context)
@@ -607,7 +648,7 @@ async def login_post(request) -> Response:
context = {
"request": request,
"invalid_password": 1,
- "mercure_version": mercure_defs.VERSION,
+
"appliance_name": config.mercure.get("appliance_name", "mercure Router"),
}
return templates.TemplateResponse(template, context)
@@ -635,14 +676,13 @@ async def settings_edit(request) -> Response:
template = "users_edit.html"
context = {
"request": request,
- "mercure_version": mercure_defs.VERSION,
+
"page": "settings",
"edituser": own_name,
"edituser_info": users.users_list[own_name],
"own_settings": "True",
"change_password": users.users_list[own_name].get("change_password", "False"),
}
- context.update(get_user_information(request))
return templates.TemplateResponse(template, context)
@@ -650,6 +690,68 @@ async def settings_edit(request) -> Response:
## Homepage endpoints
###################################################################################
+async def get_service_status(runtime) -> List[Dict[str, Any]]:
+ service_status = {service: {
+ "id": service,
+ "name": value["name"],
+ "running": None
+ } for service, value in services.services_list.items()}
+    logger.debug(service_status)
+    logger.debug(services.services_list)
+ try:
+ for service_id, service_info in services.services_list.items():
+ running_status: Optional[bool] = False
+
+ if runtime == "systemd":
+ systemd_services = service_info["systemd_service"]
+ if not isinstance(systemd_services, list):
+ systemd_services = [systemd_services]
+
+ for service_name in systemd_services:
+ exit_code, _, _ = await async_run(f"systemctl is-active {service_name}")
+ if exit_code == 0:
+ running_status = True
+ else:
+ running_status = False
+ break
+
+ elif runtime == "docker":
+ client = docker.from_env() # type: ignore
+ docker_services = service_info["docker_service"]
+ if not isinstance(docker_services, list):
+ docker_services = [docker_services]
+
+ try:
+ for docker_service in docker_services:
+ container = client.containers.get(docker_service)
+ container.reload()
+ status = container.status
+ """restarting, running, paused, exited"""
+ if status == "running":
+ running_status = True
+
+ except (docker.errors.NotFound, docker.errors.APIError): # type: ignore
+ running_status = False
+ elif runtime == "nomad":
+ if nomad_connection is None:
+ running_status = None
+ else:
+ allocations = nomad_connection.job.get_allocations("mercure")
+ running_alloc = [a for a in allocations if a["ClientStatus"] == "running"]
+ if not running_alloc:
+ running_status = False
+ else:
+ alloc = running_alloc[0]
+ if not alloc["TaskStates"].get(service_id):
+ running_status = False
+ else: # TODO: fix this for workers?
+ running_status = alloc["TaskStates"][service_id]["State"] == "running"
+
+ service_status[service_id]["running"] = running_status
+    except Exception:
+        logger.exception("Failed to get service status.")
+    return list(service_status.values())
@router.get("/")
@requires("authenticated", redirect="login")
@@ -675,50 +777,16 @@ async def homepage(request) -> Response:
free_space = "N/A"
disk_total = "N/A"
- service_status = {}
- for service in services.services_list:
- running_status: Optional[bool] = False
-
- if runtime == "systemd":
- if (await async_run("systemctl is-active " + services.services_list[service]["systemd_service"]))[0] == 0:
- running_status = True
-
- elif runtime == "docker":
- client = docker.from_env() # type: ignore
- try:
- container = client.containers.get(services.services_list[service]["docker_service"])
- container.reload()
- status = container.status
- """restarting, running, paused, exited"""
- if status == "running":
- running_status = True
-
- except (docker.errors.NotFound, docker.errors.APIError): # type: ignore
- running_status = False
- elif runtime == "nomad":
- if nomad_connection is None:
- running_status = None
- else:
- allocations = nomad_connection.job.get_allocations("mercure")
- running_alloc = [a for a in allocations if a["ClientStatus"] == "running"]
- if not running_alloc:
- running_status = False
- else:
- alloc = running_alloc[0]
- if not alloc["TaskStates"].get(service):
- running_status = False
- else:
- running_status = alloc["TaskStates"][service]["State"] == "running"
- service_status[service] = {
- "id": service,
- "name": services.services_list[service]["name"],
- "running": running_status,
- }
+ try:
+ service_status = await get_service_status(runtime)
+ except Exception as e:
+ logger.error(f"Error getting service status: {e}")
+        service_status = []
template = "index.html"
context = {
"request": request,
- "mercure_version": mercure_defs.VERSION,
+
"page": "homepage",
"used_space": used_space,
"free_space": free_space,
@@ -726,7 +794,6 @@ async def homepage(request) -> Response:
"service_status": service_status,
"runtime": runtime,
}
- context.update(get_user_information(request))
return templates.TemplateResponse(template, context)
@@ -754,27 +821,36 @@ async def control_services(request) -> Response:
continue
if runtime == "systemd":
- command = "sudo systemctl " + action + " " + services.services_list[service]["systemd_service"]
- logger.info(f"Executing: {command}")
- await async_run(command)
+ systemd_services = services.services_list[service]["systemd_service"]
+ if not isinstance(systemd_services, list):
+ systemd_services = [systemd_services]
+
+ for service_name in systemd_services:
+ command = "sudo systemctl " + action + " " + service_name
+ logger.info(f"Executing: {command}")
+ await async_run(command)
elif runtime == "docker":
client = docker.from_env() # type: ignore
- logger.info(f'Executing: {action} on {services.services_list[service]["docker_service"]}')
- try:
- container = client.containers.get(services.services_list[service]["docker_service"])
- container.reload()
- if action == "start":
- container.start()
- if action == "stop":
- container.stop()
- if action == "restart":
- container.restart()
- if action == "kill":
- container.kill()
- except (docker.errors.NotFound, docker.errors.APIError) as docker_error: # type: ignore
- logger.error(f"{docker_error}")
- pass
+ docker_services = services.services_list[service]["docker_service"]
+ if not isinstance(docker_services, list):
+ docker_services = [docker_services]
+ for service_name in docker_services:
+ logger.info(f'Executing: {action} on {service_name}')
+ try:
+ container = client.containers.get(service_name)
+ container.reload()
+ if action == "start":
+ container.start()
+ if action == "stop":
+ container.stop()
+ if action == "restart":
+ container.restart()
+ if action == "kill":
+ container.kill()
+ except (docker.errors.NotFound, docker.errors.APIError) as docker_error: # type: ignore
+ logger.error(f"{docker_error}")
+ pass
else:
# The Nomad mode currently does not support shutting down services
@@ -829,7 +905,7 @@ async def server_error(request, exc) -> Response:
app.mount("/static", StaticFiles(directory="webinterface/statics", check_dir=False), name="static")
app.add_middleware(AuthenticationMiddleware, backend=SessionAuthBackend())
app.add_middleware(SessionMiddleware, secret_key=SECRET_KEY, session_cookie="mercure_session")
-app.mount("/rules", rules.rules_app)
+app.mount("/rules", rules.rules_app, name="rules")
app.mount("/targets", targets.targets_app)
app.mount("/modules", modules.modules_app)
app.mount("/users", users.users_app)
@@ -847,7 +923,7 @@ async def server_error(request, exc) -> Response:
async def emergency_response(request) -> Response:
"""Shows emergency message about invalid configuration."""
- return PlainTextResponse("ERROR: mercure configuration is invalid. Check configuration and restart webgui service.")
+ return PlainTextResponse("ERROR: mercure configuration is invalid. Check configuration and restart webgui service.", status_code=500)
def launch_emergency_app() -> None:
@@ -876,8 +952,11 @@ def main(args=sys.argv[1:]) -> None:
logging.getLogger("watchdog").setLevel(logging.WARNING)
try:
services.read_services()
- config.read_config()
+ config_ = config.read_config()
users.read_users()
+ success, error = helper.validate_folders(config_)
+ if not success:
+ raise ValueError(f"Invalid configuration folder structure: {error}")
if str(SECRET_KEY) == "PutSomethingRandomHere":
logger.error("You need to change the SECRET_KEY in configuration/webgui.env")
raise Exception("Invalid or missing SECRET_KEY in webgui.env")
diff --git a/webinterface/common.py b/webinterface/common.py
index 29751aff..deca2a2e 100755
--- a/webinterface/common.py
+++ b/webinterface/common.py
@@ -5,16 +5,23 @@
"""
# Standard python includes
+import os
from typing import Optional, Tuple
import asyncio
+from redis import Redis
+from rq import Queue
# Starlette-related includes
from starlette.templating import Jinja2Templates
+from common.constants import mercure_defs
+from rq_scheduler import Scheduler
import common.config as config
-templates = Jinja2Templates(directory="webinterface/templates")
-
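+# Shared Redis connection for the web UI and the dashboard jobs; defaults to
+# redis://localhost:6379/0 and can be overridden via the REDIS_URL environment variable.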
+redis = Redis.from_url(os.getenv("REDIS_URL","redis://localhost:6379/0"))
+rq_slow_queue = Queue(name="mercure_slow", connection=redis)
+rq_fast_queue = Queue(name="mercure_fast", connection=redis)
+rq_fast_scheduler = Scheduler(queue=rq_fast_queue, connection=rq_fast_queue.connection)
def get_user_information(request) -> dict:
"""Returns dictionary of values that should always be passed to the templates when the user is logged in."""
@@ -26,6 +33,11 @@ def get_user_information(request) -> dict:
"appliance_color": config.mercure.appliance_color,
}
+def get_mercure_version(request) -> dict:
+ return { "mercure_version": mercure_defs.VERSION }
+
+templates = Jinja2Templates(directory="webinterface/templates", context_processors=[get_user_information, get_mercure_version])
+
async def async_run(cmd, **params) -> Tuple[Optional[int], bytes, bytes]:
"""Executes the given command in a way compatible with ayncio."""
diff --git a/webinterface/dashboards.py b/webinterface/dashboards.py
deleted file mode 100644
index 8afc4275..00000000
--- a/webinterface/dashboards.py
+++ /dev/null
@@ -1,61 +0,0 @@
-"""
-test.py
-========
-Test page for querying the bookkeeper database from the webgui.
-"""
-
-# Standard python includes
-import daiquiri
-
-# Starlette-related includes
-from starlette.applications import Starlette
-from starlette.authentication import requires
-
-# App-specific includes
-from common.constants import mercure_defs
-from webinterface.common import get_user_information
-from webinterface.common import templates
-import common.config as config
-from starlette.responses import RedirectResponse
-from decoRouter import Router as decoRouter
-router = decoRouter()
-
-logger = config.get_logger()
-
-###################################################################################
-## Test endpoints
-###################################################################################
-
-@router.get("/")
-async def index(request):
- return RedirectResponse(url="tests")
-
-
-@router.get("/tasks")
-@requires("authenticated", redirect="login")
-async def tasks(request):
- template = "dashboards/tasks.html"
- context = {
- "request": request,
- "mercure_version": mercure_defs.VERSION,
- "page": "tools",
- "tab": "tasks",
- }
- context.update(get_user_information(request))
- return templates.TemplateResponse(template, context)
-
-
-@router.get("/tests")
-@requires(["authenticated", "admin"], redirect="login")
-async def tests(request):
- template = "dashboards/tests.html"
- context = {
- "request": request,
- "mercure_version": mercure_defs.VERSION,
- "page": "tools",
- "tab": "tests",
- }
- context.update(get_user_information(request))
- return templates.TemplateResponse(template, context)
-
-dashboards_app = Starlette(routes=router)
\ No newline at end of file
diff --git a/webinterface/dashboards/__init__.py b/webinterface/dashboards/__init__.py
new file mode 100644
index 00000000..00f320b6
--- /dev/null
+++ b/webinterface/dashboards/__init__.py
@@ -0,0 +1,7 @@
+from .common import router
+from . import query_routes, simple
+from starlette.applications import Starlette
+
+
+
+dashboards_app = Starlette(routes=router)
\ No newline at end of file
diff --git a/webinterface/dashboards/common.py b/webinterface/dashboards/common.py
new file mode 100644
index 00000000..09dc9068
--- /dev/null
+++ b/webinterface/dashboards/common.py
@@ -0,0 +1,12 @@
+from decoRouter import Router as decoRouter
+from starlette.responses import RedirectResponse, JSONResponse
+
+router = decoRouter()
+
+@router.get("/")
+async def index(request):
+ return RedirectResponse(url="query")
+
+class JSONErrorResponse(JSONResponse):
+ def __init__(self, message: str, status_code: int = 500):
+ super().__init__(content={"error": message}, status_code=status_code)
\ No newline at end of file
diff --git a/webinterface/dashboards/query/__init__.py b/webinterface/dashboards/query/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/webinterface/dashboards/query/jobs.py b/webinterface/dashboards/query/jobs.py
new file mode 100644
index 00000000..d6b6697f
--- /dev/null
+++ b/webinterface/dashboards/query/jobs.py
@@ -0,0 +1,456 @@
+
+
+from dataclasses import dataclass
+import dataclasses
+import os
+from pathlib import Path
+import shutil
+from typing import Any, Dict, Generator, List, Optional, Tuple, Type, Union, cast
+import typing
+
+
+from common import helper
+from common.types import DicomTarget, DicomWebTarget, FolderTarget
+from dispatch.target_types.base import ProgressInfo
+from dispatch.target_types.registry import get_handler
+# Standard python includes
+from datetime import datetime
+import time
+# Starlette-related includes
+
+# App-specific includes
+import common.config as config
+from webinterface.common import redis, rq_fast_queue, rq_slow_queue
+from rq.job import Dependency, JobStatus, Job
+from rq import Connection, Queue, get_current_job
+
+logger = config.get_logger()
+
+
+
+def query_dummy(job_id, job_kwargs):
+ """
+ Dummy function to simulate a long-running task.
+ """
+    total_time = 2  # Total time for the job in seconds
+ update_interval = 0.25 # Interval between updates in seconds
+ remaining = total_time // update_interval
+ completed = 0
+ start_time = time.monotonic()
+
+ while (time.monotonic() - start_time) < total_time:
+ time.sleep(update_interval) # Sleep for the interval duration
+ out_file = (Path(job_kwargs['path']) / f"dummy{completed}_{job_id}.dcm")
+ if out_file.exists():
+ raise Exception(f"{out_file} exists already")
+ out_file.touch()
+ remaining -= 1
+ completed += 1
+
+ yield completed, remaining, f"{completed} / {remaining + completed}"
+
+
+@dataclass
+class ClassBasedRQTask():
+ parent: Optional[str] = None
+ type: str = "unknown"
+ _job: Optional[Job] = None
+ _queue: str = ''
+
+ @classmethod
+ def queue(cls) -> Queue:
+ return Queue(cls._queue, connection=redis)
+
+ def create_job(self, rq_options={}, **kwargs) -> Job:
+ fields = dataclasses.fields(self)
+ meta = {field.name: getattr(self, field.name) for field in fields}
+ return Job.create(self._execute, kwargs=kwargs, meta=meta, **rq_options)
+
+ @classmethod
+ def _execute(cls, **kwargs) -> Any:
+ job = get_current_job()
+ if not job:
+ raise Exception("No current job")
+ fields = dataclasses.fields(cls)
+ meta = {}
+ for f in fields:
+ if f.name in job.meta and not f.name.startswith('_'):
+ meta[f.name] = job.meta[f.name]
+ result = cls(**meta, _job=job).execute(**kwargs)
+ if result is None:
+ return b""
+ return result
+
+ def execute(self, *args, **kwargs) -> Any:
+ pass
+
+ @staticmethod
+ def move_to_destination(path, destination, job_id) -> None:
+ if destination is None:
+ config.read_config()
+ for p in Path(path).glob("**/*"):
+ if p.is_file():
+ shutil.move(str(p), config.mercure.incoming_folder) # Move the file to incoming folder
+ # tree(config.mercure.incoming_folder)
+ shutil.rmtree(path)
+ else:
+ dest_folder: Path = Path(destination) / job_id
+ dest_folder.mkdir(exist_ok=True)
+ logger.info(f"moving {path} to {dest_folder}")
+ shutil.move(path, dest_folder)
+
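+# How the dataclass/rq glue above works: create_job() copies the task's dataclass
+# fields into job.meta, and _execute() (the callable rq actually runs) rebuilds the
+# task from job.meta inside the worker before dispatching to execute().
+# Illustrative sketch (some_node stands for any DicomTarget/DicomWebTarget):
+#   job = CheckAccessionsTask().create_job(accessions=["ACC1"], node=some_node)
+#   CheckAccessionsTask.queue().enqueue_job(job)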
+
+@dataclass
+class CheckAccessionsTask(ClassBasedRQTask):
+ type: str = "check_accessions"
+ _queue: str = rq_fast_queue.name
+
+ def execute(self, *, accessions: List[str], node: Union[DicomTarget, DicomWebTarget], search_filters:Dict[str,List[str]]={}):
+ """
+ Check if the given accessions exist on the node using a DICOM query.
+ """
+ results = []
+ try:
+ for accession in accessions:
+ found_ds_list = get_handler(node).find_from_target(node, accession, search_filters)
+ if not found_ds_list:
+ raise ValueError("No series found with accession number {}".format(accession))
+ results.extend(found_ds_list)
+ return results
+ except Exception as e:
+ if not self._job:
+ raise
+ self._job.meta['failed_reason'] = str(e)
+ self._job.save_meta() # type: ignore
+ if self.parent and (job_parent := Job.fetch(self.parent)):
+ job_parent.meta['failed_reason'] = e.args[0]
+ job_parent.save_meta() # type: ignore
+ Queue(job_parent.origin)._enqueue_job(job_parent,at_front=True)
+ raise
+
+
+@dataclass
+class GetAccessionTask(ClassBasedRQTask):
+ type: str = "get_accession"
+ paused: bool = False
+ offpeak: bool = False
+ _queue: str = rq_slow_queue.name
+
+ @classmethod
+ def get_accession(cls, job_id, accession: str, node: Union[DicomTarget, DicomWebTarget], search_filters: Dict[str, List[str]], path) -> Generator[ProgressInfo, None, None]:
+ yield from get_handler(node).get_from_target(node, accession, search_filters, path)
+
+ def execute(self, *, accession:str, node: Union[DicomTarget, DicomWebTarget], search_filters:Dict[str, List[str]], path: str):
+ print(f"Getting {accession}")
+ def error_handler(reason) -> None:
+ logger.error(reason)
+ if not job_parent:
+ raise
+ logger.info("Cancelling sibling jobs.")
+ for subjob_id in job_parent.kwargs.get('subjobs',[]):
+ if subjob_id == job.id:
+ continue
+ subjob = Job.fetch(subjob_id)
+ if subjob.get_status() not in ('finished', 'canceled','failed'):
+ subjob.cancel()
+ job_parent.get_meta()
+ logger.info("Cancelled sibling jobs.")
+ if not job_parent.meta.get("failed_reason"):
+ job_parent.meta["failed_reason"] = reason
+ job_parent.save_meta() # type: ignore
+ Queue(job_parent.origin)._enqueue_job(job_parent,at_front=True) # Force the parent job to run and fail itself
+
+ job = cast(Job,self._job)
+ try:
+ Path(path).mkdir(parents=True, exist_ok=True)
+ job_parent = None
+ if parent_id := self.parent:
+ job_parent = Job.fetch(parent_id)
+
+ if job_parent:
+ job_parent.meta['started'] = job_parent.meta.get('started',0) + 1
+ job_parent.save_meta() # type: ignore
+
+ job.meta['started'] = 1
+ job.meta['progress'] = "0 / Unknown"
+ job.save_meta() # type: ignore
+ try:
+ for info in self.get_accession(job.id, accession=accession, node=node, search_filters=search_filters, path=path):
+ job.meta['remaining'] = info.remaining
+ job.meta['completed'] = info.completed
+ job.meta['progress'] = info.progress
+ job.save_meta() # type: ignore # Save the updated meta data to the job
+ logger.info(info.progress)
+ except Exception as e:
+ error_handler(f"Failure during retrieval of accession {accession}: {e}")
+ raise
+ if job_parent:
+ if job_parent.kwargs["move_promptly"]:
+ try:
+ self.move_to_destination(path, job_parent.kwargs["destination"], job_parent.id)
+ except Exception as e:
+ error_handler(f"Failure during move to destination of accession {accession}: {e}")
+ raise
+
+ job_parent.get_meta() # there is technically a race condition here...
+ job_parent.meta['completed'] += 1
+ job_parent.meta['progress'] = f"{job_parent.meta['started'] } / {job_parent.meta['completed'] } / {job_parent.meta['total']}"
+ job_parent.save_meta() # type: ignore
+
+ except Exception as e:
+ error_handler(f"Failure with accession {accession}: {e}")
+ raise
+
+ return "Job complete"
+
+@dataclass
+class MainTask(ClassBasedRQTask):
+ type: str = "batch"
+ started: int = 0
+ completed: int = 0
+ total: int = 0
+ paused: bool = False
+ offpeak: bool = False
+ _queue: str = rq_slow_queue.name
+
+ def execute(self, *, accessions, subjobs, path, destination, move_promptly) -> str:
+ job = cast(Job,self._job)
+ job.get_meta()
+ for job_id in job.kwargs.get('subjobs',[]):
+ subjob = Job.fetch(job_id)
+ if (status := subjob.get_status()) != 'finished':
+ raise Exception(f"Subjob {subjob.id} is {status}")
+ if job.kwargs.get('failed', False):
+            raise Exception("Failed")
+
+ logger.info(f"Job completing {job.id}")
+ if not move_promptly:
+ logger.info("Moving files during completion as move_promptly==False")
+ for p in Path(path).iterdir():
+ if not p.is_dir():
+ continue
+ try:
+ self.move_to_destination(p, destination, job.id)
+ except Exception as e:
+ err = f"Failure during move to destination {destination}: {e}" if destination else f"Failure during move to {config.mercure.incoming_folder}: {e}"
+ logger.error(err)
+ job.meta["failed_reason"] = err
+ job.save_meta() # type: ignore
+ raise
+ # subprocess.run(["./bin/ubuntu22.04/getdcmtags", filename, self.called_aet, "MERCURE"],check=True)
+
+ logger.info(f"Removing job directory {path}")
+ shutil.rmtree(path)
+ job.meta["failed_reason"] = None
+ job.save_meta() # type: ignore
+
+ return "Job complete"
+
+class QueryPipeline():
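+    """Wrapper around the MainTask rq job of a batch query, exposing pause/resume/retry
+    and status helpers for the query dashboard."""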
+ job: Job
+ def __init__(self, job: Union[Job,str]):
+ if isinstance(job, str):
+ if not (result:=Job.fetch(job)):
+ raise Exception("Invalid Job ID")
+ self.job = result
+ else:
+ self.job = job
+
+ assert self.job.meta.get('type') == 'batch', f"Job type must be batch, got {self.job.meta['type']}"
+
+ @classmethod
+ def create(cls, accessions: List[str], search_filters:Dict[str, List[str]], dicom_node: Union[DicomWebTarget, DicomTarget], destination_path, offpeak=False) -> 'QueryPipeline':
+ """
+ Create a job to process the given accessions and store them in the specified destination path.
+ """
+
+ with Connection(redis):
+ get_accession_jobs: List[Job] = []
+ check_job = CheckAccessionsTask().create_job(accessions=accessions, search_filters=search_filters, node=dicom_node)
+ for accession in accessions:
+ get_accession_task = GetAccessionTask(offpeak=offpeak).create_job(
+ accession=str(accession),
+ node=dicom_node,
+ search_filters=search_filters,
+ rq_options=dict(
+ depends_on=cast(List[Union[Dependency, Job]],[check_job]),
+ timeout=30*60,
+ result_ttl=-1
+ )
+ )
+ get_accession_jobs.append(get_accession_task)
+ depends = Dependency(
+ jobs=cast(List[Union[Job,str]],get_accession_jobs),
+ allow_failure=True, # allow_failure defaults to False
+ )
+ main_job = MainTask(total=len(get_accession_jobs), offpeak=offpeak).create_job(
+ accessions = accessions,
+ subjobs = [check_job.id]+[j.id for j in get_accession_jobs],
+ destination = destination_path,
+ move_promptly = True,
+ rq_options = dict(depends_on=depends, timeout=-1, result_ttl=-1)
+ )
+ check_job.meta["parent"] = main_job.id
+ for j in get_accession_jobs:
+ j.meta["parent"] = main_job.id
+ j.kwargs["path"] = Path(config.mercure.jobs_folder) / str(main_job.id) / j.kwargs['accession']
+ j.kwargs["path"].mkdir(parents=True)
+
+ main_job.kwargs["path"] = Path(config.mercure.jobs_folder) / str(main_job.id)
+
+ CheckAccessionsTask.queue().enqueue_job(check_job)
+ for j in get_accession_jobs:
+ GetAccessionTask.queue().enqueue_job(j)
+ MainTask.queue().enqueue_job(main_job)
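+            # Dependency chain built above: check_job -> each get_accession job
+            # -> main_job (allow_failure=True), so the MainTask always runs and can
+            # surface any failed_reason recorded by its subjobs.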
+
+ wrapped_job = cls(main_job)
+ if offpeak and not helper._is_offpeak(config.mercure.offpeak_start, config.mercure.offpeak_end, datetime.now().time()):
+ wrapped_job.pause()
+
+ return wrapped_job
+
+ def __bool__(self) -> bool:
+ return bool(self.job)
+
+ def pause(self) -> None:
+ """
+ Pause the current job, including all its subjobs.
+ """
+ for job_id in self.job.kwargs.get('subjobs',[]):
+ subjob = Job.fetch(job_id)
+ if subjob and (subjob.is_deferred or subjob.is_queued):
+ logger.debug(f"Pausing {subjob}")
+ subjob.meta['paused'] = True
+ subjob.save_meta() # type: ignore
+ subjob.cancel()
+ self.job.get_meta()
+ self.job.meta['paused'] = True
+ self.job.save_meta() # type: ignore
+
+ def resume(self) -> None:
+ """
+ Resume a paused job by unpausing all its subjobs
+ """
+ for subjob_id in self.job.kwargs.get('subjobs',[]):
+ subjob = Job.fetch(subjob_id)
+ if subjob and subjob.meta.get('paused', None):
+ subjob.meta['paused'] = False
+ subjob.save_meta() # type: ignore
+ Queue(subjob.origin).canceled_job_registry.requeue(subjob_id)
+ self.job.get_meta()
+ self.job.meta['paused'] = False
+ self.job.save_meta() # type: ignore
+
+ def retry(self) -> None:
+ """
+ Retry a failed job by enqueuing it again
+ """
+ # job.meta["retries"] = job.meta.get("retries", 0) + 1
+ # if job.meta["retries"] > 3:
+ # return False
+ logger.info(f"Retrying {self.job}")
+ for subjob in self.get_subjobs():
+ meta = subjob.get_meta()
+ if (status:=subjob.get_status()) in ("failed", "canceled"):
+ logger.info(f"Retrying {subjob} ({status}) {meta}")
+ if status == "failed" and (job_path:=Path(subjob.kwargs['path'])).exists():
+ shutil.rmtree(job_path) # Clean up after a failed job
+ Queue(subjob.origin).enqueue_job(subjob)
+ Queue(self.job.origin).enqueue_job(self.job)
+
+ @classmethod
+ def update_all_offpeak(cls) -> None:
+ """
+ Resume or pause offpeak jobs based on whether the current time is within offpeak hours.
+ """
+ config.read_config()
+ is_offpeak = helper._is_offpeak(config.mercure.offpeak_start, config.mercure.offpeak_end, datetime.now().time())
+ for pipeline in QueryPipeline.get_all():
+ pipeline.update_offpeak(is_offpeak)
+
+ def update_offpeak(self, is_offpeak) -> None:
+ if not self.meta.get("offpeak"):
+ return
+ if self.get_status() not in ("waiting", "running", "queued", "deferred"):
+ return
+
+ if is_offpeak:
+ # logger.info(f"{job.meta}, {job.get_status()}")
+ if self.is_paused:
+ logger.info("Resuming")
+ self.resume()
+ else:
+ if not self.is_paused:
+ logger.info("Pausing")
+ self.pause()
+
+ def get_subjobs(self) -> Generator[Job, None, None]:
+ return (j for j in (Queue(self.job.origin).fetch_job(job) for job in self.job.kwargs.get('subjobs', [])) if j is not None)
+
+ def get_status(self) -> JobStatus:
+ return cast(JobStatus,self.job.get_status())
+
+ def get_meta(self) -> Any:
+ return cast(dict,self.job.get_meta())
+
+ @property
+ def meta(self) -> typing.Dict:
+ return cast(dict, self.job.meta)
+
+ @property
+ def is_failed(self) -> bool:
+ return cast(bool,self.job.is_failed)
+
+ @property
+ def is_finished(self) -> bool:
+ return cast(bool,self.job.is_finished)
+
+ @property
+ def is_paused(self) -> bool:
+ return cast(bool,self.meta.get("paused",False))
+
+ @property
+ def id(self) -> str:
+ return cast(str,self.job.id)
+
+ @property
+ def kwargs(self) -> typing.Dict:
+ return cast(dict,self.job.kwargs)
+
+ @property
+ def result(self) -> Any:
+ return self.job.result
+
+ @property
+ def created_at(self) -> datetime:
+ return cast(datetime,self.job.created_at)
+
+ @property
+ def enqueued_at(self) -> datetime:
+ return cast(datetime,self.job.enqueued_at)
+
+ @classmethod
+ def get_all(cls, type:str="batch") -> Generator['QueryPipeline', None, None]:
+ """
+ Get all jobs of a given type from the queue
+ """
+ job_ids = set()
+
+ registries = [
+ rq_slow_queue.started_job_registry, # Returns StartedJobRegistry
+ rq_slow_queue.deferred_job_registry, # Returns DeferredJobRegistry
+ rq_slow_queue.finished_job_registry, # Returns FinishedJobRegistry
+ rq_slow_queue.failed_job_registry, # Returns FailedJobRegistry
+ rq_slow_queue.scheduled_job_registry, # Returns ScheduledJobRegistry
+ rq_slow_queue.canceled_job_registry, # Returns CanceledJobRegistry
+ ]
+ for registry in registries:
+ for j_id in registry.get_job_ids():
+ job_ids.add(j_id)
+
+ for j_id in rq_slow_queue.job_ids:
+ job_ids.add(j_id)
+
+ jobs = (Job.fetch(j_id) for j_id in job_ids)
+
+ return (QueryPipeline(j) for j in jobs if j and j.get_meta().get("type") == type)
diff --git a/webinterface/dashboards/query_routes.py b/webinterface/dashboards/query_routes.py
new file mode 100644
index 00000000..3e78cb39
--- /dev/null
+++ b/webinterface/dashboards/query_routes.py
@@ -0,0 +1,272 @@
+
+from time import sleep
+from typing import Any, Dict, List
+from datetime import datetime
+
+# Starlette-related includes
+from starlette.authentication import requires
+from starlette.responses import JSONResponse
+
+from rq.job import Job
+from rq import Connection
+
+# App-specific includes
+from webinterface.common import templates
+import common.config as config
+from common.types import DicomTarget, DicomWebTarget, FolderTarget
+
+from webinterface.common import redis
+from .common import router, JSONErrorResponse
+from .query.jobs import CheckAccessionsTask, QueryPipeline
+
+logger = config.get_logger()
+
+@router.post("/query/retry_job")
+@requires(["authenticated", "admin"], redirect="login")
+async def post_retry_job(request):
+ with Connection(redis):
+ job = QueryPipeline(request.query_params['id'])
+
+ if not job:
+ return JSONErrorResponse(f"Job with id {request.query_params['id']} not found.", status_code=404)
+
+ try:
+ job.retry()
+ except Exception as e:
+ logger.exception("Failed to retry job", exc_info=True)
+ return JSONErrorResponse("Failed to retry job",status_code=500)
+ return JSONResponse({})
+
+@router.post("/query/pause_job")
+@requires(["authenticated", "admin"], redirect="login")
+async def post_pause_job(request):
+ with Connection(redis):
+ job = QueryPipeline(request.query_params['id'])
+
+ if not job:
+ return JSONErrorResponse('Job not found', status_code=404)
+ if job.is_finished or job.is_failed:
+ return JSONErrorResponse('Job is already finished', status_code=400)
+
+ try:
+ job.pause()
+ except Exception as e:
+ logger.exception(f"Failed to pause job {request.query_params['id']}")
+ return JSONErrorResponse('Failed to pause job', status_code=500)
+ return JSONResponse({'status': 'success'}, status_code=200)
+
+@router.post("/query/resume_job")
+@requires(["authenticated", "admin"], redirect="login")
+async def post_resume_job(request):
+ with Connection(redis):
+ job = QueryPipeline(request.query_params['id'])
+ if not job:
+ return JSONErrorResponse('Job not found', status_code=404)
+ if job.is_finished or job.is_failed:
+ return JSONErrorResponse('Job is already finished', status_code=400)
+
+ try:
+ job.resume()
+ except Exception as e:
+ logger.exception(f"Failed to resume job {request.query_params['id']}")
+ return JSONErrorResponse('Failed to resume job', status_code=500)
+ return JSONResponse({'status': 'success'}, status_code=200)
+
+@router.get("/query/job_info")
+@requires(["authenticated", "admin"], redirect="login")
+async def get_job_info(request):
+ job_id = request.query_params['id']
+ with Connection(redis):
+ job = QueryPipeline(job_id)
+ if not job:
+ return JSONErrorResponse('Job not found', status_code=404)
+
+ subjob_info:List[Dict[str,Any]] = []
+ for subjob in job.get_subjobs():
+ if not subjob:
+ continue
+ if subjob.meta.get('type') != 'get_accession':
+ continue
+ info = {
+ 'id': subjob.get_id(),
+ 'ended_at': subjob.ended_at.isoformat().split('.')[0] if subjob.ended_at else "",
+ 'created_at_dt':subjob.created_at,
+ 'accession': subjob.kwargs['accession'],
+ 'progress': subjob.meta.get('progress'),
+ 'paused': subjob.meta.get('paused',False),
+ 'status': subjob.get_status()
+ }
+ if info['status'] == 'canceled' and info['paused']:
+ info['status'] = 'paused'
+ subjob_info.append(info)
+
+ subjob_info = sorted(subjob_info, key=lambda x:x['created_at_dt'])
+
+ return templates.TemplateResponse("dashboards/query_job_fragment.html", {"request":request,"job":job,"subjob_info":subjob_info})
+
+
+
+@router.post("/query")
+@requires(["authenticated", "admin"], redirect="login")
+async def query_post_batch(request):
+ """
+ Starts a new query job for the given accession number and DICOM node.
+ """
+ try:
+ form = await request.form()
+ except Exception as e:
+ return JSONErrorResponse("Invalid form data.", status_code=400)
+ accession = form.get("accession")
+ if not accession:
+ return JSONErrorResponse("Accession number is required.", status_code=400)
+
+ node = config.mercure.targets.get(form.get("dicom_node"))
+ if not node:
+ return JSONErrorResponse(f"No such DICOM node {form.get('dicom_node')}.", status_code=404)
+ if not isinstance(node, (DicomWebTarget, DicomTarget)):
+ return JSONErrorResponse(f"Invalid DICOM node {form.get('dicom_node')}.", status_code=400)
+
+ destination_name = form.get("destination")
+ if not destination_name:
+ destination_path = None
+ else:
+        destination = config.mercure.targets.get(destination_name)
+        if not destination:
+            return JSONErrorResponse(f"No such target '{destination_name}'.", status_code=400)
+        if not isinstance(destination, FolderTarget):
+            return JSONErrorResponse(f"Invalid destination '{destination_name}': not a folder target.", status_code=400)
+        destination_path = destination.folder
+
+ offpeak = 'offpeak' in form
+ search_filters = {}
+ if search_filter := form.get("series_description"):
+ search_filters["SeriesDescription"] = [x.strip() for x in search_filter.split(",")]
+ if search_filter := form.get("study_description"):
+ search_filters["StudyDescription"] = [x.strip() for x in search_filter.split(",")]
+
+ try:
+ QueryPipeline.create(accession.split(","), search_filters, node, destination_path, offpeak=offpeak)
+ except Exception as e:
+ logger.exception(f"Error creating query pipeline for accession {accession}.")
+ return JSONErrorResponse(str(e))
+
+ return JSONResponse({"status": "success"})
+
+@router.get("/query/jobs")
+@requires(["authenticated", "admin"], redirect="login")
+async def query_jobs(request):
+ """
+ Returns a list of all query jobs.
+ """
+ tasks_info = []
+ try:
+ with Connection(redis):
+ query_tasks = list(QueryPipeline.get_all())
+ except Exception as e:
+ logger.exception("Error retrieving query tasks.")
+ return JSONErrorResponse("Error retrieving query tasks.", status_code=500)
+
+ for task in query_tasks:
+ task_dict: Dict[str,Any] = dict(id=task.id,
+ status=task.get_status(),
+ parameters=dict(accession=task.kwargs.get('accession','')),
+ created_at=1000*datetime.timestamp(task.created_at) if task.created_at else "",
+ enqueued_at=1000*datetime.timestamp(task.enqueued_at) if task.enqueued_at else "",
+ result=task.result if task.get_status() != "failed" else task.meta.get("failed_reason",""),
+ meta=task.meta,
+ progress="")
+ # if job.meta.get('completed') and job.meta.get('remaining'):
+ # task_dict["progress"] = f"{job.meta.get('completed')} / {job.meta.get('completed') + job.meta.get('remaining')}"
+ # if job.meta.get('type',None) == "batch":
+ n_started = task.meta.get('started',0)
+ n_completed = task.meta.get('completed',0)
+ n_total = task.meta.get('total',0)
+
+ if task_dict["status"] == "finished":
+ task_dict["progress"] = f"{n_total} / {n_total}"
+ elif task_dict["status"] in ("deferred","started", "paused", "canceled"):
+ task_dict["progress"] = f"{n_completed} / {n_total}"
+
+ # if task_dict["status"] == "canceled" and
+ if task.meta.get('paused', False) and task_dict["status"] not in ("finished", "failed"):
+ if n_started < n_completed: # TODO: this does not work
+ task_dict["status"] = "pausing"
+ else:
+ task_dict["status"] = "paused"
+
+ if task_dict["status"] in ("deferred", "started"):
+ if n_started == 0:
+ task_dict["status"] = "waiting"
+ elif n_completed < n_total:
+ task_dict["status"] = "running"
+ elif n_completed == n_total:
+ task_dict["status"] = "finishing"
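+        # Net effect: rq's raw statuses are mapped to UI-facing ones -- "waiting"
+        # (no subjob started), "running", "finishing" (all accessions retrieved but
+        # the MainTask is still pending), plus "pausing"/"paused" when flagged paused.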
+
+ tasks_info.append(task_dict)
+ return JSONResponse(dict(data=tasks_info))
+
+@router.get("/query")
+@requires(["authenticated", "admin"], redirect="login")
+async def query(request):
+ template = "dashboards/query.html"
+ dicom_nodes = [name for name,node in config.mercure.targets.items() if isinstance(node, (DicomTarget, DicomWebTarget)) and node.direction in ("pull", "both")]
+ destination_folders = [name for name,node in config.mercure.targets.items() if isinstance(node, FolderTarget)]
+ context = {
+ "request": request,
+ "destination_folders": destination_folders,
+ "dicom_nodes": dicom_nodes,
+ "page": "tools",
+ "tab": "query"
+ }
+ return templates.TemplateResponse(template, context)
+
+@router.post("/query/check_accessions")
+@requires(["authenticated", "admin"], redirect="login")
+async def check_accessions(request):
+ form = await request.form()
+ job_id = form.get("job_id")
+
+ if job_id:
+ # Retrieve results for an existing job
+ job = Job.fetch(job_id, redis)
+ if not job:
+ return JSONResponse({"error": "Job not found"}, status_code=404)
+ elif job.is_failed:
+ job.get_meta()
+ logger.warning(job.meta)
+ if failed_reason:=job.meta.get("failed_reason"):
+ return JSONResponse({"status": "failed", "info": failed_reason})
+ else:
+ return JSONResponse({"status": "failed", "info": "Unknown error"})
+ elif job.is_finished:
+ result_data = []
+ for d in job.result:
+ logger.info(d)
+ result_data.append( {x:d.get(x) for x in ["AccessionNumber", "PatientID", "StudyInstanceUID", "SeriesInstanceUID", "StudyDescription", "SeriesDescription", "NumberOfSeriesRelatedInstances"]} )
+ return JSONResponse({"status": "completed", "result": result_data})
+ return JSONResponse({"status": "pending", "job_id": job.id})
+
+ node_name = form.get("dicom_node")
+ accessions = form.get("accessions", "").split(",")
+
+ search_filters = {}
+ if search_filter:= form.get("series_description"):
+ search_filters["SeriesDescription"] = [x.strip() for x in search_filter.split(",")]
+ if search_filter:= form.get("study_description"):
+ search_filters["StudyDescription"] = [x.strip() for x in search_filter.split(",")]
+
+ node = config.mercure.targets.get(node_name)
+ if not isinstance(node, (DicomWebTarget, DicomTarget)):
+ return JSONErrorResponse(f"Invalid DICOM node '{node_name}'.", status_code=400)
+
+ try:
+ with Connection(redis):
+ job = CheckAccessionsTask().create_job(accessions=accessions, node=node, search_filters=search_filters)
+ CheckAccessionsTask.queue().enqueue_job(job)
+ except Exception as e:
+ logger.exception("Error during accessions check task creation")
+ return JSONErrorResponse(str(e), status_code=500)
+
+ return JSONResponse({"status": "pending", "job_id": job.id})
\ No newline at end of file
diff --git a/webinterface/dashboards/simple.py b/webinterface/dashboards/simple.py
new file mode 100644
index 00000000..ee29a2ac
--- /dev/null
+++ b/webinterface/dashboards/simple.py
@@ -0,0 +1,32 @@
+from .common import router
+from starlette.authentication import requires
+
+# App-specific includes
+from webinterface.common import templates
+import common.config as config
+
+logger = config.get_logger()
+
+@router.get("/tasks")
+@requires("authenticated", redirect="login")
+async def tasks(request):
+ template = "dashboards/tasks.html"
+ context = {
+ "request": request,
+ "page": "tools",
+ "tab": "tasks",
+ }
+ return templates.TemplateResponse(template, context)
+
+
+@router.get("/tests")
+@requires(["authenticated", "admin"], redirect="login")
+async def tests(request):
+ template = "dashboards/tests.html"
+ context = {
+ "request": request,
+ "page": "tools",
+ "tab": "tests",
+ }
+ return templates.TemplateResponse(template, context)
\ No newline at end of file
diff --git a/webinterface/dicom_client.py b/webinterface/dicom_client.py
new file mode 100644
index 00000000..c2d7e9ed
--- /dev/null
+++ b/webinterface/dicom_client.py
@@ -0,0 +1,248 @@
+import os
+import re
+from typing import Any, Dict, Iterator, List, Optional, Sequence, cast
+from pynetdicom import (
+ AE,
+ QueryRetrievePresentationContexts, BasicWorklistManagementPresentationContexts, UnifiedProcedurePresentationContexts,
+ build_role,
+ evt,
+ StoragePresentationContexts
+ )
+from pynetdicom.sop_class import StudyRootQueryRetrieveInformationModelFind # type: ignore
+from pynetdicom.apps.common import create_dataset
+from pynetdicom._globals import DEFAULT_MAX_LENGTH
+from pynetdicom.pdu_primitives import SOPClassExtendedNegotiation
+from pynetdicom.sop_class import ( # type: ignore
+ PatientRootQueryRetrieveInformationModelGet,
+ StudyRootQueryRetrieveInformationModelGet,
+ PatientStudyOnlyQueryRetrieveInformationModelGet,
+ EncapsulatedSTLStorage,
+ EncapsulatedOBJStorage,
+ EncapsulatedMTLStorage,
+)
+from pydicom.uid import DeflatedExplicitVRLittleEndian
+from pydicom import Dataset
+import sys
+import subprocess
+
+class DicomClientCouldNotAssociate(Exception):
+ pass
+
+class DicomClientCouldNotFind(Exception):
+ pass
+
+class DicomClientBadStatus(Exception):
+ pass
+
+SOP_CLASS_PREFIXES = {
+ "1.2.840.10008.5.1.4.1.1.2": ("CT", "CT Image Storage"),
+ "1.2.840.10008.5.1.4.1.1.2.1": ("CTE", "Enhanced CT Image Storage"),
+ "1.2.840.10008.5.1.4.1.1.4": ("MR", "MR Image Storage"),
+ "1.2.840.10008.5.1.4.1.1.4.1": ("MRE", "Enhanced MR Image Storage"),
+ "1.2.840.10008.5.1.4.1.1.128": ("PT", "Positron Emission Tomography Image Storage"),
+ "1.2.840.10008.5.1.4.1.1.130": ("PTE", "Enhanced PET Image Storage"),
+ "1.2.840.10008.5.1.4.1.1.481.1": ("RI", "RT Image Storage"),
+ "1.2.840.10008.5.1.4.1.1.481.2": ("RD", "RT Dose Storage"),
+ "1.2.840.10008.5.1.4.1.1.481.5": ("RP", "RT Plan Storage"),
+ "1.2.840.10008.5.1.4.1.1.481.3": ("RS", "RT Structure Set Storage"),
+ "1.2.840.10008.5.1.4.1.1.1": ("CR", "Computed Radiography Image Storage"),
+ "1.2.840.10008.5.1.4.1.1.6.1": ("US", "Ultrasound Image Storage"),
+ "1.2.840.10008.5.1.4.1.1.6.2": ("USE", "Enhanced US Volume Storage"),
+ "1.2.840.10008.5.1.4.1.1.12.1": ("XA", "X-Ray Angiographic Image Storage"),
+ "1.2.840.10008.5.1.4.1.1.12.1.1": ("XAE", "Enhanced XA Image Storage"),
+ "1.2.840.10008.5.1.4.1.1.20": ("NM", "Nuclear Medicine Image Storage"),
+ "1.2.840.10008.5.1.4.1.1.7": ("SC", "Secondary Capture Image Storage"),
+}
+class SimpleDicomClient():
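+ """Minimal DICOM SCU that can C-FIND and C-GET studies by accession number from a remote node."""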
+ host: str
+ port: int
+ called_aet: str
+ output_dir: str
+ def __init__(self, host, port, called_aet, out_dir) -> None:
+ self.host = host
+ self.port = int(port)
+ self.called_aet = called_aet
+ self.output_dir = out_dir
+
+ def handle_store(self, event):
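+ """Handles EVT_C_STORE events by writing each retrieved instance to the output directory."""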
+ try:
+ ds = event.dataset
+ # Remove any Group 0x0002 elements that may have been included
+ ds = ds[0x00030000:]
+ except Exception as exc:
+ print(exc)
+ # Unable to decode the dataset
+ return 0xC210
+ try:
+ sop_class = ds.SOPClassUID
+ # sanitize filename by replacing all illegal characters with underscores
+ sop_instance = re.sub(r"[^\d.]", "_", ds.SOPInstanceUID)
+ except Exception as exc:
+ print(
+ "Unable to decode the received dataset or missing 'SOP Class "
+ "UID' and/or 'SOP Instance UID' elements"
+ )
+ print(exc)
+ # Unable to decode dataset
+ return 0xC210
+
+ try:
+ # Get the elements we need
+ mode_prefix = SOP_CLASS_PREFIXES[sop_class][0]
+ except KeyError:
+ mode_prefix = "UN"
+
+ # Attach the File Meta Information received with the instance so that
+ # save_as(write_like_original=False) can write a compliant file header
+ ds.file_meta = event.file_meta
+ filename = f"{self.output_dir}/{mode_prefix}.{sop_instance}.dcm"
+ print(f"Storing DICOM file: {filename}")
+
+ status_ds = Dataset()
+ status_ds.Status = 0x0000
+ try:
+ if event.context.transfer_syntax == DeflatedExplicitVRLittleEndian:
+ # Workaround for pydicom issue #1086
+ with open(filename, "wb") as f:
+ f.write(event.encoded_dataset())
+ else:
+ # We use `write_like_original=False` to ensure that a compliant
+ # File Meta Information Header is written
+ ds.save_as(filename, write_like_original=False)
+
+ status_ds.Status = 0x0000 # Success
+ except OSError as exc:
+ print("Could not write file to specified directory:")
+ print(f" {os.path.dirname(filename)}")
+ print(exc)
+ # Failed - Out of Resources - OSError
+ status_ds.Status = 0xA700
+ except Exception as exc:
+ print("Could not write file to specified directory:")
+ print(f" {os.path.dirname(filename)}")
+ print(exc)
+ # Failed - Out of Resources - Miscellaneous error
+ status_ds.Status = 0xA701
+
+ return status_ds
+
+
+ def getscu(self, accession_number: str, search_filters: Dict[str, List[str]]) -> Iterator[Dataset]:
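+ """Issues a C-GET for the given accession number (plus optional filters) and yields the pending status datasets; the retrieved instances are written to disk by handle_store()."""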
+ # Exclude these SOP Classes
+ _exclusion = [
+ EncapsulatedSTLStorage,
+ EncapsulatedOBJStorage,
+ EncapsulatedMTLStorage,
+ ]
+ store_contexts = [
+ cx for cx in StoragePresentationContexts if cx.abstract_syntax not in _exclusion
+ ]
+ # Create the application entity and set conservative timeouts
+ ae = AE(ae_title="MERCURE")
+ ae.acse_timeout = 30
+ ae.dimse_timeout = 30
+ ae.network_timeout = 30
+ ae.add_requested_context(PatientRootQueryRetrieveInformationModelGet)
+ ae.add_requested_context(StudyRootQueryRetrieveInformationModelGet)
+ ae.add_requested_context(PatientStudyOnlyQueryRetrieveInformationModelGet)
+ ext_neg = []
+ for cx in store_contexts:
+ if not cx.abstract_syntax:
+ raise ValueError(f"Abstract syntax must be specified for storage context {cx}")
+ ae.add_requested_context(cx.abstract_syntax)
+ # Add SCP/SCU Role Selection Negotiation to the extended negotiation
+ # We want to act as a Storage SCP
+ ext_neg.append(build_role(cx.abstract_syntax, scp_role=True))
+ query_model = StudyRootQueryRetrieveInformationModelGet
+ assoc = ae.associate(
+ self.host, self.port,
+ ae_title=self.called_aet,
+ ext_neg=ext_neg, # type: ignore
+ evt_handlers=[(evt.EVT_C_STORE, self.handle_store, [])],
+ max_pdu=0,
+ )
+ if not assoc.is_established:
+ raise DicomClientCouldNotAssociate()
+ # Send query
+
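+ # Build the C-GET identifier: a series-level query keyed on AccessionNumber plus any extra filters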
+ ds = Dataset()
+ ds.QueryRetrieveLevel = 'SERIES'
+ ds.AccessionNumber = accession_number
+ for key in search_filters:
+ setattr(ds, key, "\\".join(search_filters.get(key,[])))
+
+ responses = assoc.send_c_get(ds, query_model)
+ success = False
+ for status, rsp_identifier in responses:
+ # 'Pending' statuses report the progress of the retrieve sub-operations;
+ # the instances themselves arrive through the EVT_C_STORE handler
+ if not status:
+ raise DicomClientBadStatus()
+
+ if status.Status in [0xFF00, 0xFF01]:
+ yield status
+ success = True
+ assoc.release()
+ if not success:
+ raise DicomClientCouldNotFind()
+
+ def findscu(self, accession_number: str, search_filters: Optional[Dict[str, List[str]]] = None) -> List[Dataset]:
+ """Issues a series-level C-FIND for the given accession number and returns the matching identifier datasets."""
+ search_filters = search_filters or {}
+ # Create application entity
+ ae = AE(ae_title="MERCURE")
+
+ # Request the standard Query/Retrieve presentation contexts
+ ae.requested_contexts = QueryRetrievePresentationContexts
+
+ # Associate with the peer AE
+ assoc = ae.associate(self.host, self.port, ae_title=self.called_aet, max_pdu=0, ext_neg=[])
+
+ ds = Dataset()
+ ds.QueryRetrieveLevel = 'SERIES'
+ ds.AccessionNumber = accession_number
+ ds.SeriesInstanceUID = ''
+ ds.StudyInstanceUID = ''
+ ds.Modality = ''
+ ds.NumberOfSeriesRelatedInstances = ''
+ for key in search_filters:
+ setattr(ds, key, "\\".join(search_filters.get(key,[])))
+
+ if not assoc.is_established:
+ raise DicomClientCouldNotAssociate()
+
+ try:
+ responses = assoc.send_c_find(
+ ds,
+ StudyRootQueryRetrieveInformationModelFind
+ )
+ results = []
+ for (status, identifier) in responses:
+ if not status:
+ print('Connection timed out, was aborted or received invalid response')
+ break
+
+ if status.Status in [0xFF00, 0xFF01] and identifier:
+ # print('C-FIND query status: 0x{0:04x}'.format(status.Status))
+ results.append(identifier)
+ # elif status.Status == 0x0000:
+ # print("Success")
+ # break
+ if not results:
+ raise DicomClientCouldNotFind()
+ return results
+ finally:
+ assoc.release()
+
+if __name__ == "__main__":
+ # Replace these variables with your actual values
+ remote_host = sys.argv[1]
+ remote_port = int(sys.argv[2])
+ calling_aet = sys.argv[3]
+ called_aet = sys.argv[4]
+ accession_number = sys.argv[5]
+
+ print(f"{remote_host=} {remote_port=} {calling_aet=} {called_aet=} {accession_number=}")
+ c = SimpleDicomClient(remote_host, remote_port, called_aet, "/tmp/test-move")
+ # study_uid = c.get_study_uid(accession_number)
+ # print(study_uid)
+ c.getscu(accession_number, {})
\ No newline at end of file
diff --git a/webinterface/modules.py b/webinterface/modules.py
index 34047fc8..fdf89644 100755
--- a/webinterface/modules.py
+++ b/webinterface/modules.py
@@ -101,12 +101,11 @@ async def show_modules(request):
template = "modules.html"
context = {
"request": request,
- "mercure_version": mercure_defs.VERSION,
+
"page": "modules",
"modules": config.mercure.modules,
"used_modules": used_modules,
}
- context.update(get_user_information(request))
return templates.TemplateResponse(template, context)
@@ -204,7 +203,7 @@ async def edit_module(request):
template = "modules_edit.html"
context = {
"request": request,
- "mercure_version": mercure_defs.VERSION,
+
"page": "modules",
"module": config.mercure.modules[module],
"module_name": module,
@@ -212,7 +211,6 @@ async def edit_module(request):
"runtime": runtime,
"support_root_modules": config.mercure.support_root_modules,
}
- context.update(get_user_information(request))
return templates.TemplateResponse(template, context)
diff --git a/webinterface/queue.py b/webinterface/queue.py
index 1f9a5894..6a005e1d 100755
--- a/webinterface/queue.py
+++ b/webinterface/queue.py
@@ -57,12 +57,10 @@ async def show_queues(request):
template = "queue.html"
context = {
"request": request,
- "mercure_version": mercure_defs.VERSION,
"page": "queue",
"processing_suspended": processing_suspended,
"routing_suspended": routing_suspended,
}
- context.update(get_user_information(request))
return templates.TemplateResponse(template, context)
diff --git a/webinterface/rules.py b/webinterface/rules.py
index e3dea5c1..512916c9 100755
--- a/webinterface/rules.py
+++ b/webinterface/rules.py
@@ -37,7 +37,7 @@
@router.get("/")
@requires("authenticated", redirect="login")
-async def show_rules(request) -> Response:
+async def rules(request) -> Response:
"""Show all defined routing rules. Can be executed by all logged-in users."""
try:
config.read_config()
@@ -47,11 +47,9 @@ async def show_rules(request) -> Response:
template = "rules.html"
context = {
"request": request,
- "mercure_version": mercure_defs.VERSION,
"page": "rules",
"rules": config.mercure.rules,
}
- context.update(get_user_information(request))
return templates.TemplateResponse(template, context)
@@ -114,10 +112,9 @@ async def rules_edit(request) -> Response:
context = {
"request": request,
- "mercure_version": mercure_defs.VERSION,
"page": "rules",
"rules": config.mercure.rules,
- "targets": config.mercure.targets,
+ "targets": [t for t in config.mercure.targets if config.mercure.targets[t].direction in ("push", "both")],
"modules": config.mercure.modules,
"rule": rule,
"alltags": tagslist.alltags,
@@ -126,7 +123,6 @@ async def rules_edit(request) -> Response:
"process_runner": config.mercure.process_runner,
"phi_notifications": config.mercure.phi_notifications,
}
- context.update(get_user_information(request))
template = "rules_edit.html"
return templates.TemplateResponse(template, context)
diff --git a/webinterface/targets.py b/webinterface/targets.py
index 20dec540..39b9b980 100755
--- a/webinterface/targets.py
+++ b/webinterface/targets.py
@@ -54,7 +54,6 @@ async def show_targets(request) -> Response:
template = "targets.html"
context = {
"request": request,
- "mercure_version": mercure_defs.VERSION,
"page": "targets",
"targets": config.mercure.targets,
"used_targets": used_targets,
@@ -108,7 +107,6 @@ async def targets_edit(request) -> Response:
template = "targets_edit.html"
context = {
"request": request,
- "mercure_version": mercure_defs.VERSION,
"page": "targets",
"targets": config.mercure.targets,
"edittarget": edittarget,
diff --git a/webinterface/templates/base.html b/webinterface/templates/base.html
index da001f69..8c5933a1 100755
--- a/webinterface/templates/base.html
+++ b/webinterface/templates/base.html
@@ -214,6 +214,7 @@
Settings
document.body.addEventListener('htmx:afterRequest', function (evt) {
if (evt.detail.xhr.status != 200) {
+ let message;
if ( evt.detail.xhr.responseText == "Internal Server Error" ) {
message = "Unexpected server error, please check server logs.";
console.error(message);
diff --git a/webinterface/templates/dashboards/dashboards.html b/webinterface/templates/dashboards/dashboards.html
index 054b53e8..2c8db33f 100644
--- a/webinterface/templates/dashboards/dashboards.html
+++ b/webinterface/templates/dashboards/dashboards.html
@@ -6,11 +6,12 @@
Tools
diff --git a/webinterface/templates/dashboards/query.html b/webinterface/templates/dashboards/query.html
new file mode 100644
index 00000000..5ac35b0c
--- /dev/null
+++ b/webinterface/templates/dashboards/query.html
@@ -0,0 +1,449 @@
+{% extends "dashboards/dashboards.html" %}
+
+{% block title %}Query{% endblock %}
+
+{% block extra_head %}
+
+
+
+{% endblock %}
+
+{% block dashboard_content %}
+
+ DICOM Query
+
+ {% if not request.state.redis_connected %}
+
+ Redis connection not available, so this dashboard will not function.
+
+ {% endif %}
+
+
+
+
+
+
+
+ id
+ type
+ status
+ created_at
+ accession
+ result
+ progress
+
+
+
+
+
+
+
+
+
+ Accession
+ Study Description
+ Series Description
+ Instances
+
+
+
+
+
+
+
+
+
+{% endblock %}
\ No newline at end of file
diff --git a/webinterface/templates/dashboards/query_job_fragment.html b/webinterface/templates/dashboards/query_job_fragment.html
new file mode 100644
index 00000000..c4b1ff9b
--- /dev/null
+++ b/webinterface/templates/dashboards/query_job_fragment.html
@@ -0,0 +1,31 @@
+
+
+
+
+
+
+ Accession
+
+
+ Ended at
+
+
+ Status
+
+
+ Progress
+
+
+
+
+ {% for job in subjob_info %}
+
+
+ {{job['accession']}}
+ {{job['ended_at'] or ''}}
+ {{job['status']}}
+ {{job['progress'] or ''}}
+
+ {% endfor %}
+
+
diff --git a/webinterface/templates/index.html b/webinterface/templates/index.html
index 7eda7722..f02a1fa3 100755
--- a/webinterface/templates/index.html
+++ b/webinterface/templates/index.html
@@ -16,10 +16,10 @@
- {{ service_status[service]['name'] }}
-
+ {{ service['name'] }}
+
UP
- {% elif service_status[service]['running'] == False %} class="tag is-danger">DOWN
+ {% elif service['running'] == False %} class="tag is-danger">DOWN
{% else %} class="tag">UNKNOWN
{% endif %}
@@ -66,7 +66,7 @@
{% for service in service_status %}
- 1 %}selected{% endif %} value="{{service}}">{{service_status[service]['name']}}
+ 1 %}selected{% endif %} value="{{service["id"]}}">{{service['name']}}
{% endfor %}
@@ -126,6 +126,10 @@ Service Logs
+ {% if sub_services %}
+
+
+ {% for service in sub_services %}
+
+ {{service }}
+ {% endfor %}
+
+
+ {% endif %}
{{log_content|safe}}
diff --git a/webinterface/templates/rules.html b/webinterface/templates/rules.html
index af02edd8..b7910f5a 100755
--- a/webinterface/templates/rules.html
+++ b/webinterface/templates/rules.html
@@ -87,6 +87,10 @@
Filtering Rules
{% endif %}
{% if (rule.action=='route') or (rule.action=='both') %}
+
+ Target:
+ {{ rule.target }}
+
Routing:
{% if rule.target is string %}
diff --git a/webinterface/templates/targets/base-test.html b/webinterface/templates/targets/base-test.html
index b75fa2e1..c34c8067 100644
--- a/webinterface/templates/targets/base-test.html
+++ b/webinterface/templates/targets/base-test.html
@@ -1,5 +1,5 @@
{% for r in result %}
- {{r}} {{ result[r] if (result[r] not in (True, False)) else ''}}
+ {{r}} {{ result[r] if (result[r] not in (True, False)) else ''}}
{% endfor %}
\ No newline at end of file
diff --git a/webinterface/templates/targets_edit.html b/webinterface/templates/targets_edit.html
index 5b8646af..12b75108 100755
--- a/webinterface/templates/targets_edit.html
+++ b/webinterface/templates/targets_edit.html
@@ -40,9 +40,22 @@ Edit Target - {{edittarget}}
-
{% for t in target_types %}
{%endfor%}
diff --git a/webinterface/users.py b/webinterface/users.py
index 1d70f57f..f7b7276b 100755
--- a/webinterface/users.py
+++ b/webinterface/users.py
@@ -187,8 +187,7 @@ async def show_users(request) -> Response:
return PlainTextResponse("Configuration is being updated. Try again in a minute.")
template = "users.html"
- context = {"request": request, "mercure_version": mercure_defs.VERSION, "page": "users", "users": users_list}
- context.update(get_user_information(request))
+ context = {"request": request, "page": "users", "users": users_list}
return templates.TemplateResponse(template, context)
@@ -237,12 +236,10 @@ async def users_edit(request) -> Response:
template = "users_edit.html"
context = {
"request": request,
- "mercure_version": mercure_defs.VERSION,
"page": "users",
"edituser": edituser,
"edituser_info": users_list[edituser],
}
- context.update(get_user_information(request))
return templates.TemplateResponse(template, context)