Skip to content

Commit

Permalink
fix for query: send dicoms back to Mercure
Browse files Browse the repository at this point in the history
  • Loading branch information
Roy Wiggins authored and Roy Wiggins committed Nov 15, 2024
1 parent 1090bb3 commit 7a44fb4
Show file tree
Hide file tree
Showing 14 changed files with 202 additions and 51 deletions.
6 changes: 6 additions & 0 deletions common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@
else:
configuration_filename = (os.getenv("MERCURE_CONFIG_FOLDER") or "/opt/mercure/config") + "/mercure.json"

_os_mercure_basepath = os.getenv("MERCURE_BASEPATH")
if _os_mercure_basepath is None:
app_basepath = Path(__file__).resolve().parent.parent
else:
app_basepath = Path(_os_mercure_basepath)

mercure_defaults = {
"appliance_name": "master",
"appliance_color": "#FFF",
Expand Down
27 changes: 14 additions & 13 deletions common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,20 @@ class DicomTLSTarget(Target):
def short_description(self) -> str:
return f"{self.ip}:{self.port}"

class DicomWebTarget(Target):
target_type: Literal["dicomweb"] = "dicomweb"
url: str
qido_url_prefix: Optional[str] = None
wado_url_prefix: Optional[str] = None
stow_url_prefix: Optional[str] = None
access_token: Optional[str] = None
http_user: Optional[str] = None
http_password: Optional[str] = None

@property
def short_description(self) -> str:
return self.url

class SftpTarget(Target):
target_type: Literal["sftp"] = "sftp"
folder: str
Expand Down Expand Up @@ -118,19 +132,6 @@ def short_description(self) -> str:
return self.host


class DicomWebTarget(Target):
target_type: Literal["dicomweb"] = "dicomweb"
url: str
qido_url_prefix: Optional[str] = None
wado_url_prefix: Optional[str] = None
stow_url_prefix: Optional[str] = None
access_token: Optional[str] = None
http_user: Optional[str] = None
http_password: Optional[str] = None

@property
def short_description(self) -> str:
return self.url


class S3Target(Target):
Expand Down
3 changes: 2 additions & 1 deletion dispatch/target_types/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ def send_to_target(

class NoSuchTagException(Exception):
pass
def get_from_target(self, target: TargetTypeVar, accession: str, search_filters: Dict[str,List[str]], path:str) -> Generator[ProgressInfo, None, None]:

def get_from_target(self, target: TargetTypeVar, accession: str, search_filters: Dict[str,List[str]], destination_path:str) -> Generator[ProgressInfo, None, None]:
raise Exception()

def find_from_target(self, target: TargetTypeVar, accession: str, search_filters:Dict[str,List[str]]) -> List[Dataset]:
Expand Down
4 changes: 2 additions & 2 deletions dispatch/target_types/builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ def find_from_target(self, target: DicomTarget, accession: str, search_filters:
except DicomClientCouldNotFind as e:
return []

def get_from_target(self, target: DicomTarget, accession: str, search_filters:Dict[str,List[str]], path) -> Generator[ProgressInfo, None, None]:
def get_from_target(self, target: DicomTarget, accession: str, search_filters:Dict[str,List[str]], destination_path: str) -> Generator[ProgressInfo, None, None]:
config.read_config()
c = SimpleDicomClient(target.ip, target.port, target.aet_target, target.aet_source, path)
c = SimpleDicomClient(target.ip, target.port, target.aet_target, target.aet_source, destination_path)
for identifier in c.getscu(accession, search_filters):
completed, remaining = identifier.NumberOfCompletedSuboperations, identifier.NumberOfRemainingSuboperations,
progress = f"{ completed } / { completed + remaining }"
Expand Down
4 changes: 2 additions & 2 deletions dispatch/target_types/dicomweb.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def find_from_target(self, target: DicomWebTarget, accession: str, search_filter
logger.debug(result)
return result

def get_from_target(self, target: DicomWebTarget, accession, search_filters, path) -> Generator[ProgressInfo, None, None]:
def get_from_target(self, target: DicomWebTarget, accession, search_filters, destination_path:str) -> Generator[ProgressInfo, None, None]:
series = self.find_from_target(target, accession, search_filters=search_filters)
if not series:
raise ValueError("No series found with accession number {}".format(accession))
Expand All @@ -97,7 +97,7 @@ def get_from_target(self, target: DicomWebTarget, accession, search_filters, pat
# remaining += len(instances)
for instance in instances:
sop_instance_uid = instance.get('SOPInstanceUID')
filename = f"{path}/{sop_instance_uid}.dcm"
filename = f"{destination_path}/{sop_instance_uid}.dcm"
instance.save_as(filename)
n += 1
remaining -= 1
Expand Down
2 changes: 2 additions & 0 deletions docker/base/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ RUN chown -R mercure ./configuration && \
# This fixes the issue that every code update rebuilds Conda
COPY ./ .

RUN ln -s /opt/mercure/app/bin/ubuntu22.04/getdcmtags /opt/mercure/app/bin/getdcmtags

# The configuration and data folders is probably the only thing that mercure ACTUALLY needs to write to
RUN chmod -R o+rx /opt/mercure/app && \
chown -R mercure /opt/mercure/config && \
Expand Down
13 changes: 10 additions & 3 deletions install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -342,13 +342,20 @@ start_docker () {
sudo docker-compose up -d
}

link_binaries() {
sudo find "$MERCURE_BASE/app/bin/ubuntu$UBUNTU_VERSION" -type f -exec ln -f -s {} "$MERCURE_BASE/app/bin" \;
}

install_app_files() {
local overwrite=${1:-false}

if [ "$overwrite" = true ] || [ ! -e "$MERCURE_BASE"/app ]; then
echo "## Installing app files..."
sudo mkdir "$MERCURE_BASE"/app
sudo cp -R "$MERCURE_SRC" "$MERCURE_BASE"/app
[ "$overwrite" = true ] || sudo mkdir "$MERCURE_BASE"/app
if [ ! "$MERCURE_SRC" -ef "$MERCURE_BASE"/app ]; then
sudo cp -R "$MERCURE_SRC" "$MERCURE_BASE"/app
fi
link_binaries
sudo chown -R $OWNER:$OWNER "$MERCURE_BASE/app"
fi
}
Expand Down Expand Up @@ -460,7 +467,7 @@ systemd_update () {
fi
done
create_folders
install_app_files
install_app_files true
sudo cp -n "$MERCURE_SRC"/installation/sudoers/* /etc/sudoers.d/
install_packages
install_dependencies
Expand Down
43 changes: 33 additions & 10 deletions receiver.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,41 @@
echo "mercure DICOM receiver"
echo "----------------------"
echo ""
echo "Arguments: $@"
if [ $# -eq 0 ]; then
echo "No arguments provided."
else
echo "Arguments: $@"
fi
echo ""
binary=bin/getdcmtags
if [[ $(lsb_release -rs) == "24.04" ]]; then
binary=bin/ubuntu24.04/getdcmtags
elif [[ $(lsb_release -rs) == "22.04" ]]; then
binary=bin/ubuntu22.04/getdcmtags
elif [[ $(lsb_release -rs) == "20.04" ]]; then
binary=bin/ubuntu20.04/getdcmtags
elif [[ $(lsb_release -rs) == "18.04" ]]; then
binary=bin/ubuntu18.04/getdcmtags
fi

if [[ ! -f "$binary" ]] ; then
if [[ $(lsb_release -rs) == "24.04" ]]; then
binary=bin/ubuntu24.04/getdcmtags
elif [[ $(lsb_release -rs) == "22.04" ]]; then
binary=bin/ubuntu22.04/getdcmtags
elif [[ $(lsb_release -rs) == "20.04" ]]; then
binary=bin/ubuntu20.04/getdcmtags
elif [[ $(lsb_release -rs) == "18.04" ]]; then
binary=bin/ubuntu18.04/getdcmtags
fi
fi
if [[ -f "$binary" ]] ; then
echo "getdcmtags binary at '$binary'"
else
echo "ERROR: Unable to locate getdcmtags binary at '$binary'"
echo "Terminating..."
exit 1
fi

if $binary -h &> /dev/null; then
echo "getdcmtags binary validated."
else
echo "ERROR: getdcmtags binary failed to start."
echo "Terminating..."
exit 1
fi
# Check if the configuration is accessible

config_folder="${MERCURE_CONFIG_FOLDER:-/opt/mercure/config}"
config="${config_folder}/mercure.json"
Expand Down
21 changes: 21 additions & 0 deletions tests/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import tempfile
from typing import Dict, Optional, Tuple
import pydicom
import pyfakefs
import pytest
from pynetdicom import AE, evt, StoragePresentationContexts, build_role
from pynetdicom.sop_class import Verification, StudyRootQueryRetrieveInformationModelFind, StudyRootQueryRetrieveInformationModelGet,PatientRootQueryRetrieveInformationModelGet, CTImageStorage # type: ignore
Expand Down Expand Up @@ -209,6 +210,26 @@ def test_query_job(dicom_server, tempdir, rq_connection,fs):
raise Exception(f"No DICOM file found in {tempdir}")
assert pydicom.dcmread(example_dcm).AccessionNumber == MOCK_ACCESSIONS[0]

def test_query_job_to_mercure(dicom_server, tempdir, rq_connection, fs, mercure_config):
"""
Test the create_job function.
We use mocker to mock the queue and avoid actually creating jobs.
"""
config = mercure_config()
job = QueryPipeline.create([MOCK_ACCESSIONS[0]], {}, dicom_server, None, False)
w = SimpleWorker(["mercure_fast", "mercure_slow"], connection=rq_connection)

w.work(burst=True)
# print(list(Path(config.incoming_folder).iterdir()))
# assert len(list(Path(config.incoming_folder).iterdir())) == 1
print([k for k in Path(config.incoming_folder).rglob('*')])
try:
example_dcm = next(k for k in Path(config.incoming_folder).rglob("*.tags"))
except StopIteration:
raise Exception(f"No tags file found in {config.incoming_folder}")
# assert pydicom.dcmread(example_dcm).AccessionNumber == MOCK_ACCESSIONS[0]
# assert example_dcm.with_suffix('.tags').exists()

def tree(path, prefix='', level=0) -> None:
if level==0:
print(path)
Expand Down
2 changes: 1 addition & 1 deletion tests/testing_integration_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def create_config(self, services) -> None:
startsecs={service.startsecs}
stopasgroup={str(service.stopasgroup).lower()}
numprocs={service.numprocs}
environment=MERCURE_CONFIG_FOLDER="{self.mercure_base}/config"
environment=MERCURE_CONFIG_FOLDER="{self.mercure_base}/config",MERCURE_BASEPATH="{self.mercure_base}"
""")

def run(self) -> None:
Expand Down
70 changes: 60 additions & 10 deletions webinterface/dashboards/query/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
import os
from pathlib import Path
import shutil

import subprocess
from typing import Any, Dict, Generator, List, Optional, Tuple, Type, Union, cast
import typing

import pyfakefs.fake_pathlib
import rq


from common import helper
from common.types import DicomTarget, DicomWebTarget, FolderTarget
Expand All @@ -23,6 +28,7 @@
from webinterface.common import redis, rq_fast_queue, rq_slow_queue
from rq.job import Dependency, JobStatus, Job
from rq import Connection, Queue, get_current_job
from tests.getdcmtags import process_dicom

logger = config.get_logger()

Expand All @@ -49,6 +55,30 @@ def query_dummy(job_id, job_kwargs):

yield completed, remaining, f"{completed} / {remaining + completed}"

def invoke_getdcmtags(file: Path, node: Union[DicomTarget, DicomWebTarget]):
if isinstance(node, DicomTarget):
sender_address = node.ip
sender_aet = node.aet_target
receiver_aet = node.aet_source
elif isinstance(node, DicomWebTarget):
sender_address = node.url
sender_aet = "MERCURE-QUERY"
receiver_aet = "MERCURE"

is_fake_fs = isinstance(Path, pyfakefs.fake_pathlib.FakePathlibPathModule)
if is_fake_fs: # running a test
result = process_dicom(file, sender_address, sender_aet, receiver_aet) # don't bother with bookkeeper
if result is None:
raise Exception("Failed to get DICOM tags from the file.")
else:
logger.info(f"Result {result}")
else:
try:
subprocess.check_output([config.app_basepath / "bin" / "getdcmtags", file, sender_address, sender_aet, receiver_aet, config.mercure.bookkeeper, config.mercure.bookkeeper_api_key],)
except subprocess.CalledProcessError as e:
logger.info(e.output.decode())
logger.info(e.stderr.decode())
raise

@dataclass
class ClassBasedRQTask():
Expand Down Expand Up @@ -84,13 +114,29 @@ def _execute(cls, **kwargs) -> Any:
def execute(self, *args, **kwargs) -> Any:
pass


@staticmethod
def move_to_destination(path, destination, job_id) -> None:
def move_to_destination(path:str, destination: Optional[str], job_id:str, node:Union[DicomTarget, DicomWebTarget]) -> None:
if destination is None:
config.read_config()
for p in Path(path).glob("**/*"):
if p.is_file():
shutil.move(str(p), config.mercure.incoming_folder) # Move the file to incoming folder
moved_files = []
try:
for p in Path(path).glob("**/*"):
if p.is_file():
if p.suffix == ".dcm":
name = p.stem
else:
name = p.name
logger.warning(f"Moving {p} to {config.mercure.incoming_folder}/{name}")
shutil.move(str(p), Path(config.mercure.incoming_folder) / name) # Move the file to incoming folder
invoke_getdcmtags(Path(config.mercure.incoming_folder) / name, node)
except:
for file in moved_files:
try:
file.unlink()
except:
pass
raise
# tree(config.mercure.incoming_folder)
shutil.rmtree(path)
else:
Expand Down Expand Up @@ -190,7 +236,7 @@ def error_handler(reason) -> None:
if job_parent:
if job_parent.kwargs["move_promptly"]:
try:
self.move_to_destination(path, job_parent.kwargs["destination"], job_parent.id)
self.move_to_destination(path, job_parent.kwargs["destination"], job_parent.id, node)
except Exception as e:
error_handler(f"Failure during move to destination of accession {accession}: {e}")
raise
Expand All @@ -216,7 +262,7 @@ class MainTask(ClassBasedRQTask):
offpeak: bool = False
_queue: str = rq_slow_queue.name

def execute(self, *, accessions, subjobs, path, destination, move_promptly) -> str:
def execute(self, *, accessions, subjobs, path:str, destination: Optional[str], move_promptly: bool, node: Union[DicomTarget, DicomWebTarget]) -> str:
job = cast(Job,self._job)
job.get_meta()
for job_id in job.kwargs.get('subjobs',[]):
Expand All @@ -233,14 +279,13 @@ def execute(self, *, accessions, subjobs, path, destination, move_promptly) -> s
if not p.is_dir():
continue
try:
self.move_to_destination(p, destination, job.id)
self.move_to_destination(p, destination, job.id, node)
except Exception as e:
err = f"Failure during move to destination {destination}: {e}" if destination else f"Failure during move to {config.mercure.incoming_folder}: {e}"
logger.error(err)
job.meta["failed_reason"] = err
job.save_meta() # type: ignore
raise
# subprocess.run(["./bin/ubuntu22.04/getdcmtags", filename, self.called_aet, "MERCURE"],check=True)

logger.info(f"Removing job directory {path}")
shutil.rmtree(path)
Expand All @@ -262,7 +307,7 @@ def __init__(self, job: Union[Job,str]):
assert self.job.meta.get('type') == 'batch', f"Job type must be batch, got {self.job.meta['type']}"

@classmethod
def create(cls, accessions: List[str], search_filters:Dict[str, List[str]], dicom_node: Union[DicomWebTarget, DicomTarget], destination_path, offpeak=False) -> 'QueryPipeline':
def create(cls, accessions: List[str], search_filters:Dict[str, List[str]], dicom_node: Union[DicomWebTarget, DicomTarget], destination_path: Optional[str], offpeak:bool=False) -> 'QueryPipeline':
"""
Create a job to process the given accessions and store them in the specified destination path.
"""
Expand Down Expand Up @@ -290,6 +335,7 @@ def create(cls, accessions: List[str], search_filters:Dict[str, List[str]], dico
accessions = accessions,
subjobs = [check_job.id]+[j.id for j in get_accession_jobs],
destination = destination_path,
node=dicom_node,
move_promptly = True,
rq_options = dict(depends_on=depends, timeout=-1, result_ttl=-1)
)
Expand Down Expand Up @@ -418,7 +464,11 @@ def id(self) -> str:

@property
def kwargs(self) -> typing.Dict:
return cast(dict,self.job.kwargs)
try:
return cast(dict,self.job.kwargs)
except rq.exceptions.DeserializationError:
logger.info(f"Failed to deserialize job kwargs: {self.job.data}")
raise

@property
def result(self) -> Any:
Expand Down
Loading

0 comments on commit 7a44fb4

Please sign in to comment.