Skip to content

Commit

Permalink
ENH: Large refactor of the orchestration workflow [wipe phdiff_*]
Browse files Browse the repository at this point in the history
This PR:
  - [x] Refines the work in #53 addressing #40.
  - [x] Adds tests to cover the orchestration workflow.
  - [x] Fixes #7 (added regression tests for this bug).
  - [x] Updates the interface of phdiff workflows: removes the metadata
        input, which was defined for this workflow solely.
  - [x] Makes it trivial to extend the phdiff workflow to phase1/phase2
        workflows (#15).
  • Loading branch information
oesteban committed Nov 20, 2019
1 parent 83e2dbf commit d76755c
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 57 deletions.
77 changes: 41 additions & 36 deletions sdcflows/workflows/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,6 @@ def init_sdc_estimate_wf(fmaps, epi_meta, omp_nthreads=1, debug=False, ignore=No
method (for reporting purposes)
"""
if ignore is None:
ignore = tuple()

if not isinstance(ignore, (list, tuple)):
ignore = tuple(ignore)

# TODO: To be removed (filter out unsupported fieldmaps):
fmaps = [fmap for fmap in fmaps if fmap['suffix'] in FMAP_PRIORITY]

workflow = Workflow(name='sdc_estimate_wf' if fmaps else 'sdc_bypass_wf')
inputnode = pe.Node(niu.IdentityInterface(
fields=['epi_file', 'epi_brain', 'epi_mask', 't1w_brain', 'std2anat_xfm']),
Expand All @@ -99,7 +90,8 @@ def init_sdc_estimate_wf(fmaps, epi_meta, omp_nthreads=1, debug=False, ignore=No
name='outputnode')

# No fieldmaps - forward inputs to outputs
if not fmaps or 'fieldmaps' in ignore:
ignored = False if ignore is None else 'fieldmaps' in ignore
if not fmaps or ignored:
workflow.__postdesc__ = """\
Susceptibility distortion correction (SDC) has been skipped because the
dataset does not contain extra field map acquisitions correctly described
Expand All @@ -119,18 +111,26 @@ def init_sdc_estimate_wf(fmaps, epi_meta, omp_nthreads=1, debug=False, ignore=No
accurate co-registration with the anatomical reference.
"""

# In case there are multiple fieldmaps prefer EPI
fmaps.sort(key=lambda fmap: FMAP_PRIORITY[fmap['suffix']])
fmap = fmaps[0]
only_syn = 'syn' in fmaps and len(fmaps) == 1

# PEPOLAR path
if 'epi' in fmaps:
from .pepolar import init_pepolar_unwarp_wf, check_pes

# SyN works without this metadata
if epi_meta.get('PhaseEncodingDirection') is None:
raise ValueError(
'PhaseEncodingDirection is not defined within the metadata retrieved '
'for the intended EPI (DWI, BOLD, or SBRef) run.')
outputnode.inputs.method = 'PEB/PEPOLAR (phase-encoding based / PE-POLARity)'

# Filter out EPI fieldmaps to be used
fmaps_epi = [(epi.path, epi.get_metadata()['PhaseEncodingDirection'])
for epi in fmaps['epi']]
fmaps_epi = [(v[0], v[1].get('PhaseEncodingDirection'))
for v in fmaps['epi']]

if not all(list(zip(*fmaps_epi))[1]):
raise ValueError(
'At least one of the EPI runs with alternative phase-encoding '
'blips is missing the required "PhaseEncodingDirection" metadata entry.')

# Find matched PE directions
matched_pe = check_pes(fmaps_epi, epi_meta['PhaseEncodingDirection'])
Expand All @@ -150,32 +150,34 @@ def init_sdc_estimate_wf(fmaps, epi_meta, omp_nthreads=1, debug=False, ignore=No
])

# FIELDMAP path
elif 'fieldmap' in fmaps:
elif 'fieldmap' in fmaps or 'phasediff' in fmaps:
from .unwarp import init_sdc_unwarp_wf
# Import specific workflows here, so we don't break everything with one
# unused workflow.
suffices = {f.suffix for f in fmaps['fieldmap']}
if 'fieldmap' in suffices:

# SyN works without this metadata
if epi_meta.get('PhaseEncodingDirection') is None:
raise ValueError(
'PhaseEncodingDirection is not defined within the metadata retrieved '
'for the intended EPI (DWI, BOLD, or SBRef) run.')

if 'fieldmap' in fmaps:
from .fmap import init_fmap_wf
outputnode.inputs.method = 'FMB (fieldmap-based)'
outputnode.inputs.method = 'FMB (fieldmap-based) - directly measured B0 map'
fmap_wf = init_fmap_wf(
omp_nthreads=omp_nthreads,
fmap_bspline=False)
# set inputs
fmap_wf.inputs.inputnode.magnitude = fmap['magnitude']
fmap_wf.inputs.inputnode.fieldmap = fmap['fieldmap']
elif 'phasediff' in suffices:
fmap_wf.inputs.inputnode.magnitude = [
m for m, _ in fmaps['fieldmap']['magnitude']]
fmap_wf.inputs.inputnode.fieldmap = [
m for m, _ in fmaps['fieldmap']['fieldmap']]
elif 'phasediff' in fmaps:
from .phdiff import init_phdiff_wf
outputnode.inputs.method = 'FMB (fieldmap-based) - phase-difference map'
fmap_wf = init_phdiff_wf(omp_nthreads=omp_nthreads)
# set inputs
fmap_wf.inputs.inputnode.phasediff = fmap['phasediff']
fmap_wf.inputs.inputnode.magnitude = [
fmap_ for key, fmap_ in sorted(fmap.items())
if key.startswith("magnitude")
]
else:
raise ValueError('Fieldmaps of types %s are not supported' %
', '.join(['"%s"' % f for f in suffices]))
m for m, _ in fmaps['phasediff']['magnitude']]
fmap_wf.inputs.inputnode.phasediff = fmaps['phasediff']['phases']

sdc_unwarp_wf = init_sdc_unwarp_wf(
omp_nthreads=omp_nthreads,
Expand All @@ -193,24 +195,27 @@ def init_sdc_estimate_wf(fmaps, epi_meta, omp_nthreads=1, debug=False, ignore=No
('outputnode.fmap_ref', 'inputnode.fmap_ref'),
('outputnode.fmap_mask', 'inputnode.fmap_mask')]),
])
elif not only_syn:
raise ValueError('Fieldmaps of types %s are not supported' %
', '.join(['"%s"' % f for f in fmaps]))

# FIELDMAP-less path
if any(fm['suffix'] == 'syn' for fm in fmaps):
if 'syn' in fmaps:
from .syn import init_syn_sdc_wf
syn_sdc_wf = init_syn_sdc_wf(
epi_pe=epi_meta.get('PhaseEncodingDirection', None),
omp_nthreads=omp_nthreads)

workflow.connect([
(inputnode, syn_sdc_wf, [
('epi_file', 'inputnode.in_reference'),
('epi_brain', 'inputnode.in_reference_brain'),
('t1w_brain', 'inputnode.t1w_brain'),
('epi_file', 'inputnode.epi_file'),
('epi_brain', 'inputnode.epi_brain'),
('std2anat_xfm', 'inputnode.std2anat_xfm')]),
])

# XXX Eliminate branch when forcing isn't an option
if fmap['suffix'] == 'syn': # No fieldmaps, but --use-syn
if only_syn: # No fieldmaps, but --use-syn
outputnode.inputs.method = 'FLB ("fieldmap-less", SyN-based)'
sdc_unwarp_wf = syn_sdc_wf
else: # --force-syn was called when other fieldmap was present
Expand Down
13 changes: 10 additions & 3 deletions sdcflows/workflows/gre.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ def init_fmap_postproc_wf(omp_nthreads, fmap_bspline, median_kernel_size=3,
"""
workflow = Workflow(name=name)
inputnode = pe.Node(niu.IdentityInterface(
fields=['fmap_mask', 'fmap_ref', 'fmap']), name='inputnode')
outputnode = pe.Node(niu.IdentityInterface(fields=['out_fmap']),
fields=['fmap_mask', 'fmap_ref', 'fmap', 'metadata']), name='inputnode')
outputnode = pe.Node(niu.IdentityInterface(fields=['out_fmap', 'metadata']),
name='outputnode')
if fmap_bspline:
from ..interfaces.fmap import FieldEnhance
Expand All @@ -156,11 +156,12 @@ def init_fmap_postproc_wf(omp_nthreads, fmap_bspline, median_kernel_size=3,

workflow.connect([
(inputnode, cleanup_wf, [('fmap_mask', 'inputnode.in_mask')]),
(inputnode, recenter, [('fmap', 'in_file')]),
(inputnode, recenter, [(('fmap', _pop), 'in_file')]),
(recenter, denoise, [('out', 'in_file')]),
(denoise, demean, [('out_file', 'in_file')]),
(demean, cleanup_wf, [('out', 'inputnode.in_file')]),
(cleanup_wf, outputnode, [('outputnode.out_file', 'out_fmap')]),
(inputnode, outputnode, [(('metadata', _pop), 'metadata')]),
])

return workflow
Expand Down Expand Up @@ -218,3 +219,9 @@ def _demean(in_file, in_mask=None, usemode=True):
newpath=getcwd())
nb.Nifti1Image(data, nii.affine, nii.header).to_filename(out_file)
return out_file


def _pop(inlist):
if isinstance(inlist, (tuple, list)):
return inlist[0]
return inlist
35 changes: 22 additions & 13 deletions sdcflows/workflows/phdiff.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ def init_phdiff_wf(omp_nthreads, name='phdiff_wf'):
Inputs
------
magnitude : pathlike
Path to the corresponding magnitude path(s).
phasediff : pathlike
Path to the corresponding phase-difference file.
metadata : dict
Metadata dictionary corresponding to the phasediff input
magnitude : list of os.pathlike
List of path(s) the GRE magnitude maps.
phasediff : list of tuple(os.pathlike, dict)
List containing one GRE phase-difference map with its corresponding metadata
(requires ``EchoTime1`` and ``EchoTime2``), or the phase maps for the two
subsequent echoes, with their metadata (requires ``EchoTime``).
Outputs
-------
Expand All @@ -90,39 +90,48 @@ def init_phdiff_wf(omp_nthreads, name='phdiff_wf'):
further improvements of HCP Pipelines [@hcppipelines].
"""

inputnode = pe.Node(niu.IdentityInterface(fields=['magnitude', 'phasediff', 'metadata']),
inputnode = pe.Node(niu.IdentityInterface(fields=['magnitude', 'phasediff']),
name='inputnode')

outputnode = pe.Node(niu.IdentityInterface(
fields=['fmap', 'fmap_ref', 'fmap_mask']), name='outputnode')

split = pe.MapNode(niu.Function(function=_split, output_names=['map_file', 'meta']),
iterfield=['phasediff'], run_without_submitting=True, name='split')

magnitude_wf = init_magnitude_wf(omp_nthreads=omp_nthreads)

# phase diff -> radians
phmap2rads = pe.Node(PhaseMap2rads(), name='phmap2rads',
run_without_submitting=True)
phmap2rads = pe.MapNode(PhaseMap2rads(), name='phmap2rads',
iterfield=['in_file'], run_without_submitting=True)
# FSL PRELUDE will perform phase-unwrapping
prelude = pe.Node(fsl.PRELUDE(), name='prelude')
prelude = pe.MapNode(fsl.PRELUDE(), iterfield=['phase_file'], name='prelude')

fmap_postproc_wf = init_fmap_postproc_wf(omp_nthreads=omp_nthreads,
fmap_bspline=False)
compfmap = pe.Node(Phasediff2Fieldmap(), name='compfmap')

workflow.connect([
(inputnode, compfmap, [('metadata', 'metadata')]),
(inputnode, split, [('phasediff', 'phasediff')]),
(inputnode, magnitude_wf, [('magnitude', 'inputnode.magnitude')]),
(magnitude_wf, prelude, [('outputnode.fmap_ref', 'magnitude_file'),
('outputnode.fmap_mask', 'mask_file')]),
(inputnode, phmap2rads, [('phasediff', 'in_file')]),
(split, phmap2rads, [('map_file', 'in_file')]),
(phmap2rads, prelude, [('out_file', 'phase_file')]),
(split, fmap_postproc_wf, [('meta', 'inputnode.metadata')]),
(prelude, fmap_postproc_wf, [('unwrapped_phase_file', 'inputnode.fmap')]),
(magnitude_wf, fmap_postproc_wf, [
('outputnode.fmap_mask', 'inputnode.fmap_mask'),
('outputnode.fmap_ref', 'inputnode.fmap_ref')]),
(fmap_postproc_wf, compfmap, [('outputnode.out_fmap', 'in_file')]),
(fmap_postproc_wf, compfmap, [('outputnode.out_fmap', 'in_file'),
('outputnode.metadata', 'metadata')]),
(compfmap, outputnode, [('out_file', 'fmap')]),
(magnitude_wf, outputnode, [('outputnode.fmap_ref', 'fmap_ref'),
('outputnode.fmap_mask', 'fmap_mask')]),
])

return workflow


def _split(phasediff):
return phasediff
10 changes: 5 additions & 5 deletions sdcflows/workflows/tests/test_phdiff.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ def test_workflow(bids_layouts, tmpdir, output_path, dataset, workdir):
return_type='file',
extension=['.nii', '.nii.gz'])

phdiff_file = data.get(suffix='phasediff', acq='v4',
extension=['.nii', '.nii.gz'])[0]
phdiff_files = data.get(suffix='phasediff', acq='v4',
extension=['.nii', '.nii.gz'])

phdiff_wf.inputs.inputnode.phasediff = phdiff_file.path
phdiff_wf.inputs.inputnode.metadata = phdiff_file.get_metadata()
phdiff_wf.inputs.inputnode.phasediff = [
(ph.path, ph.get_metadata()) for ph in phdiff_files]

if output_path:
from ...interfaces.reportlets import FieldmapReportlet
Expand All @@ -36,7 +36,7 @@ def test_workflow(bids_layouts, tmpdir, output_path, dataset, workdir):
dsink = pe.Node(DerivativesDataSink(
base_directory=str(output_path), keep_dtype=True), name='dsink')
dsink.interface.out_path_base = 'sdcflows'
dsink.inputs.source_file = phdiff_file.path
dsink.inputs.source_file = phdiff_files[0].path

wf.connect([
(phdiff_wf, rep, [
Expand Down

0 comments on commit d76755c

Please sign in to comment.