Skip to content

Commit

Permalink
initial work on allowing query to force running a rule
Browse files Browse the repository at this point in the history
  • Loading branch information
Roy Wiggins authored and Roy Wiggins committed Nov 16, 2024
1 parent 7a44fb4 commit 7dd42e7
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 41 deletions.
Binary file modified bin/ubuntu22.04/getdcmtags
Binary file not shown.
28 changes: 25 additions & 3 deletions getdcmtags/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

#include "tags_list.h"

#define VERSION "getdcmtags Version 0.73"
#define VERSION "getdcmtags Version 0.74"

static OFString tagSpecificCharacterSet = "";
static OFString tagSeriesInstanceUID = "";
Expand All @@ -38,7 +38,7 @@ static std::string bookkeeperToken = "";

static QVector<QPair<DcmTagKey, OFString>> additional_tags;
static QVector<QPair<DcmTagKey, OFString>> main_tags;

static QVector<QPair<OFString, OFString>> force_tags;

// Escape the JSON values properly to avoid problems if DICOM tags contains invalid characters
// (see https://stackoverflow.com/questions/7724448/simple-json-string-escape-for-c)
Expand Down Expand Up @@ -257,6 +257,18 @@ bool writeTagsList(QVector<QPair<DcmTagKey, OFString>>& tags, FILE* fp, OFString
}


bool writeForceTagsList(QVector<QPair<OFString, OFString>>& tags, FILE* fp) {

QVectorIterator<QPair<OFString, OFString>> iter(tags);
while(iter.hasNext())
{
auto pair = iter.next();
fprintf(fp, "\"%s\": \"%s\",\n", pair.first.c_str(), escapeJSONValue(pair.second).c_str());
}
return true;
}


bool writeTagsFile(OFString dcmFile, OFString originalFile)
{
OFString filename = dcmFile + ".tags";
Expand All @@ -281,6 +293,9 @@ bool writeTagsFile(OFString dcmFile, OFString originalFile)

writeTagsList(main_tags, fp, dcmFile, conversionBuffer);
writeTagsList(additional_tags, fp, dcmFile, conversionBuffer);

writeForceTagsList(force_tags, fp);

fprintf(fp, "\"Filename\": \"%s\"\n", originalFile.c_str());
fprintf(fp, "}\n");

Expand Down Expand Up @@ -384,10 +399,17 @@ int main(int argc, char *argv[])
injectErrors = true;
} else if (strcmp(argv[i], "--tags-stop-early") == 0) {
tagsStopEarly = true;
} else if (strcmp(argv[i], "--set-tag") == 0 && i + 1 < argc) {
std::string tag = std::string(argv[++i]);
size_t pos = tag.find('=');
if (pos != std::string::npos) {
auto name = OFString(tag.substr(0, pos).c_str());
auto value = OFString(tag.substr(pos + 1).c_str());
force_tags.append(QPair<OFString, OFString>(name, value));
}
}
}
}

if (injectErrors) {
QFile file("./dcm_inject_error");
if (file.exists() && file.open(QIODevice::ReadOnly | QIODevice::Text)) {
Expand Down
47 changes: 27 additions & 20 deletions routing/route_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,30 +153,37 @@ def get_triggered_rules(
discard_rule = ""
fallback_rule = ""

# Iterate over all defined processing rules
for current_rule in config.mercure.rules:
try:
rule: Rule = config.mercure.rules[current_rule]
if "mercureForceRule" in tagList:
current_rule = tagList["mercureForceRule"]
if current_rule not in config.mercure.rules:
logger.error(f"Invalid force rule {current_rule} for task {task_id}", task_id)
return {}, ""
triggered_rules[tagList["mercureForceRule"]] = True
else:
# Iterate over all defined processing rules
for current_rule in config.mercure.rules:
try:
rule: Rule = config.mercure.rules[current_rule]

# Check if the current rule has been disabled
if rule.disabled == True:
continue
# Check if the current rule has been disabled
if rule.disabled == True:
continue

# If the current rule is flagged as fallback rule, remember the name (to avoid having to iterate over the rules again)
if rule.fallback == True:
fallback_rule = current_rule
# If the current rule is flagged as fallback rule, remember the name (to avoid having to iterate over the rules again)
if rule.fallback == True:
fallback_rule = current_rule

# Check if the current rule is triggered for the provided tag set
if rule_evaluation.parse_rule(rule.get("rule", "False"), tagList)[0]:
triggered_rules[current_rule] = True
if rule.get(mercure_rule.ACTION, "") == mercure_actions.DISCARD:
discard_rule = current_rule
# If the triggered rule's action is to discard, stop further iteration over the rules
break
# Check if the current rule is triggered for the provided tag set
if rule_evaluation.parse_rule(rule.get("rule", "False"), tagList)[0]:
triggered_rules[current_rule] = True
if rule.get(mercure_rule.ACTION, "") == mercure_actions.DISCARD:
discard_rule = current_rule
# If the triggered rule's action is to discard, stop further iteration over the rules
break

except Exception as e:
logger.error(f"Invalid rule found: {current_rule}", task_id) # handle_error
continue
except Exception as e:
logger.error(f"Invalid rule found: {current_rule}", task_id) # handle_error
continue

# If no rule has triggered but a fallback rule exists, then apply this rule
if (len(triggered_rules) == 0) and (fallback_rule):
Expand Down
35 changes: 21 additions & 14 deletions webinterface/dashboards/query/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def query_dummy(job_id, job_kwargs):

yield completed, remaining, f"{completed} / {remaining + completed}"

def invoke_getdcmtags(file: Path, node: Union[DicomTarget, DicomWebTarget]):
def invoke_getdcmtags(file: Path, node: Union[DicomTarget, DicomWebTarget], force_rule: Optional[str] = None):
if isinstance(node, DicomTarget):
sender_address = node.ip
sender_aet = node.aet_target
Expand All @@ -74,12 +74,17 @@ def invoke_getdcmtags(file: Path, node: Union[DicomTarget, DicomWebTarget]):
logger.info(f"Result {result}")
else:
try:
subprocess.check_output([config.app_basepath / "bin" / "getdcmtags", file, sender_address, sender_aet, receiver_aet, config.mercure.bookkeeper, config.mercure.bookkeeper_api_key],)
invoke_with:list = [config.app_basepath / "bin" / "getdcmtags", file, sender_address, sender_aet, receiver_aet, config.mercure.bookkeeper, config.mercure.bookkeeper_api_key]
if force_rule:
invoke_with.extend(["--set-tag", f"mercureForceRule={force_rule}"])
subprocess.check_output(invoke_with)
except subprocess.CalledProcessError as e:
logger.info(e.output.decode())
logger.info(e.stderr.decode())
logger.warning(e.output.decode() if e.output else "No stdout")
logger.warning(e.stderr.decode() if e.stderr else "No stderr")
raise
except:
logger.warning(invoke_with)
raise

@dataclass
class ClassBasedRQTask():
parent: Optional[str] = None
Expand Down Expand Up @@ -116,7 +121,7 @@ def execute(self, *args, **kwargs) -> Any:


@staticmethod
def move_to_destination(path:str, destination: Optional[str], job_id:str, node:Union[DicomTarget, DicomWebTarget]) -> None:
def move_to_destination(path:str, destination: Optional[str], job_id:str, node:Union[DicomTarget, DicomWebTarget], force_rule:Optional[str]=None) -> None:
if destination is None:
config.read_config()
moved_files = []
Expand All @@ -127,9 +132,9 @@ def move_to_destination(path:str, destination: Optional[str], job_id:str, node:U
name = p.stem
else:
name = p.name
logger.warning(f"Moving {p} to {config.mercure.incoming_folder}/{name}")
logger.debug(f"Moving {p} to {config.mercure.incoming_folder}/{name}")
shutil.move(str(p), Path(config.mercure.incoming_folder) / name) # Move the file to incoming folder
invoke_getdcmtags(Path(config.mercure.incoming_folder) / name, node)
invoke_getdcmtags(Path(config.mercure.incoming_folder) / name, node, force_rule)
except:
for file in moved_files:
try:
Expand Down Expand Up @@ -189,7 +194,7 @@ class GetAccessionTask(ClassBasedRQTask):
def get_accession(cls, job_id, accession: str, node: Union[DicomTarget, DicomWebTarget], search_filters: Dict[str, List[str]], path) -> Generator[ProgressInfo, None, None]:
yield from get_handler(node).get_from_target(node, accession, search_filters, path)

def execute(self, *, accession:str, node: Union[DicomTarget, DicomWebTarget], search_filters:Dict[str, List[str]], path: str):
def execute(self, *, accession:str, node: Union[DicomTarget, DicomWebTarget], search_filters:Dict[str, List[str]], path: str, force_rule:Optional[str]=None):
print(f"Getting {accession}")
def error_handler(reason) -> None:
logger.error(reason)
Expand Down Expand Up @@ -236,7 +241,7 @@ def error_handler(reason) -> None:
if job_parent:
if job_parent.kwargs["move_promptly"]:
try:
self.move_to_destination(path, job_parent.kwargs["destination"], job_parent.id, node)
self.move_to_destination(path, job_parent.kwargs["destination"], job_parent.id, node, force_rule)
except Exception as e:
error_handler(f"Failure during move to destination of accession {accession}: {e}")
raise
Expand All @@ -262,7 +267,7 @@ class MainTask(ClassBasedRQTask):
offpeak: bool = False
_queue: str = rq_slow_queue.name

def execute(self, *, accessions, subjobs, path:str, destination: Optional[str], move_promptly: bool, node: Union[DicomTarget, DicomWebTarget]) -> str:
def execute(self, *, accessions, subjobs, path:str, destination: Optional[str], move_promptly: bool, node: Union[DicomTarget, DicomWebTarget], force_rule: Optional[str]=None) -> str:
job = cast(Job,self._job)
job.get_meta()
for job_id in job.kwargs.get('subjobs',[]):
Expand All @@ -279,7 +284,7 @@ def execute(self, *, accessions, subjobs, path:str, destination: Optional[str],
if not p.is_dir():
continue
try:
self.move_to_destination(p, destination, job.id, node)
self.move_to_destination(p, destination, job.id, node, force_rule)
except Exception as e:
err = f"Failure during move to destination {destination}: {e}" if destination else f"Failure during move to {config.mercure.incoming_folder}: {e}"
logger.error(err)
Expand Down Expand Up @@ -307,7 +312,7 @@ def __init__(self, job: Union[Job,str]):
assert self.job.meta.get('type') == 'batch', f"Job type must be batch, got {self.job.meta['type']}"

@classmethod
def create(cls, accessions: List[str], search_filters:Dict[str, List[str]], dicom_node: Union[DicomWebTarget, DicomTarget], destination_path: Optional[str], offpeak:bool=False) -> 'QueryPipeline':
def create(cls, accessions: List[str], search_filters:Dict[str, List[str]], dicom_node: Union[DicomWebTarget, DicomTarget], destination_path: Optional[str], offpeak:bool=False, force_rule:Optional[str]=None) -> 'QueryPipeline':
"""
Create a job to process the given accessions and store them in the specified destination path.
"""
Expand All @@ -319,6 +324,7 @@ def create(cls, accessions: List[str], search_filters:Dict[str, List[str]], dico
get_accession_task = GetAccessionTask(offpeak=offpeak).create_job(
accession=str(accession),
node=dicom_node,
force_rule=force_rule,
search_filters=search_filters,
rq_options=dict(
depends_on=cast(List[Union[Dependency, Job]],[check_job]),
Expand All @@ -337,7 +343,8 @@ def create(cls, accessions: List[str], search_filters:Dict[str, List[str]], dico
destination = destination_path,
node=dicom_node,
move_promptly = True,
rq_options = dict(depends_on=depends, timeout=-1, result_ttl=-1)
rq_options = dict(depends_on=depends, timeout=-1, result_ttl=-1),
force_rule=force_rule
)
check_job.meta["parent"] = main_job.id
for j in get_accession_jobs:
Expand Down
5 changes: 4 additions & 1 deletion webinterface/dashboards/query_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,9 @@ async def query_post_batch(request):
if search_filter := form.get("study_description"):
search_filters["StudyDescription"] = [x.strip() for x in search_filter.split(",")]

force_rule = form.get("force_rule") or None
try:
QueryPipeline.create(accession.split(","), search_filters, node, destination_path, offpeak=offpeak)
QueryPipeline.create(accession.split(","), search_filters, node, destination_path, offpeak=offpeak, force_rule=force_rule)
except Exception as e:
logger.exception(f"Error creating query pipeline for accession {accession}.")
return JSONErrorResponse(str(e))
Expand Down Expand Up @@ -220,10 +221,12 @@ async def query(request):
template = "dashboards/query.html"
dicom_nodes = [name for name,node in config.mercure.targets.items() if isinstance(node, (DicomTarget, DicomWebTarget)) and node.direction in ("pull", "both")]
destination_folders = [name for name,node in config.mercure.targets.items() if isinstance(node, FolderTarget)]
rules = [name for name, value in config.mercure.rules.items()]
context = {
"request": request,
"destination_folders": destination_folders,
"dicom_nodes": dicom_nodes,
"rules": rules,
"page": "tools",
"tab": "query"
}
Expand Down
34 changes: 31 additions & 3 deletions webinterface/templates/dashboards/query.html
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ <h1 class="title is-4">DICOM Query</h1>
<div class="field">
<div class="control is-expanded">
<div class="select">
<select name="destination">
<select id="destination-input" name="destination" onchange="">
<option value="">Mercure</option>
<option disabled>————————</option>
{% for destination in destination_folders %}
Expand All @@ -129,7 +129,25 @@ <h1 class="title is-4">DICOM Query</h1>
</div>
</div>
</div>

<div class="field is-horizontal">
<div class="field-label is-normal">
<label class="label">Force Rule</label>
</div>
<div class="field-body">
<div class="field">
<div class="control is-expanded">
<div class="select">
<select id="force-rule-input" name="force_rule">
<option value=""></option>
{% for rule in rules %}
<option>{{ rule }}</option>
{% endfor %}
</select>
</div>
</div>
</div>
</div>
</div>
<div class="field is-horizontal">
<div class="field-label">
<!-- Left empty for spacing -->
Expand Down Expand Up @@ -199,6 +217,15 @@ <h1 class="title is-4">DICOM Query</h1>
$('#backend-error').text('');
$(':input').removeClass("is-danger");
}
function handleForceRuleInput() {
if ($("#destination-input").val() == "") {
$("#force-rule-input").prop('disabled', false);
} else {
$("#force-rule-input").val("");
$("#force-rule-input").prop('disabled', 'disabled');
};
}
$('#destination-input').on('change', handleForceRuleInput);

$('#file-input').on('change', function(event) {
const file = event.target.files[0];
Expand All @@ -219,7 +246,8 @@ <h1 class="title is-4">DICOM Query</h1>


$(document).ready(function () {
// Add tab functionality
handleForceRuleInput();
// Add tab functionality
$('#query_result_tabs a').on('click', function(e) {
e.preventDefault();
// Remove active class from all tabs
Expand Down

0 comments on commit 7dd42e7

Please sign in to comment.