Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rcq updates9 #473

Merged
merged 2 commits into from
Jul 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions dax/processors_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -1407,6 +1407,7 @@ def is_first_mr_session(self, session, project_data):
def _map_inputs(self, session, project_data):
inputs = self.proc_inputs
artefacts_by_input = {k: [] for k in inputs}
artefact_ids_by_input = {k: [] for k in inputs}

# Get lists for scans/assrs for this session
LOGGER.debug('prepping session data')
Expand Down Expand Up @@ -1468,8 +1469,39 @@ def _map_inputs(self, session, project_data):
# Break if the scan matches so we don't find it again comparing
# vs a different requested type
artefacts_by_input[i].append(cscan['full_path'])
artefact_ids_by_input[i].append(scanid)
break

# If requested, check for multiple matching scans in the list and only keep
# the first. Sort lowercase by alpha, on scan ID.
if len(artefacts_by_input[i]) > 0 and iv['keep_multis'] != 'all':
scan_info = zip(
artefacts_by_input[i],
artefact_ids_by_input[i],
)
sorted_info = sorted(scan_info, key=lambda x: str(x[1]).lower())
num_scans = sum(1 for _ in sorted_info)
if iv['keep_multis'] == 'first':
idx_multi = 1
elif iv['keep_multis'] == 'last':
idx_multi = num_scans
else:
try:
idx_multi = int(iv['keep_multis'])
                            except (ValueError, TypeError):
msg = f'For {i}, keep_multis must be first, last, or index 1,2,3,...'
LOGGER.error(msg)
raise AutoProcessorError(msg)
if idx_multi > num_scans:
msg = f'Requested {idx_multi}th scan for {i}, but only {num_scans} found'
LOGGER.error(msg)
raise AutoProcessorError(msg)
artefacts_by_input[i] = [sorted_info[idx_multi-1][0]]
LOGGER.info(
f'Keeping only the {idx_multi}th scan found for '
f'{i}: {sorted_info[idx_multi-1][0]}'
)

elif iv['artefact_type'] == 'assessor':
for cassr in assrs:
proctype = cassr.get('PROCTYPE')
Expand Down
69 changes: 51 additions & 18 deletions dax/rcq/analysislauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,22 +359,52 @@ def get_session_inputs(self, spec, info, subject, session):
for scan_spec in spec.get('scans', []):
logger.debug(f'scan_spec={scan_spec}')

# Get list of resources to download from this scan
resources = scan_spec.get('resources', [])

# Check for nifti tag
if 'nifti' in scan_spec:
# Add a NIFTI resource using value as fdest
resources.append({
'resource': 'NIFTI',
'fdest': scan_spec['nifti']
})

scan_types = scan_spec['types'].split(',')

logger.debug(f'scan_types={scan_types}')

for scan in [x for x in scans if x['SCANTYPE'] in scan_types]:
input_scans = [x for x in scans if x['SCANTYPE'] in scan_types]

# Get list of resources to download from this scan
resources = scan_spec.get('resources', [])
if len(input_scans) > 0 and scan_spec.get('keep_multis', '') != 'all':
# Sort by id
                input_scans = sorted(input_scans, key=lambda x: x['SCANID'])
num_scans = len(input_scans)

# Check for nifti tag
if 'nifti' in scan_spec:
# Add a NIFTI resource using value as fdest
resources.append({
'resource': 'NIFTI',
'fdest': scan_spec['nifti']
})
# Apply keep_multis filter
if scan_spec['keep_multis'] == 'first':
idx_multi = 1
                elif scan_spec['keep_multis'] == 'last':
idx_multi = num_scans
else:
try:
                        idx_multi = int(scan_spec['keep_multis'])
                    except (ValueError, TypeError):
msg = 'keep_multis must be first, last, or index 1,2,3,...'
logger.error(msg)
raise Exception(msg)

if idx_multi > num_scans:
msg = f'{idx_multi} index exceeds found {num_scans}'
logger.error(msg)
raise Exception(msg)

# Get a list of only the requested scan
input_scans = [input_scans[idx_multi-1]]

# Get the file inputs for each input scan
for scan in input_scans:
scanid = scan['SCANID']

for res_spec in resources:
try:
Expand All @@ -384,7 +414,7 @@ def get_session_inputs(self, spec, info, subject, session):
continue

# Get the download destination subdir
ddest = f'{subject}/{session}'
ddest = f'{subject}/{session}/scans/{scanid}'
if res_spec.get('ddest', False):
ddest += '/' + res_spec.get('ddest')

Expand All @@ -396,7 +426,7 @@ def get_session_inputs(self, spec, info, subject, session):
scan['SCANID'],
res
)
fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{session}/scans/{scan["SCANID"]}/resources/{res}/files/{_file}'
fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{session}/scans/{scanid}/resources/{res}/files/{_file}'
inputs.append(self._input(
fpath,
'FILE',
Expand Down Expand Up @@ -438,11 +468,12 @@ def get_session_inputs(self, spec, info, subject, session):
logger.debug(f'assr_types={assr_types}')

for assr in [x for x in assessors if x['PROCTYPE'] in assr_types]:
assrlabel = assr["ASSR"]

for res_spec in assr_spec['resources']:

# Get the download destination subdir
ddest = f'{subject}/{session}'
ddest = f'{subject}/{session}/assessors/{assrlabel}'
if res_spec.get('ddest', False):
ddest += '/' + res_spec.get('ddest')

Expand All @@ -454,15 +485,15 @@ def get_session_inputs(self, spec, info, subject, session):

if 'fmatch' in res_spec:
for fmatch in res_spec['fmatch'].split(','):
fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{session}/assessors/{assr["ASSR"]}/out/resources/{res}/files/{fmatch}'
fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{session}/assessors/{assrlabel}/out/resources/{res}/files/{fmatch}'
inputs.append(self._input(
fpath,
'FILE',
res_spec.get('fdest', fmatch),
ddest))
else:
# whole resource
fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{session}/assessors/{assr["ASSR"]}/out/resources/{res}/files'
fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{session}/assessors/{assrlabel}/out/resources/{res}/files'
inputs.append(self._input(
fpath,
'DIR',
Expand All @@ -481,6 +512,8 @@ def get_subject_inputs(self, spec, info, subject):
sgp = [x for x in info['sgp'] if x['SUBJECT'] == subject]

for assr in sgp:
assrlabel = assr['ASSR']

for assr_spec in sgp_spec:
logger.debug(f'assr_spec={assr_spec}')
assr_types = assr_spec['types'].split(',')
Expand All @@ -494,7 +527,7 @@ def get_subject_inputs(self, spec, info, subject):
for res_spec in assr_spec['resources']:

# Get the download destination subdir
ddest = f'{subject}'
ddest = f'{subject}/assessors/{assrlabel}'
if res_spec.get('ddest', False):
ddest += '/' + res_spec.get('ddest')

Expand All @@ -508,7 +541,7 @@ def get_subject_inputs(self, spec, info, subject):
if 'fmatch' in res_spec:
# Add each file
for fmatch in res_spec['fmatch'].split(','):
fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{assr["ASSR"]}/resources/{res}/files/{fmatch}'
fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{assrlabel}/resources/{res}/files/{fmatch}'
inputs.append(self._input(
fpath,
'FILE',
Expand All @@ -517,7 +550,7 @@ def get_subject_inputs(self, spec, info, subject):
))
else:
# We want the whole resource as one download
fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{assr["ASSR"]}/resources/{res}/files'
fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{assrlabel}/resources/{res}/files'
inputs.append(self._input(
fpath,
'DIR',
Expand Down