# Source code for sdcflows.cli.parser

# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
#
# Copyright 2023 The NiPreps Developers <nipreps@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# We support and encourage derived works from this project, please read
# about our expectations at
#
#     https://www.nipreps.org/community/licensing/
#
"""Standalone command line executable for estimation of fieldmaps."""
import re
from argparse import Action, ArgumentDefaultsHelpFormatter, ArgumentParser
from functools import partial
from pathlib import Path

from sdcflows import config


def _parse_participant_labels(value):
    """
    Drop ``sub-`` prefix of participant labels.

    >>> _parse_participant_labels("s060")
    ['s060']
    >>> _parse_participant_labels("sub-s060")
    ['s060']
    >>> _parse_participant_labels("s060 sub-s050")
    ['s050', 's060']
    >>> _parse_participant_labels("s060 sub-s060")
    ['s060']
    >>> _parse_participant_labels("s060\tsub-s060")
    ['s060']

    """
    return sorted(
        set(
            re.sub(r"^sub-", "", item.strip())
            for item in re.split(r"\s+", f"{value}".strip())
        )
    )


def _parser():
    """
    Build the argument parser of the *SDCFlows* command line interface.

    Returns
    -------
    parser : :obj:`argparse.ArgumentParser`
        A parser with the three positional arguments (``bids_dir``, ``output_dir``
        and ``analysis_level``) and the optional arguments grouped as
        BIDS-related, performance-related and instrumental options.

    """

    class ParticipantLabelAction(Action):
        """Store participant labels as a sorted, unique list without ``sub-`` prefix."""

        def __call__(self, parser, namespace, values, option_string=None):
            setattr(namespace, self.dest, _parse_participant_labels(" ".join(values)))

    def _path_exists(path, parser):
        """Ensure a given path exists, and return it user-expanded and absolute."""
        # Expand ``~`` *before* checking for existence, so that the path being
        # tested is the same path that is returned to the caller.
        resolved = None if path is None else Path(path).expanduser().absolute()
        if resolved is None or not resolved.exists():
            # ``parser.error`` prints the usage message and exits (it never returns).
            parser.error(f"Path does not exist: <{path}>.")
        return resolved

    def _min_one(value, parser):
        """Ensure an argument is not lower than 1."""
        value = int(value)
        if value < 1:
            parser.error("Argument can't be less than one.")
        return value

    def _to_gb(value):
        """
        Convert a memory size specification into gigabytes.

        The value is a number (integer or decimal) optionally followed by a unit
        whose first letter is one of ``B``, ``K``, ``M``, ``G`` or ``T``
        (case-insensitive; e.g., ``8``, ``8G``, ``1.5GB``, ``500m``).
        Values without units are interpreted as gigabytes.

        Raises
        ------
        ValueError
            If the value cannot be interpreted (argparse turns this exception
            into a regular usage error).

        """
        scale = {"G": 1, "T": 10**3, "M": 1e-3, "K": 1e-6, "B": 1e-9}
        match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([A-Za-z]*)\s*", f"{value}")
        if match is None:
            raise ValueError(f"Cannot interpret memory size: <{value}>.")

        amount, units = match.groups()
        # Only the first letter of the units is meaningful (``GB`` is ``G``).
        unit = (units or "G")[0].upper()
        if unit not in scale:
            raise ValueError(f"Unknown memory units: <{units}>.")

        # Keep integer arithmetic for integer inputs (e.g., ``8G`` yields ``8``).
        number = float(amount) if "." in amount else int(amount)
        return number * scale[unit]

    parser = ArgumentParser(
        description=f"""\
SDCFlows {config.environment.version}

Estimate fieldmaps available in a BIDS-compliant MRI dataset.""",
        formatter_class=ArgumentDefaultsHelpFormatter,
    )
    # Bind the parser so these validators can be used as ``type=`` callables.
    PathExists = partial(_path_exists, parser=parser)
    PositiveInt = partial(_min_one, parser=parser)

    parser.add_argument(
        "bids_dir",
        action="store",
        type=PathExists,
        help="The root folder of a BIDS valid dataset (sub-XXXXX folders should "
        "be found at the top level in this folder).",
    )
    parser.add_argument(
        "output_dir",
        action="store",
        type=Path,
        help="The directory where the output files "
        "should be stored. If you are running group level analysis "
        "this folder should be prepopulated with the results of the "
        "participant level analysis.",
    )
    parser.add_argument(
        "analysis_level",
        action="store",
        nargs="+",
        help="Level of the analysis that will be performed. "
        "Multiple participant level analyses can be run independently "
        "(in parallel) using the same output_dir.",
        choices=["participant", "group"],
    )

    # optional arguments
    parser.add_argument(
        "--version", action="version", version=f"SDCFlows {config.environment.version}"
    )
    parser.add_argument(
        "-v",
        "--verbose",
        dest="verbose_count",
        action="count",
        default=0,
        help="Increases log verbosity for each occurrence, debug level is -vvv.",
    )

    # main options
    g_bids = parser.add_argument_group("Options related to BIDS")
    g_bids.add_argument(
        "--participant-label",
        action=ParticipantLabelAction,
        nargs="+",
        help="A space delimited list of participant identifiers or a single "
        "identifier (the sub- prefix can be removed).",
    )
    g_bids.add_argument(
        "--session-label",
        action="store",
        nargs="*",
        type=str,
        help="Filter input dataset by session label.",
    )
    g_bids.add_argument(
        "--bids-filter-file",
        action="store",
        type=Path,
        metavar="PATH",
        help="a JSON file describing custom BIDS input filter using pybids "
        "{<suffix>:{<entity>:<filter>,...},...} "
        "(https://github.com/bids-standard/pybids/blob/master/bids/layout/config/bids.json)",
    )
    g_bids.add_argument(
        "--bids-database-dir",
        metavar="PATH",
        type=PathExists,
        help="Path to an existing PyBIDS database folder, for faster indexing "
        "(especially useful for large datasets).",
    )
    g_bids.add_argument(
        "--bids-database-wipe",
        action="store_true",
        default=False,
        help="Wipe out previously existing BIDS indexing caches, forcing re-indexing.",
    )

    # General performance
    g_perfm = parser.add_argument_group("Options to handle performance")
    g_perfm.add_argument(
        "--nprocs",
        action="store",
        type=PositiveInt,
        help="""\
Maximum number of simultaneously running parallel processes executed by *SDCFlows* \
(e.g., several instances of ANTs' registration). \
However, when ``--nprocs`` is greater or equal to the ``--omp-nthreads`` option, \
it also sets the maximum number of threads that simultaneously running processes \
may aggregate (meaning, with ``--nprocs 16 --omp-nthreads 8`` a maximum of two \
8-CPU-threaded processes will be running at a given time). \
Under this mode of operation, ``--nprocs`` sets the maximum number of processors \
that can be assigned work within an *SDCFlows* job, which includes all the processors \
used by currently running single- and multi-threaded processes. \
If ``None``, the number of CPUs available will be automatically assigned (which may \
not be what you want in, e.g., shared systems like a HPC cluster.""",
    )
    g_perfm.add_argument(
        "--omp-nthreads",
        action="store",
        type=PositiveInt,
        help="""\
Maximum number of threads that multi-threaded processes executed by *SDCFlows* \
(e.g., ANTs' registration) can use. \
If ``None``, the number of CPUs available will be automatically assigned (which may \
not be what you want in, e.g., shared systems like a HPC cluster.""",
    )
    g_perfm.add_argument(
        "--mem-gb",
        dest="memory_gb",
        action="store",
        type=_to_gb,
        help="Upper bound memory limit for SDCFlows processes.",
    )
    g_perfm.add_argument(
        "--debug",
        action="store_true",
        default=False,
        help="Enable changes to processing to aid in debugging",
    )
    g_perfm.add_argument(
        "--pdb",
        dest="pdb",
        action="store_true",
        default=False,
        help="Open Python debugger (pdb) on exceptions.",
    )
    g_perfm.add_argument(
        "--sloppy",
        action="store_true",
        default=False,
        help="Use sloppy mode for minimal footprint.",
    )

    # Control instruments
    g_outputs = parser.add_argument_group("Instrumental options")
    g_outputs.add_argument(
        "-w",
        "--work-dir",
        action="store",
        type=Path,
        default=Path("work").absolute(),
        help="Path where intermediate results should be stored.",
    )
    g_outputs.add_argument(
        "-n",
        "--dry-run",
        action="store_true",
        default=False,
        help="only find estimable fieldmaps (that is, estimation is not triggered)",
    )
    g_outputs.add_argument(
        "--no-fmapless",
        action="store_false",
        dest="fmapless",
        default=True,
        help="Allow fieldmap-less estimation",
    )
    g_outputs.add_argument(
        "--use-plugin",
        action="store",
        default=None,
        type=Path,
        help="Nipype plugin configuration file.",
    )
    g_outputs.add_argument(
        "--notrack",
        action="store_true",
        help="Opt-out of sending tracking information of this run to the NiPreps developers. "
        "This information helps to improve SDCFlows and provides an indicator of "
        "real world usage for obtaining funding.",
    )

    return parser


def parse_args(args=None, namespace=None):
    """
    Parse args and run further checks on the command line.

    Parameters
    ----------
    args : :obj:`list` of :obj:`str`, optional
        Command line arguments, forwarded to ``ArgumentParser.parse_args``.
    namespace : :obj:`argparse.Namespace`, optional
        Namespace object, forwarded to ``ArgumentParser.parse_args``.

    Notes
    -----
    Nothing is returned: the parsed options are written into the
    :obj:`sdcflows.config` module. As side effects, the log, output and
    working directories are created, and ``config.execution.init()`` is called.
    Inconsistent options are reported through ``parser.error``.

    """
    from json import loads
    from logging import DEBUG

    parser = _parser()
    opts = parser.parse_args(args, namespace)
    # Each ``-v`` lowers the threshold by 5 from a baseline of 25, bottoming at DEBUG.
    config.execution.log_level = int(max(25 - 5 * opts.verbose_count, DEBUG))
    config.from_dict(vars(opts))

    # Load base plugin_settings from file if --use-plugin
    if opts.use_plugin is not None:
        # ``yaml.load`` without an explicit Loader is rejected by recent PyYAML
        # releases, and a plain settings file does not need arbitrary Python objects.
        from yaml import safe_load as loadyml

        with open(opts.use_plugin) as f:
            # An empty file parses to ``None``; treat it as "no settings".
            plugin_settings = loadyml(f) or {}

        _plugin = plugin_settings.get("plugin")
        if _plugin:
            config.nipype.plugin = _plugin
            config.nipype.plugin_args = plugin_settings.get("plugin_args", {})
            config.nipype.nprocs = config.nipype.plugin_args.get(
                "nprocs", config.nipype.nprocs
            )

    # Load BIDS filters
    if opts.bids_filter_file:
        config.execution.bids_filters = loads(opts.bids_filter_file.read_text())

    bids_dir = config.execution.bids_dir
    output_dir = config.execution.output_dir
    work_dir = config.execution.work_dir
    version = config.environment.version

    # Ensure input and output folders are not the same
    if output_dir == bids_dir:
        # Build the suggested path first: ``%`` and ``/`` have the same precedence,
        # so formatting and path-joining in one expression divides a str by a str.
        suggestion = bids_dir / "derivatives" / f"sdcflows_{version.split('+')[0]}"
        parser.error(
            "The selected output folder is the same as the input BIDS folder. "
            f"Please modify the output path (suggestion: {suggestion})."
        )

    if bids_dir in work_dir.parents:
        parser.error(
            "The selected working directory is a subdirectory of the input BIDS folder. "
            "Please modify the output path."
        )

    # Setup directories
    config.execution.log_dir = output_dir / "logs"
    # Check and create output and working directories
    config.execution.log_dir.mkdir(exist_ok=True, parents=True)
    output_dir.mkdir(exist_ok=True, parents=True)
    work_dir.mkdir(exist_ok=True, parents=True)

    # Force initialization of the BIDSLayout
    config.execution.init()

    participant_label = config.execution.layout.get_subjects()
    if config.execution.participant_label is not None:
        selected_label = set(config.execution.participant_label)
        missing_subjects = selected_label - set(participant_label)
        if missing_subjects:
            # Sorted so that the error message is reproducible across runs.
            parser.error(
                "One or more participant labels were not found in the BIDS directory: "
                f"{', '.join(sorted(missing_subjects))}."
            )
        participant_label = selected_label

    config.execution.participant_label = sorted(participant_label)

    # Handle analysis_level
    analysis_level = set(config.workflow.analysis_level)
    config.workflow.analysis_level = list(analysis_level)