Skip to content

Commit

Permalink
Merge branch 'query'
Browse files Browse the repository at this point in the history
  • Loading branch information
Roy Wiggins authored and Roy Wiggins committed Oct 2, 2024
2 parents f621add + 6dee9c6 commit 9f8631c
Show file tree
Hide file tree
Showing 54 changed files with 2,787 additions and 543 deletions.
2 changes: 1 addition & 1 deletion build-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ build_component () {

docker build $CACHE -t $PREFIX/mercure-base:$TAG -t $PREFIX/mercure-base:latest -f docker/base/Dockerfile .

for component in ui bookkeeper receiver router processor dispatcher cleaner
for component in ui bookkeeper receiver router processor dispatcher cleaner worker
do
build_component $component
done
Expand Down
1 change: 1 addition & 0 deletions common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
"error_folder": "/opt/mercure/data/error",
"discard_folder": "/opt/mercure/data/discard",
"processing_folder": "/opt/mercure/data/processing",
"jobs_folder": "/opt/mercure/data/jobs",
"router_scan_interval": 1, # in seconds
"dispatcher_scan_interval": 1, # in seconds
"cleaner_scan_interval": 60, # in seconds
Expand Down
16 changes: 13 additions & 3 deletions common/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@
"""
# Standard python includes
import asyncio
from contextlib import suppress
from datetime import datetime
from datetime import time as _time
import inspect
from pathlib import Path
import threading
from typing import Callable, Optional
from typing import Callable, Optional, Tuple
import graphyte
import aiohttp
import os
import common.influxdb

Expand All @@ -23,6 +21,16 @@

loop = asyncio.get_event_loop()

def validate_folders(config) -> Tuple[bool, str]:
    """Check that every mercure data folder exists and is readable/writable.

    Returns (True, "") when all folders pass, otherwise (False, message)
    describing the first folder that fails a check.
    """
    required_folders = (
        config.incoming_folder,
        config.studies_folder,
        config.outgoing_folder,
        config.success_folder,
        config.error_folder,
        config.discard_folder,
        config.processing_folder,
        config.jobs_folder,
    )
    for candidate in required_folders:
        if not Path(candidate).is_dir():
            return False, f"Folder {candidate} does not exist."
        if not os.access(candidate, os.R_OK | os.W_OK):
            return False, f"No read/write access to {candidate}"
    return True, ""

def get_now_str() -> str:
"""Returns the current time as string with mercure-wide formatting"""
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
Expand Down Expand Up @@ -116,6 +124,8 @@ async def _run(self) -> None:

def run_until_complete(self, loop=None) -> None:
self.start()
if not self._task:
raise Exception("Unexpected error: AsyncTimer._task is None")
loop = loop or asyncio.get_event_loop()
loop.run_until_complete(self._task)

Expand Down
8 changes: 4 additions & 4 deletions common/rule_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ def replace_tags(rule: str, tags: Dict[str, str]) -> Any:
safe_eval_cmds = {"float": float, "int": int, "str": str, "len": len, "bool": bool, "sum": sum, "round": round, "max": max, "min": min, "abs": abs, "pow": pow, "chr": chr, "ord": ord}


def eval_rule(rule: str, tags: Dict[str, str]) -> Any:
def eval_rule(rule: str, tags_dict: Dict[str, str]) -> Any:
"""Parses the given rule, replaces all tag variables with values from the given tags dictionary, and
evaluates the rule. If the rule is invalid, an exception will be raised."""
logger.info(f"Rule: {rule}")
rule = replace_tags(rule, tags)
rule = replace_tags(rule, tags_dict)
logger.info(f"Evaluated: {rule}")
tags_obj = Tags(tags)
tags_obj = Tags(tags_dict)
try:
result = eval(rule, {"__builtins__": {}}, {**safe_eval_cmds,"tags":tags_obj})
except SyntaxError as e:
Expand All @@ -63,7 +63,7 @@ def eval_rule(rule: str, tags: Dict[str, str]) -> Any:
if opening >-1 and closing>1:
raise TagNotFoundException(f"No such tag '{rule[opening+1:closing]}' in tags list.")
raise
logger.info(", ".join([f"{tag} = \"{tags[tag]}\"" for tag in tags_obj.tags_accessed()]))
logger.info(", ".join([f"{tag} = \"{tags_dict[tag]}\"" for tag in tags_obj.tags_accessed()]))
logger.info(f"Result: {result}")
return result, tags_obj.tags_accessed()

Expand Down
56 changes: 48 additions & 8 deletions common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ def get(self, item, els=None) -> Any:
class EmptyDict(TypedDict):
    # Intentionally empty: a TypedDict with no keys, used where an
    # "empty mapping" type is needed in annotations.
    pass


class Target(BaseModel, Compat):
target_type: Any
contact: Optional[str] = ""
comment: str = ""
direction: Optional[Literal["pull", "push", "both"]] = "push"

@property
def short_description(self) -> str:
Expand Down Expand Up @@ -120,12 +121,16 @@ def short_description(self) -> str:
class DicomWebTarget(Target):
target_type: Literal["dicomweb"] = "dicomweb"
url: str
qido_url_prefix: Optional[str]
wado_url_prefix: Optional[str]
stow_url_prefix: Optional[str]
access_token: Optional[str]
http_user: Optional[str]
http_password: Optional[str]
qido_url_prefix: Optional[str] = None
wado_url_prefix: Optional[str] = None
stow_url_prefix: Optional[str] = None
access_token: Optional[str] = None
http_user: Optional[str] = None
http_password: Optional[str] = None

@property
def short_description(self) -> str:
return self.url

@property
def short_description(self) -> str:
Expand Down Expand Up @@ -210,8 +215,41 @@ class ProcessingLogsConfig(BaseModel):

class DicomReceiverConfig(BaseModel):
    # Mapping of extra DICOM tag keywords to values for the receiver.
    # NOTE(review): the consumer of these tags is outside this view — confirm.
    additional_tags: Dict[str,str] = {}



class DicomNodeBase(BaseModel):
    """Base model for DICOM node configurations.

    Pydantic validation tries every registered subclass in turn and returns
    the first one that accepts the input dict.
    """
    name: str

    @classmethod
    def __get_validators__(cls):
        # one or more validators may be yielded which will be called in the
        # order to validate the input, each validator will receive as an input
        # the value returned from the previous validator
        yield cls.validate

    @classmethod
    def validate(cls, v):
        """Parse the target as any of the known target types."""
        subclass_dict: typing.Dict[str, Type[DicomNodeBase]] = {sbc.__name__: sbc for sbc in cls.__subclasses__()}
        for k in subclass_dict:
            try:
                return subclass_dict[k](**v)
            except Exception:
                # This subclass did not accept the input; try the next one.
                # (Was a bare "except:", which would also have swallowed
                # KeyboardInterrupt/SystemExit.)
                pass
        raise ValueError("Couldn't validate dicom node as any of", list(subclass_dict.keys()))

    @classmethod
    def get_name(cls) -> str:
        # Subclasses are expected to declare a "node_type" field; construct()
        # builds an instance without validation so its default can be read.
        return cls.construct().node_type  # type: ignore

class DicomDestination(BaseModel):
    # Display name of the destination and the filesystem path where
    # retrieved DICOM files are placed.
    name: str
    path: str

class DicomRetrieveConfig(BaseModel):
    # Configuration for pulling studies: the DICOM nodes to query and the
    # local folders results can be written to.
    dicom_nodes: List[DicomNodeBase] = []
    destination_folders: List[DicomDestination] = []

class Config(BaseModel, Compat):
appliance_name: str
appliance_color: str = "#FFF"
Expand All @@ -224,6 +262,7 @@ class Config(BaseModel, Compat):
error_folder: str
discard_folder: str
processing_folder: str
jobs_folder: str
router_scan_interval: int # in seconds
dispatcher_scan_interval: int # in seconds
cleaner_scan_interval: int # in seconds
Expand Down Expand Up @@ -258,6 +297,7 @@ class Config(BaseModel, Compat):
phi_notifications: Optional[bool] = False
server_time: str = "UTC"
local_time: str = "UTC"
dicom_retrieve: DicomRetrieveConfig = DicomRetrieveConfig()


class TaskInfo(BaseModel, Compat):
Expand Down
1 change: 1 addition & 0 deletions configuration/default_mercure.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"error_folder" : "/opt/mercure/data/error",
"discard_folder" : "/opt/mercure/data/discard",
"processing_folder" : "/opt/mercure/data/processing",
"jobs_folder" : "/opt/mercure/data/jobs",
"bookkeeper" : "0.0.0.0:8080",
"bookkeeper_api_key" : "BOOKKEEPER_TOKEN_PLACEHOLDER",
"graphite_ip" : "",
Expand Down
15 changes: 15 additions & 0 deletions configuration/default_services.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,20 @@
"name": "Bookkeeper",
"systemd_service": "mercure_bookkeeper.service",
"docker_service": "mercure_bookkeeper_1"
},
"workers": {
"name": "Workers",
"systemd_service": ["[email protected]", "[email protected]", "[email protected]", "[email protected]"],
"docker_service": ["mercure_worker_fast_1", "mercure_worker_fast_2", "mercure_worker_slow_1", "mercure_worker_slow_2"]
},
"redis": {
"name": "Redis",
"systemd_service": "mercure_redis.service",
"docker_service": "mercure_redis_1"
},
"ui": {
"name": "UI",
"systemd_service": "mercure_ui.service",
"docker_service": "mercure_ui_1"
}
}
27 changes: 25 additions & 2 deletions dispatch/target_types/base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from dataclasses import dataclass

from pydicom import Dataset
from common.types import Task, TaskDispatch, TaskInfo, Rule, Target
import common.config as config
from subprocess import CalledProcessError, check_output
from starlette.responses import JSONResponse
from typing import Any, TypeVar, Generic, cast
from typing import Any, Dict, Generator, List, TypeVar, Generic, cast
from pydicom.datadict import dictionary_VR, keyword_for_tag, tag_for_keyword

from pathlib import Path
import subprocess
Expand All @@ -12,9 +16,15 @@
TargetTypeVar = TypeVar("TargetTypeVar")


@dataclass
class ProgressInfo():
    # Progress snapshot for a pull operation (yielded by get_from_target).
    completed: int = 0   # sub-operations finished so far
    remaining: int = 0   # sub-operations still outstanding
    progress: str = ""   # human-readable summary, e.g. "3 / 10"

class TargetHandler(Generic[TargetTypeVar]):
test_template = "targets/base-test.html"

can_pull = False
def __init__(self):
pass

Expand All @@ -32,6 +42,18 @@ def send_to_target(
) -> str:
return ""

class NoSuchTagException(Exception):
    # Raised when a search filter uses a keyword that is not a known DICOM tag.
    pass
def get_from_target(self, target: TargetTypeVar, accession: str, search_filters: Dict[str,List[str]], path:str) -> Generator[ProgressInfo, None, None]:
    """Pull objects for the given accession from the target into path,
    yielding ProgressInfo updates.

    The base handler cannot pull (can_pull is False); target types that
    support pulling must override this generator.
    """
    # Was a bare "raise Exception()"; an explanatory message is added so the
    # failure is diagnosable. Callers catching Exception are unaffected.
    raise Exception("get_from_target is not implemented for this target type")

def find_from_target(self, target: TargetTypeVar, accession: str, search_filters:Dict[str,List[str]]) -> List[Dataset]:
    """Query the target for datasets matching the accession and filters.

    The base implementation only validates the filter keys and returns an
    empty list; subclasses that support querying must override it.

    Raises:
        Exception: if invoked on the base TargetHandler itself.
        NoSuchTagException: if a filter key is not a known DICOM tag keyword.
    """
    if type(self) is TargetHandler:
        # Fixed: the original compared "self is TargetHandler", which is never
        # true for an instance, so this guard could not fire.
        raise Exception("This method should be overridden by a subclass")
    for t in search_filters.keys():
        if not tag_for_keyword(t):
            raise TargetHandler.NoSuchTagException(f"Invalid search filter: no such tag '{t}'")
    return []
def handle_error(self, e, command) -> None:
    """Hook for subclasses to report a failed dispatch command; no-op by default."""
    pass

Expand Down Expand Up @@ -73,6 +95,7 @@ def send_to_target(
raise
return result


def handle_error(self, e: CalledProcessError, command) -> None:
    """Log the captured output and exit code of a failed subprocess command."""
    logger.error(e.output)
    logger.error(f"Failed. Command exited with value {e.returncode}: \n {command}")
23 changes: 22 additions & 1 deletion dispatch/target_types/builtin.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from typing import Any, Dict, Generator, List, Optional

from pydicom import Dataset
from common.types import DicomTarget, DicomTLSTarget, SftpTarget, DummyTarget, Task
import common.config as config
from common.constants import mercure_names
Expand All @@ -10,8 +13,10 @@

from starlette.responses import JSONResponse

from webinterface.dicom_client import DicomClientCouldNotFind, SimpleDicomClient

from .registry import handler_for
from .base import SubprocessTargetHandler, TargetHandler
from .base import ProgressInfo, SubprocessTargetHandler, TargetHandler

DCMSEND_ERROR_CODES = {
1: "EXITCODE_COMMANDLINE_SYNTAX_ERROR",
Expand All @@ -34,6 +39,7 @@ class DicomTargetHandler(SubprocessTargetHandler[DicomTarget]):
test_template = "targets/dicom-test.html"
icon = "fa-database"
display_name = "DICOM"
can_pull = True

def _create_command(self, target: DicomTarget, source_folder: Path, task: Task):
target_ip = target.ip
Expand All @@ -54,6 +60,21 @@ def _create_command(self, target: DicomTarget, source_folder: Path, task: Task):
)
return command, {}

def find_from_target(self, target: DicomTarget, accession: str, search_filters:Dict[str,List[str]]) -> List[Dataset]:
    """C-FIND the given accession on the DICOM target.

    Returns the matching datasets, or an empty list when the target reports
    no matches (DicomClientCouldNotFind is an expected outcome, not an error).
    """
    c = SimpleDicomClient(target.ip, target.port, target.aet_target, None)
    try:
        return c.findscu(accession, search_filters)
    except DicomClientCouldNotFind:  # removed unused "as e" binding
        return []

def get_from_target(self, target: DicomTarget, accession:str, search_filters:Dict[str,List[str]], path) -> Generator[ProgressInfo, None, None]:
    """C-GET the given accession from the DICOM target into path.

    Yields one ProgressInfo per C-GET response, derived from the
    NumberOfCompleted/RemainingSuboperations attributes of each identifier.
    """
    config.read_config()  # pick up the latest configuration before pulling
    c = SimpleDicomClient(target.ip, target.port, target.aet_target, path)
    for identifier in c.getscu(accession, search_filters):
        # Removed a stray trailing comma that left a dangling comma on the
        # tuple assignment; behavior is unchanged.
        completed, remaining = identifier.NumberOfCompletedSuboperations, identifier.NumberOfRemainingSuboperations
        progress = f"{ completed } / { completed + remaining }"
        yield ProgressInfo(completed, remaining, progress)

def handle_error(self, e, command):
    """Translate a dcmsend exit code into a readable message and log the failure."""
    dcmsend_error_message = DCMSEND_ERROR_CODES.get(e.returncode, None)
    logger.exception(f"Failed command:\n {command} \nbecause of {dcmsend_error_message}")
Expand Down
Loading

0 comments on commit 9f8631c

Please sign in to comment.