diff --git a/fmriprep/cli/parser.py b/fmriprep/cli/parser.py index 195706610..a7e582be8 100644 --- a/fmriprep/cli/parser.py +++ b/fmriprep/cli/parser.py @@ -22,10 +22,16 @@ # """Parser.""" +from __future__ import annotations + import sys +import typing as ty from .. import config +if ty.TYPE_CHECKING: + from bids.layout import BIDSLayout + def _build_parser(**kwargs): """Build parser object. @@ -91,6 +97,9 @@ def _to_gb(value): def _drop_sub(value): return value[4:] if value.startswith('sub-') else value + def _drop_ses(value): + return value[4:] if value.startswith('ses-') else value + def _process_value(value): import bids @@ -193,8 +202,14 @@ def _slice_time_ref(value, parser): 'identifier (the sub- prefix can be removed)', ) # Re-enable when option is actually implemented - # g_bids.add_argument('-s', '--session-id', action='store', default='single_session', - # help='Select a specific session to be processed') + g_bids.add_argument( + '-s', + '--session-id', + action='store', + nargs='+', + type=_drop_ses, + help='A space-delimited list of session identifiers or a single identifier', + ) # Re-enable when option is actually implemented # g_bids.add_argument('-r', '--run-id', action='store', default='single_run', # help='Select a specific run to be processed') @@ -908,4 +923,36 @@ def parse_args(args=None, namespace=None): ) config.execution.participant_label = sorted(participant_label) + + config.execution.unique_labels = compute_subworkflows( + layout=config.execution.layout, + participant_ids=config.execution.participant_label, + session_ids=config.execution.session_id, + ) config.workflow.skull_strip_template = config.workflow.skull_strip_template[0] + + +def compute_subworkflows( + *, + layout: BIDSLayout, + participant_ids: list, + session_ids: list | None = None, +) -> list: + """Compute subworkflows based on participant and session IDs. + + Query all available participants and sessions, and construct the combinations of the + subworkflows needed. + """ + from niworkflows.utils.bids import collect_participants + + # consists of (subject_id, session_id) tuples + subworkflows = [] + + subject_list = collect_participants(layout, participant_ids, strict=True) + for subject in subject_list: + # Due to rapidly changing morphometry of the population + # Ensure each subject session is processed individually + sessions = session_ids or layout.get_sessions(scope='raw', subject=subject) or [None] + for session in sessions: + subworkflows.append((subject, session)) + return subworkflows diff --git a/fmriprep/cli/run.py b/fmriprep/cli/run.py index af4ffad4b..1d5ad1fa6 100644 --- a/fmriprep/cli/run.py +++ b/fmriprep/cli/run.py @@ -220,16 +220,10 @@ def main(): from fmriprep.reports.core import generate_reports - # Generate reports phase - session_list = ( - config.execution.get().get('bids_filters', {}).get('bold', {}).get('session') - ) - failed_reports = generate_reports( - config.execution.participant_label, + config.execution.unique_labels, config.execution.fmriprep_dir, config.execution.run_uuid, - session_list=session_list, ) write_derivative_description( config.execution.bids_dir, diff --git a/fmriprep/cli/workflow.py b/fmriprep/cli/workflow.py index 89310590b..1e47b80ba 100644 --- a/fmriprep/cli/workflow.py +++ b/fmriprep/cli/workflow.py @@ -35,7 +35,6 @@ def build_workflow(config_file, retval): """Create the Nipype Workflow that supports the whole execution graph.""" - from niworkflows.utils.bids import collect_participants from niworkflows.utils.misc import check_valid_fs_license from fmriprep.reports.core import generate_reports @@ -77,25 +76,17 @@ def build_workflow(config_file, retval): desc_content = dset_desc_path.read_bytes() config.execution.bids_description_hash = sha256(desc_content).hexdigest() - # First check that bids_dir looks like a BIDS folder - subject_list = collect_participants( - config.execution.layout, participant_label=config.execution.participant_label - ) - # Called with reports only if config.execution.reports_only: - build_log.log(25, 'Running --reports-only on participants %s', ', '.join(subject_list)) - session_list = ( - config.execution.bids_filters.get('bold', {}).get('session') - if config.execution.bids_filters - else None + build_log.log( + 25, + 'Running --reports-only on participants %s', + ', '.join(config.execution.unique_labels), ) - failed_reports = generate_reports( - config.execution.participant_label, + config.execution.unique_labels, config.execution.fmriprep_dir, config.execution.run_uuid, - session_list=session_list, ) if failed_reports: config.loggers.cli.error( @@ -110,7 +101,7 @@ def build_workflow(config_file, retval): init_msg = [ "Building fMRIPrep's workflow:", f'BIDS dataset path: {config.execution.bids_dir}.', - f'Participant list: {subject_list}.', + f'Participant list: {config.execution.unique_labels}.', f'Run identifier: {config.execution.run_uuid}.', f'Output spaces: {config.execution.output_spaces}.', ] @@ -123,7 +114,7 @@ def build_workflow(config_file, retval): build_log.log(25, f"\n{' ' * 11}* ".join(init_msg)) - retval['workflow'] = init_fmriprep_wf() + retval['workflow'] = init_fmriprep_wf(config.execution.unique_labels) # Check for FS license after building the workflow if not check_valid_fs_license(): diff --git a/fmriprep/config.py b/fmriprep/config.py index 08fbb51ce..20ec6154e 100644 --- a/fmriprep/config.py +++ b/fmriprep/config.py @@ -433,12 +433,16 @@ class execution(_Config): """Only build the reports, based on the reportlets found in a cached working directory.""" run_uuid = f"{strftime('%Y%m%d-%H%M%S')}_{uuid4()}" """Unique identifier of this particular run.""" + session_id = None + """List of session identifiers that are to be preprocessed.""" participant_label = None """List of participant identifiers that are to be preprocessed.""" task_id = None """Select a particular task from all available in the dataset.""" templateflow_home = _templateflow_home """The root folder of the TemplateFlow client.""" + unique_labels = None + """Combinations of subject + session identifiers to be preprocessed.""" work_dir = Path('work').absolute() """Path to a working directory where intermediate results will be available.""" write_graph = False @@ -466,6 +470,12 @@ class execution(_Config): @classmethod def init(cls): """Create a new BIDS Layout accessible with :attr:`~execution.layout`.""" + # Convert string literal None to NoneType + if cls.unique_labels: + cls.unique_labels = [ + [sub, ses] if ses != 'None' else [sub, None] for sub, ses in cls.unique_labels + ] + if cls.fs_license_file and Path(cls.fs_license_file).is_file(): os.environ['FS_LICENSE'] = str(cls.fs_license_file) diff --git a/fmriprep/workflows/base.py b/fmriprep/workflows/base.py index df3268a5d..25f8e12c3 100644 --- a/fmriprep/workflows/base.py +++ b/fmriprep/workflows/base.py @@ -46,7 +46,7 @@ from ..utils.bids import dismiss_echo -def init_fmriprep_wf(): +def init_fmriprep_wf(subworkflows_list): """ Build *fMRIPrep*'s pipeline. @@ -63,8 +63,15 @@ def init_fmriprep_wf(): from fmriprep.workflows.tests import mock_config from fmriprep.workflows.base import init_fmriprep_wf + with mock_config(): - wf = init_fmriprep_wf() + wf = init_fmriprep_wf([('01', '01')]) + + Parameters + ---------- + subworkflows_list: :obj:`list` of :obj:`tuple` + A list of the subworkflows to create. + Each subject session is run as an individual workflow. """ from niworkflows.engine.workflows import LiterateWorkflow as Workflow @@ -90,12 +97,21 @@ def init_fmriprep_wf(): if config.execution.fs_subjects_dir is not None: fsdir.inputs.subjects_dir = str(config.execution.fs_subjects_dir.absolute()) - for subject_id in config.execution.participant_label: - single_subject_wf = init_single_subject_wf(subject_id) + for subject_id, session_id in subworkflows_list: + single_subject_wf = init_single_subject_wf( + subject_id, + session_id=session_id, + ) + + bids_level = [f'sub-{subject_id}'] + if session_id: + bids_level.append(f'ses-{session_id}') - single_subject_wf.config['execution']['crashdump_dir'] = str( - config.execution.fmriprep_dir / f'sub-{subject_id}' / 'log' / config.execution.run_uuid + log_dir = ( + config.execution.nibabies_dir.joinpath(*bids_level) / 'log' / config.execution.run_uuid ) + + single_subject_wf.config['execution']['crashdump_dir'] = str(log_dir) for node in single_subject_wf._get_all_nodes(): node.config = deepcopy(single_subject_wf.config) if freesurfer: @@ -104,16 +120,16 @@ def init_fmriprep_wf(): fmriprep_wf.add_nodes([single_subject_wf]) # Dump a copy of the config file into the log directory - log_dir = ( - config.execution.fmriprep_dir / f'sub-{subject_id}' / 'log' / config.execution.run_uuid - ) log_dir.mkdir(exist_ok=True, parents=True) config.to_filename(log_dir / 'fmriprep.toml') return fmriprep_wf -def init_single_subject_wf(subject_id: str): +def init_single_subject_wf( + subject_id: str, + session_id: str | None = None, +): """ Organize the preprocessing pipeline for a single subject. @@ -138,6 +154,8 @@ def init_single_subject_wf(subject_id: str): ---------- subject_id : :obj:`str` Subject label for this single-subject workflow. + session_id : :obj:`str` or None + Session identifier. Inputs ------ @@ -167,7 +185,9 @@ def init_single_subject_wf(subject_id: str): from fmriprep.workflows.bold.base import init_bold_wf - workflow = Workflow(name=f'sub_{subject_id}_wf') + name = f'sub_{subject_id}_ses_{session_id}_wf' if session_id else f'sub_{subject_id}_wf' + + workflow = Workflow(name=name) workflow.__desc__ = f""" Results included in this manuscript come from preprocessing performed using *fMRIPrep* {config.environment.version} @@ -202,6 +222,7 @@ def init_single_subject_wf(subject_id: str): subject_data = collect_data( config.execution.layout, subject_id, + session_id=session_id, task=config.execution.task_id, echo=config.execution.echo_idx, bids_filters=config.execution.bids_filters, @@ -254,6 +275,7 @@ def init_single_subject_wf(subject_id: str): collect_anat_derivatives( derivatives_dir=deriv_dir, subject_id=subject_id, + session_id=session_id, std_spaces=std_spaces, ) )