diff --git a/app/bookkeeping/query.py b/app/bookkeeping/query.py
index 53eb52b..2ee0d37 100644
--- a/app/bookkeeping/query.py
+++ b/app/bookkeeping/query.py
@@ -204,9 +204,9 @@ async def find_task(request) -> JSONResponse:
     study_filter = request.query_params.get("study_filter", "false")
     filter_term = ""
     if search_term:
-        filter_term = (f"""and ((tag_accessionnumber ilike '{search_term}%') """ +
-                       f"""or (tag_patientid ilike '{search_term}%') """ +
-                       f"""or (tag_patientname ilike '%{search_term}%'))""")
+        filter_term = (f"""and ((tag_accessionnumber ilike '{search_term}%') """
+                       + f"""or (tag_patientid ilike '{search_term}%') """
+                       + f"""or (tag_patientname ilike '%{search_term}%'))""")
 
     study_filter_term = ""
     if study_filter == "true":
diff --git a/app/common/event_types.py b/app/common/event_types.py
index 71b6723..2d8d91b 100644
--- a/app/common/event_types.py
+++ b/app/common/event_types.py
@@ -69,7 +69,7 @@ class task_event(StringEnum):
     REMOVE = auto()
     PROCESS_MODULE_BEGIN = auto()
     PROCESS_MODULE_COMPLETE = auto()
-    
+
 
 class severity(Enum):
     """Severity level associated to the mercure events."""
diff --git a/app/common/helper.py b/app/common/helper.py
index b727fd2..45ae3da 100644
--- a/app/common/helper.py
+++ b/app/common/helper.py
@@ -33,7 +33,7 @@ def validate_folders(config) -> Tuple[bool, str]:
                 print(f"Created directory: {folder}")
             except Exception:
                 return False, f"Folder {folder} does not exist."
-        
+
         if not os.access(folder, os.R_OK | os.W_OK):
             return False, f"No read/write access to {folder}"
     return True, ""
@@ -212,7 +212,7 @@ class FileLock:
     """Helper class that implements a file lock. The lock file will be removed also from the destructor so
     that no spurious lock files remain if exceptions are raised."""
     lockCreated = False
-    
+
     def __init__(self, path_for_lockfile: Path):
        self.lockfile = path_for_lockfile
        # TODO: Handle case if lock file cannot be created
diff --git a/app/common/notification.py b/app/common/notification.py
index 9c556b4..1bb18b1 100644
--- a/app/common/notification.py
+++ b/app/common/notification.py
@@ -87,7 +87,7 @@ def parse_payload(
         **context,
     }
     return Template(payload_parsed).render(context)
-    
+
 
 def send_webhook(url: str, payload: str) -> None:
     if not url:
diff --git a/app/common/types.py b/app/common/types.py
index fd0d113..db460de 100644
--- a/app/common/types.py
+++ b/app/common/types.py
@@ -33,7 +33,7 @@ class Target(BaseModel, Compat):
     @property
     def short_description(self) -> str:
         return ""
-    
+
     @classmethod
     def __get_validators__(cls):
         # one or more validators may be yielded which will be called in the
@@ -121,7 +121,7 @@ class RsyncTarget(Target):
     host: str
     password: Optional[str]
     run_on_complete: bool = False
-    
+
     @property
     def short_description(self) -> str:
         return f"{self.host}:{self.folder}"
@@ -254,7 +254,7 @@ class DicomDestination(BaseModel):
 class DicomRetrieveConfig(BaseModel):
     dicom_nodes: List[DicomNodeBase] = []
     destination_folders: List[DicomDestination] = []
-    
+
 
 class Config(BaseModel, Compat):
     appliance_name: str
diff --git a/app/dispatch/dispatcher.py b/app/dispatch/dispatcher.py
index d136790..58c624e 100644
--- a/app/dispatch/dispatcher.py
+++ b/app/dispatch/dispatcher.py
@@ -202,7 +202,7 @@ def main(args=sys.argv[1:]) -> None:
                     config.mercure.influxdb_bucket, "mercure." + appliance_name + ".dispatcher."
                     + instance_name
                     )
-    
+
     logger.info(f"Dispatching folder: {config.mercure.outgoing_folder}")
     dispatcher_lockfile = Path(config.mercure.outgoing_folder + "/" + mercure_names.HALT)
diff --git a/app/dispatch/send.py b/app/dispatch/send.py
index 5abba70..1a73481 100644
--- a/app/dispatch/send.py
+++ b/app/dispatch/send.py
@@ -188,7 +188,7 @@ def execute(
         if len(current_status) != len(dispatch_info.target_name):
             current_status = {target_item: TaskDispatchStatus(state="waiting", time=get_now_str())
                               for target_item in dispatch_info.target_name}
-    
+
     for target_item in dispatch_info.target_name:
         if current_status[target_item] and current_status[target_item].state != "complete":  # type: ignore
diff --git a/app/dispatch/target_types/base.py b/app/dispatch/target_types/base.py
index 987c271..6a07da9 100644
--- a/app/dispatch/target_types/base.py
+++ b/app/dispatch/target_types/base.py
@@ -25,7 +25,7 @@ class ProgressInfo():
 
 class TargetHandler(Generic[TargetTypeVar]):
     test_template = "targets/base-test.html"
     can_pull = False
-    
+
     def __init__(self):
         pass
@@ -45,7 +45,7 @@ def send_to_target(
 
     class NoSuchTagException(Exception):
         pass
-    
+
     def get_from_target(self, target: TargetTypeVar, accession: str, search_filters: Dict[str, List[str]],
                         destination_path: str) -> Generator[ProgressInfo, None, None]:
         raise Exception()
@@ -57,7 +57,7 @@ def find_from_target(self, target: TargetTypeVar, accession: str, search_filters
             if not tag_for_keyword(t):
                 raise TargetHandler.NoSuchTagException(f"Invalid search filter: no such tag '{t}'")
         return []
-    
+
     def handle_error(self, e, command) -> None:
         pass
diff --git a/app/dispatch/target_types/builtin.py b/app/dispatch/target_types/builtin.py
index 469f80c..8f65270 100644
--- a/app/dispatch/target_types/builtin.py
+++ b/app/dispatch/target_types/builtin.py
@@ -64,7 +64,7 @@ def find_from_target(self, target: DicomTarget, accession: str, search_filters:
             return c.findscu(accession, search_filters)
         except DicomClientCouldNotFind:
             return []
-    
+
     def get_from_target(self, target: DicomTarget, accession: str, search_filters: Dict[str, List[str]],
                         destination_path: str) -> Generator[ProgressInfo, None, None]:
         config.read_config()
diff --git a/app/dispatch/target_types/dicomweb.py b/app/dispatch/target_types/dicomweb.py
index 5d10003..b460a17 100644
--- a/app/dispatch/target_types/dicomweb.py
+++ b/app/dispatch/target_types/dicomweb.py
@@ -27,7 +27,7 @@ class DicomWebTargetHandler(TargetHandler[DicomWebTarget]):
     icon = "fa-share-alt"
     display_name = "DICOMweb"
     can_pull = True
-    
+
     def create_client(self, target: DicomWebTarget) -> Union[DICOMfileClient, DICOMwebClient]:
         session = None
         headers = None
@@ -39,7 +39,7 @@ def create_client(self, target: DicomWebTarget) -> Union[DICOMfileClient, DICOMw
             # Todo: store the db elsewhere if we don't have write access to this folder
             # This also makes it possible to run tests under pyfakefs since it can't patch sqlite3
             return DICOMfileClient(url=target.url, in_memory=True, update_db=True)
-    
+
         if target.http_user and target.http_password:
             session = create_session_from_user_pass(username=target.http_user, password=target.http_password)
         elif target.access_token:
@@ -71,7 +71,7 @@ def find_from_target(self, target: DicomWebTarget, accession: str, search_filter
                 break
         else:
             use_filters.update({k: v[0] for k, v in search_filters.items()})
-    
+
         metadata = client.search_for_series(search_filters=use_filters, get_remaining=True,
                                             fields=['StudyInstanceUID',
                                                     'SeriesInstanceUID',
@@ -79,7 +79,7 @@ def find_from_target(self, target: DicomWebTarget, accession: str, search_filter
                                                     'StudyDescription',
                                                     'SeriesDescription'] + list(search_filters.keys()))
         meta_datasets = [pydicom.Dataset.from_json(ds) for ds in metadata]
         result = []
-    
+
         # In case the server didn't filter as strictly as we expected it to, filter again
         for d in meta_datasets:
             for filter in search_filters:
diff --git a/app/process/process_series.py b/app/process/process_series.py
index 5c4a9aa..28dbf69 100644
--- a/app/process/process_series.py
+++ b/app/process/process_series.py
@@ -134,11 +134,11 @@ def decode_task_json(json_string: Optional[str]) -> Any:
     else:
         logger.error("No docker tag supplied")
         return False
-    
+
     runtime = {}
     if config.mercure.processing_runtime:
         runtime = dict(runtime=config.mercure.processing_runtime)
-    
+
     additional_volumes: Dict[str, Dict[str, str]] = decode_task_json(module.additional_volumes)
     module_environment = decode_task_json(module.environment)
     mercure_environment = dict(MERCURE_IN_DIR=container_in_dir, MERCURE_OUT_DIR=container_out_dir)
@@ -163,7 +163,7 @@ def decode_task_json(json_string: Optional[str]) -> Any:
             raise Exception(f"Docker tag {docker_tag} not found, aborting.") from None
         except (json.decoder.JSONDecodeError, KeyError):
             raise Exception("Failed to parse MONAI app manifest.")
-    
+
     module.requires_root = module.requires_root or image_is_monai_map
 
     # Merge the two dictionaries
@@ -267,7 +267,7 @@ def decode_task_json(json_string: Optional[str]) -> Any:
             logs = helper.localize_log_timestamps(logs, config)
             if not config.mercure.processing_logs.discard_logs:
                 monitor.send_process_logs(task.id, task_processing.module_name, logs)
-    
+
             logger.info(logs)
 
         logger.info("=== MODULE OUTPUT - END ==========================================")
@@ -277,8 +277,8 @@ def decode_task_json(json_string: Optional[str]) -> Any:
             if (datetime.now() - docker_pull_throttle.get("busybox:stable-musl", datetime.fromisocalendar(1, 1, 1))
                     ).total_seconds() > 86400:  # noqa: 125
-                docker_client.images.pull("busybox:stable-musl")       # noqa: E117
-                docker_pull_throttle["busybox:stable_musl"] = datetime.now()
+                docker_client.images.pull("busybox:stable-musl")  # noqa: E117
+                docker_pull_throttle["busybox:stable-musl"] = datetime.now()
         except Exception:
             logger.exception("could not pull busybox")
@@ -499,7 +499,7 @@ async def process_series(folder: Path) -> None:
         monitor.send_task_event(
             monitor.task_event.PROCESS_COMPLETE, task_id, file_count_complete, "", "Processing job complete"
         )
-    
+
         # If dispatching not needed, then trigger the completion notification (for docker/systemd)
         if not needs_dispatching:
             monitor.send_task_event(monitor.task_event.COMPLETE, task_id, 0, "", "Task complete")
@@ -530,7 +530,7 @@ async def process_series(folder: Path) -> None:
             trigger_notification(task, mercure_events.ERROR)
         return
 
-    
+
 def push_input_task(input_folder: Path, output_folder: Path):
     task_json = output_folder / "task.json"
     if not task_json.exists():
diff --git a/app/routing/route_series.py b/app/routing/route_series.py
index de24553..c729f31 100644
--- a/app/routing/route_series.py
+++ b/app/routing/route_series.py
@@ -366,7 +366,7 @@ def push_serieslevel_routing(
             targets.append(rule_definition.get("target"))
         else:
             targets = rule_definition.get("target")
-    
+
     for target in targets:
         if not selected_targets.get(target):
             selected_targets[target] = [current_rule]
@@ -622,7 +622,7 @@ def remove_series(task_id: str, file_list: List[str], series_UID: str) -> bool:
         except Exception:
             logger.error(f"Error while removing file {entry}", task_id)  # handle_error
             return False
-    
+
     monitor.send_task_event(monitor.task_event.REMOVE, task_id, len(file_list), "", "Removed duplicate files")
     return True
diff --git a/app/webinterface/dashboards/query/jobs.py b/app/webinterface/dashboards/query/jobs.py
index 9b8791b..d60257e 100644
--- a/app/webinterface/dashboards/query/jobs.py
+++ b/app/webinterface/dashboards/query/jobs.py
@@ -136,7 +136,7 @@ def move_to_destination(path: str, destination: Optional[str], job_id: str,
             (dest_folder / ".complete").touch()
             lock.free()
         return
-    
+
     config.read_config()
     moved_files = []
     try:
@@ -167,7 +167,7 @@ def move_to_destination(path: str, destination: Optional[str], job_id: str,
 
 class CheckAccessionsTask(ClassBasedRQTask):
     type: str = "check_accessions"
     _queue: str = rq_fast_queue.name
-    
+
     def execute(self, *, accessions: List[str], node: Union[DicomTarget, DicomWebTarget],
                 search_filters: Dict[str, List[str]] = {}):
         """
@@ -211,7 +211,7 @@ def get_accession(cls, job_id, accession: str, node: Union[DicomTarget, DicomWeb
     def execute(self, *, accession: str, node: Union[DicomTarget, DicomWebTarget],
                 search_filters: Dict[str, List[str]], path: str, force_rule: Optional[str] = None):
         logger.info(f"Getting ACC {accession}")
-    
+
         def error_handler(reason) -> None:
             logger.error(reason)
             if not job_parent:
@@ -262,13 +262,13 @@ def error_handler(reason) -> None:
             except Exception as e:
                 error_handler(f"Failure during move to destination of accession {accession}: {e}")
                 raise
-    
+
             job_parent.get_meta()  # there is technically a race condition here...
             job_parent.meta['completed'] += 1
             job_parent.meta['progress'] = (f"{job_parent.meta['started'] } /"
                                            f" {job_parent.meta['completed'] } / {job_parent.meta['total']}")
             job_parent.save_meta()  # type: ignore
-    
+
         except Exception as e:
             error_handler(f"Failure with accession {accession}: {e}")
             raise
@@ -325,7 +325,7 @@ def execute(self, *, accessions, subjobs, path: str, destination: Optional[str],
 
 class QueryPipeline():
     job: Job
     connection: Redis
-    
+
     def __init__(self, job: Union[Job, str], connection: Redis = redis):
         self.connection = connection
         if isinstance(job, str):
@@ -482,11 +482,11 @@ def get_status(self) -> JobStatus:
 
     def get_meta(self) -> Any:
         return cast(dict, self.job.get_meta())
-    
+
     @property
     def meta(self) -> typing.Dict:
         return cast(dict, self.job.meta)
-    
+
     @property
     def is_failed(self) -> bool:
         return cast(bool, self.job.is_failed)
@@ -494,7 +494,7 @@ def is_failed(self) -> bool:
     @property
     def is_finished(self) -> bool:
         return cast(bool, self.job.is_finished)
-    
+
     @property
     def is_paused(self) -> bool:
         return cast(bool, self.meta.get("paused", False))
@@ -510,15 +510,15 @@ def kwargs(self) -> typing.Dict:
         except rq.exceptions.DeserializationError:
             logger.info(f"Failed to deserialize job kwargs: {self.job.data}")
             raise
-    
+
     @property
     def result(self) -> Any:
         return self.job.result
-    
+
     @property
     def created_at(self) -> datetime:
         return cast(datetime, self.job.created_at)
-    
+
     @property
     def enqueued_at(self) -> datetime:
         return cast(datetime, self.job.enqueued_at)
@@ -546,5 +546,5 @@ def get_all(cls, type: str = "batch", connection: Redis = redis) -> Generator['Q
             job_ids.add(j_id)
 
         jobs = (Job.fetch(j_id, connection) for j_id in job_ids)
-    
+
         return (QueryPipeline(j, connection) for j in jobs if j and j.get_meta().get("type") == type)
diff --git a/app/webinterface/dashboards/query_routes.py b/app/webinterface/dashboards/query_routes.py
index 237ea1f..73b97a6 100644
--- a/app/webinterface/dashboards/query_routes.py
+++ b/app/webinterface/dashboards/query_routes.py
@@ -18,16 +18,16 @@ from .query.jobs import CheckAccessionsTask, QueryPipeline
 
 logger = config.get_logger()
-    
+
 
 @router.post("/query/retry_job")
 @requires(["authenticated", "admin"], redirect="login")
 async def post_retry_job(request):
     job = QueryPipeline(request.query_params['id'])
-    
+
     if not job:
         return JSONErrorResponse(f"Job with id {request.query_params['id']} not found.", status_code=404)
-    
+
     try:
         job.retry()
     except Exception:
@@ -78,7 +78,7 @@ async def get_job_info(request):
     job = QueryPipeline(job_id)
     if not job:
         return JSONErrorResponse('Job not found', status_code=404)
-    
+
     subjob_info: List[Dict[str, Any]] = []
     for subjob in job.get_subjobs():
         if not subjob:
@@ -197,7 +197,7 @@ async def query_jobs(request):
             task_dict["progress"] = f"{n_total} / {n_total}"
         elif task_dict["status"] in ("deferred", "started", "paused", "canceled"):
             task_dict["progress"] = f"{n_completed} / {n_total}"
-    
+
         # if task_dict["status"] == "canceled" and
         if task.meta.get('paused', False) and task_dict["status"] not in ("finished", "failed"):
             if n_started < n_completed:  # TODO: this does not work
@@ -266,7 +266,7 @@ async def check_accessions(request):
 
     node_name = form.get("dicom_node")
     accessions = form.get("accessions", "").split(",")
-    
+
     search_filters = {}
     if search_filter := form.get("series_description"):
         search_filters["SeriesDescription"] = [x.strip() for x in search_filter.split(",")]
@@ -276,7 +276,7 @@ async def check_accessions(request):
     node = config.mercure.targets.get(node_name)
     if not isinstance(node, (DicomWebTarget, DicomTarget)):
         return JSONErrorResponse(f"Invalid DICOM node '{node_name}'.", status_code=400)
-    
+
     try:
         job = CheckAccessionsTask().create_job(connection=redis, accessions=accessions, node=node,
                                                search_filters=search_filters)
diff --git a/app/webinterface/dicom_client.py b/app/webinterface/dicom_client.py
index dabcd29..716eeb0 100644
--- a/app/webinterface/dicom_client.py
+++ b/app/webinterface/dicom_client.py
@@ -67,7 +67,7 @@ def __init__(self, host, port, called_aet, calling_aet, out_dir) -> None:
         self.calling_aet = calling_aet or "MERCURE"
         self.called_aet = called_aet
         self.output_dir = out_dir
-    
+
     def handle_store(self, event):
         try:
             ds = event.dataset
@@ -178,7 +178,7 @@ def getscu(self, accession_number: str, search_filters: Dict[str, List[str]]) ->
             # `rsp_identifier` is the C-GET response's Identifier dataset
             if not status:
                 raise DicomClientBadStatus()
-    
+
             if status.Status in [0xFF00, 0xFF01]:
                 yield status
                 success = True
diff --git a/app/webinterface/modules.py b/app/webinterface/modules.py
index b90ef40..adec313 100644
--- a/app/webinterface/modules.py
+++ b/app/webinterface/modules.py
@@ -99,7 +99,7 @@ async def show_modules(request):
     template = "modules.html"
     context = {
         "request": request,
-    
+
         "page": "modules",
         "modules": config.mercure.modules,
         "used_modules": used_modules,
@@ -203,7 +203,7 @@ async def edit_module(request):
     template = "modules_edit.html"
     context = {
         "request": request,
-    
+
         "page": "modules",
         "module": config.mercure.modules[module],
         "module_name": module,