# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
#
# Copyright 2021 The NiPreps Developers <nipreps@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# We support and encourage derived works from this project, please read
# about our expectations at
#
# https://www.nipreps.org/community/licensing/
#
"""Helpers for handling BIDS-like neuroimaging structures."""
from pathlib import Path
import json
import re
import warnings
from bids import BIDSLayout
from bids.layout import Query
from packaging.version import Version
# Default pybids queries used by ``collect_data`` when no ``queries`` argument
# is given. Each key is the label under which the matching files are returned;
# each value is a dict of entity filters forwarded as keyword arguments to
# ``BIDSLayout.get``.
# NOTE(review): ``"part": ["mag", None]`` presumably selects magnitude images
# as well as files that carry no ``part`` entity -- confirm against pybids.
DEFAULT_BIDS_QUERIES = {
    "bold": {"datatype": "func", "suffix": "bold", "part": ["mag", None]},
    "dwi": {"suffix": "dwi"},
    "flair": {"datatype": "anat", "suffix": "FLAIR", "part": ["mag", None]},
    "fmap": {"datatype": "fmap"},
    "pet": {"suffix": "pet"},
    "roi": {"datatype": "anat", "suffix": "roi"},
    "sbref": {"datatype": "func", "suffix": "sbref", "part": ["mag", None]},
    "t1w": {"datatype": "anat", "suffix": "T1w", "part": ["mag", None]},
    "t2w": {"datatype": "anat", "suffix": "T2w", "part": ["mag", None]},
    "asl": {"datatype": "perf", "suffix": "asl"},
}
[docs]
class BIDSError(ValueError):
    """
    Error about a BIDS dataset, reported together with its root folder.

    The formatted text is framed by a header naming the BIDS root and a dashed
    footer of the same width as the header.

    Attributes
    ----------
    msg : :obj:`str`
        The fully formatted, multi-line message (also passed to ``ValueError``).
    bids_root
        The ``bids_root`` value given at construction, stored unchanged.

    """

    def __init__(self, message, bids_root):
        width = 10
        rule = "-" * width
        banner = '{sep} BIDS root folder: "{bids_root}" {sep}'.format(
            sep=rule, bids_root=bids_root
        )
        # The message is indented one column past the leading dashes of the banner.
        padding = " " * (width + 1)
        self.msg = "\n{header}\n{indent}{message}\n{footer}".format(
            header=banner,
            indent=padding,
            message=message,
            footer="-" * len(banner),
        )
        super().__init__(self.msg)
        self.bids_root = bids_root
[docs]
class BIDSWarning(RuntimeWarning):
    """Warning category for non-fatal problems found in a BIDS dataset."""
[docs]
def collect_participants(
    bids_dir, participant_label=None, strict=False, bids_validate=True
):
    """
    List the participants of a BIDS dataset, optionally restricted to a selection.

    The subjects found under the BIDS root are intersected with the labels
    requested through ``participant_label`` (if any), and the sorted list of
    participants to be finally processed is returned.

    .. testsetup::

    >>> data_dir_canary()

    Parameters
    ----------
    bids_dir : :obj:`str`, path-like, or :obj:`bids.layout.BIDSLayout`
        The BIDS root folder, or an already-initialized layout.
    participant_label : :obj:`str`, :obj:`list` of :obj:`str`, or None
        Requested participant label(s), with or without the ``sub-`` prefix.
        When empty or ``None``, all participants found are returned.
    strict : :obj:`bool`
        Raise a :obj:`BIDSError` (instead of issuing a :obj:`BIDSWarning`)
        when only some of the requested participants are missing.
    bids_validate : :obj:`bool`
        Passed as ``validate`` when a new layout is created from ``bids_dir``.

    Returns
    -------
    participants : :obj:`list` of :obj:`str`
        Sorted labels (without ``sub-`` prefix) of the selected participants.

    Raises
    ------
    BIDSError
        If the dataset has no subjects, if none of the requested participants
        exist, or if ``strict`` is set and any requested participant is missing.

    Examples
    --------
    Requesting all subjects in a BIDS directory root:

    >>> collect_participants(str(datadir / 'ds114'), bids_validate=False)
    ['01', '02', '03', '04', '05', '06', '07', '08', '09', '10']

    Requesting two subjects, given their IDs:

    >>> collect_participants(str(datadir / 'ds114'), participant_label=['02', '04'],
    ...                      bids_validate=False)
    ['02', '04']

    Requesting two subjects, given their IDs (works with 'sub-' prefixes):

    >>> collect_participants(str(datadir / 'ds114'), participant_label=['sub-02', 'sub-04'],
    ...                      bids_validate=False)
    ['02', '04']

    Requesting two subjects, but one does not exist:

    >>> collect_participants(str(datadir / 'ds114'), participant_label=['02', '14'],
    ...                      bids_validate=False)
    ['02']
    >>> collect_participants(
    ...     str(datadir / 'ds114'), participant_label=['02', '14'],
    ...     strict=True, bids_validate=False)  # doctest: +IGNORE_EXCEPTION_DETAIL
    Traceback (most recent call last):
    BIDSError:
    ...

    """
    layout = (
        bids_dir
        if isinstance(bids_dir, BIDSLayout)
        else BIDSLayout(str(bids_dir), validate=bids_validate)
    )
    available = set(layout.get_subjects())

    # An empty dataset is always an error, regardless of what was requested.
    if not available:
        raise BIDSError(
            "Could not find participants. Please make sure the BIDS data "
            "structure is present and correct. Datasets can be validated "
            "online using the BIDS Validator "
            "(https://bids-standard.github.io/bids-validator/).\n"
            "If you are using Docker for Mac or Docker for Windows, you "
            'may need to adjust your "File sharing" preferences.',
            bids_dir,
        )

    # Nothing requested explicitly: process everyone.
    if not participant_label:
        return sorted(available)

    if isinstance(participant_label, str):
        participant_label = [participant_label]

    # Strip ``sub-`` prefixes, drop duplicates, and fix the ordering in one go.
    requested = sorted(
        {sub[4:] if sub.startswith("sub-") else sub for sub in participant_label}
    )

    # ``requested`` is sorted and unique, so filtering keeps both properties.
    present = [label for label in requested if label in available]
    if not present:
        raise BIDSError(
            "Could not find participants [{}]".format(", ".join(requested)),
            bids_dir,
        )

    missing = [label for label in requested if label not in available]
    if missing:
        exc = BIDSError(
            "Some participants were not found: {}".format(", ".join(missing)),
            bids_dir,
        )
        if strict:
            raise exc
        # Partial matches are tolerated in non-strict mode, but reported.
        warnings.warn(exc.msg, BIDSWarning)

    return present
[docs]
def collect_data(
    bids_dir,
    participant_label,
    session_id=None,
    task=None,
    echo=None,
    group_echos=True,
    bids_validate=True,
    bids_filters=None,
    queries=None,
):
    """
    Use pybids to retrieve the input data for a given participant.

    .. testsetup::

    >>> data_dir_canary()

    Parameters
    ----------
    bids_dir : :obj:`str` or :obj:`bids.layout.BIDSLayout`
        The BIDS directory
    participant_label : :obj:`str`
        The participant identifier
    session_id : :obj:`str`, None, or :obj:`bids.layout.Query`
        The session identifier. By default, all sessions will be used.
    task : :obj:`str` or None
        The task identifier (for BOLD queries)
    echo : :obj:`int` or None
        The echo identifier (for BOLD queries)
    group_echos : :obj:`bool`
        If set and any BOLD file name contains ``_echo-``, the list of BOLD
        files is post-processed with :func:`group_multiecho`.
    bids_validate : :obj:`bool`
        Whether the `bids_dir` is validated upon initialization
    bids_filters: :obj:`dict` or None
        Custom filters to alter default queries. Maps a query name (a key of
        ``queries``) to a dictionary of entities that updates that query.
        Entities given here take precedence over the default ``subject``,
        ``session`` and ``extension`` constraints for that query only.
    queries : :obj:`dict` or None
        Mapping of output keys to dictionaries of pybids entities. Defaults to
        ``DEFAULT_BIDS_QUERIES``. The mapping is never modified in place.

    Returns
    -------
    subj_data : :obj:`dict`
        For every key of ``queries``, the sorted list of files returned by
        the layout (with multi-echo BOLD runs optionally grouped).
    layout : :obj:`bids.layout.BIDSLayout`
        The layout used to run the queries.

    Raises
    ------
    KeyError
        If ``bids_filters`` names a query that is not present in ``queries``.

    Examples
    --------
    >>> bids_root, _ = collect_data(str(datadir / 'ds054'), '100185',
    ...                             bids_validate=False)
    >>> bids_root['fmap']  # doctest: +ELLIPSIS
    ['.../ds054/sub-100185/fmap/sub-100185_magnitude1.nii.gz', \
'.../ds054/sub-100185/fmap/sub-100185_magnitude2.nii.gz', \
'.../ds054/sub-100185/fmap/sub-100185_phasediff.nii.gz']
    >>> bids_root['bold']  # doctest: +ELLIPSIS
    ['.../ds054/sub-100185/func/sub-100185_task-machinegame_run-01_bold.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-02_bold.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-03_bold.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-04_bold.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-05_bold.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-06_bold.nii.gz']
    >>> bids_root['sbref']  # doctest: +ELLIPSIS
    ['.../ds054/sub-100185/func/sub-100185_task-machinegame_run-01_sbref.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-02_sbref.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-03_sbref.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-04_sbref.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-05_sbref.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-06_sbref.nii.gz']
    >>> bids_root['t1w']  # doctest: +ELLIPSIS
    ['.../ds054/sub-100185/anat/sub-100185_T1w.nii.gz']
    >>> bids_root['t2w']  # doctest: +ELLIPSIS
    []
    >>> bids_root, _ = collect_data(str(datadir / 'ds051'), '01',
    ...                             bids_validate=False,
    ...                             bids_filters={'t1w':{'run': 1, 'session': None}})
    >>> bids_root['t1w']  # doctest: +ELLIPSIS
    ['.../ds051/sub-01/anat/sub-01_run-01_T1w.nii.gz']

    """
    from copy import deepcopy

    if isinstance(bids_dir, BIDSLayout):
        layout = bids_dir
    else:
        layout = BIDSLayout(str(bids_dir), validate=bids_validate)

    # Constraints shared by every query unless a query overrides them.
    layout_get_kwargs = {
        'return_type': 'file',
        'subject': participant_label,
        'extension': ['.nii', '.nii.gz'],
        'session': session_id or Query.OPTIONAL,
    }

    # Work on a private copy: the filters, task and echo below are written into
    # the query dictionaries, and must not leak into the module-level defaults
    # (or the caller's mapping) where they would affect subsequent calls.
    queries = deepcopy(queries or DEFAULT_BIDS_QUERIES)

    for acq, entities in (bids_filters or {}).items():
        queries[acq].update(entities)

    # ``task`` and ``echo`` only apply to BOLD; skip if there is no such query.
    if "bold" in queries:
        if task:
            queries["bold"]["task"] = task
        if echo:
            queries["bold"]["echo"] = echo

    # Merge per query so that an entity overridden for one acquisition (e.g.,
    # ``session`` for T1w) replaces the shared default for that query alone,
    # rather than dropping the constraint from every other query.
    subj_data = {
        dtype: sorted(layout.get(**{**layout_get_kwargs, **query}))
        for dtype, query in queries.items()
    }

    # Special case: multi-echo BOLD, grouping echos
    if (
        group_echos
        and "bold" in subj_data
        and any("_echo-" in bold for bold in subj_data["bold"])
    ):
        subj_data["bold"] = group_multiecho(subj_data["bold"])

    return subj_data, layout
def _init_layout(in_file=None, bids_dir=None, validate=True, database_path=None):
    """
    Return a :obj:`bids.layout.BIDSLayout`, inferring the BIDS root if necessary.

    Parameters
    ----------
    in_file : :obj:`str` or path-like, optional
        A file within the dataset; only used when ``bids_dir`` is ``None``.
    bids_dir : :obj:`str`, path-like, :obj:`bids.layout.BIDSLayout`, or None
        The BIDS root. An existing layout is returned unchanged.
    validate : :obj:`bool`
        Forwarded to the ``BIDSLayout`` constructor.
    database_path : :obj:`str` or None
        Forwarded to the ``BIDSLayout`` constructor.

    Raises
    ------
    RuntimeError
        If ``bids_dir`` is ``None`` and no parent folder of ``in_file`` has a
        name starting with ``sub-``.

    """
    if isinstance(bids_dir, BIDSLayout):
        return bids_dir

    if bids_dir is None:
        # The root is taken to be the folder containing the closest ``sub-*``
        # ancestor of the file.
        subject_dir = next(
            (
                ancestor
                for ancestor in Path(in_file).parents
                if ancestor.name.startswith("sub-")
            ),
            None,
        )
        if subject_dir is None:
            raise RuntimeError("Could not infer BIDS root")
        bids_dir = subject_dir.parent.resolve()

    return BIDSLayout(
        str(bids_dir),
        validate=validate,
        database_path=database_path,
    )
[docs]
def group_multiecho(bold_sess):
    """
    Multiplex multi-echo EPIs into arrays.

    File names that differ only in the value of their ``_echo-`` entity are
    considered echoes of the same run. Runs with three or more echoes are
    returned as a nested list; single-echo runs are kept as plain entries.
    Dual-echo is a special case of multi-echo, which is treated as single-echo data.
    Only consecutive entries are grouped, so echoes of a run must be adjacent
    in ``bold_sess``.

    Parameters
    ----------
    bold_sess : :obj:`list` of :obj:`str`
        BOLD file names (or paths).

    Returns
    -------
    grouped : :obj:`list`
        The input entries in their original order, with the echoes of runs
        having more than two echoes collected into sub-lists.

    Examples
    --------
    >>> bold_sess = ["sub-01_task-rest_echo-1_run-01_bold.nii.gz",
    ...              "sub-01_task-rest_echo-2_run-01_bold.nii.gz",
    ...              "sub-01_task-rest_echo-1_run-02_bold.nii.gz",
    ...              "sub-01_task-rest_echo-2_run-02_bold.nii.gz",
    ...              "sub-01_task-rest_echo-3_run-02_bold.nii.gz",
    ...              "sub-01_task-rest_run-03_bold.nii.gz"]
    >>> group_multiecho(bold_sess)  # doctest: +NORMALIZE_WHITESPACE
    ['sub-01_task-rest_echo-1_run-01_bold.nii.gz',
     'sub-01_task-rest_echo-2_run-01_bold.nii.gz',
    ['sub-01_task-rest_echo-1_run-02_bold.nii.gz',
     'sub-01_task-rest_echo-2_run-02_bold.nii.gz',
     'sub-01_task-rest_echo-3_run-02_bold.nii.gz'],
     'sub-01_task-rest_run-03_bold.nii.gz']

    >>> bold_sess.insert(2, "sub-01_task-rest_echo-3_run-01_bold.nii.gz")
    >>> group_multiecho(bold_sess)  # doctest: +NORMALIZE_WHITESPACE
    [['sub-01_task-rest_echo-1_run-01_bold.nii.gz',
      'sub-01_task-rest_echo-2_run-01_bold.nii.gz',
      'sub-01_task-rest_echo-3_run-01_bold.nii.gz'],
     ['sub-01_task-rest_echo-1_run-02_bold.nii.gz',
      'sub-01_task-rest_echo-2_run-02_bold.nii.gz',
      'sub-01_task-rest_echo-3_run-02_bold.nii.gz'],
      'sub-01_task-rest_run-03_bold.nii.gz']

    >>> bold_sess += ["sub-01_task-beh_echo-1_run-01_bold.nii.gz",
    ...               "sub-01_task-beh_echo-2_run-01_bold.nii.gz",
    ...               "sub-01_task-beh_echo-1_run-02_bold.nii.gz",
    ...               "sub-01_task-beh_echo-2_run-02_bold.nii.gz",
    ...               "sub-01_task-beh_echo-3_run-02_bold.nii.gz",
    ...               "sub-01_task-beh_run-03_bold.nii.gz"]
    >>> group_multiecho(bold_sess)  # doctest: +NORMALIZE_WHITESPACE
    [['sub-01_task-rest_echo-1_run-01_bold.nii.gz',
      'sub-01_task-rest_echo-2_run-01_bold.nii.gz',
      'sub-01_task-rest_echo-3_run-01_bold.nii.gz'],
     ['sub-01_task-rest_echo-1_run-02_bold.nii.gz',
      'sub-01_task-rest_echo-2_run-02_bold.nii.gz',
      'sub-01_task-rest_echo-3_run-02_bold.nii.gz'],
      'sub-01_task-rest_run-03_bold.nii.gz',
      'sub-01_task-beh_echo-1_run-01_bold.nii.gz',
      'sub-01_task-beh_echo-2_run-01_bold.nii.gz',
     ['sub-01_task-beh_echo-1_run-02_bold.nii.gz',
      'sub-01_task-beh_echo-2_run-02_bold.nii.gz',
      'sub-01_task-beh_echo-3_run-02_bold.nii.gz'],
      'sub-01_task-beh_run-03_bold.nii.gz']

    Some tests from https://neurostars.org/t/fmriprep-from-singularity-unboundlocalerror/3299/7

    >>> bold_sess = ['sub-01_task-AudLoc_echo-1_bold.nii',
    ...              'sub-01_task-AudLoc_echo-2_bold.nii',
    ...              'sub-01_task-FJT_echo-1_bold.nii',
    ...              'sub-01_task-FJT_echo-2_bold.nii',
    ...              'sub-01_task-LDT_echo-1_bold.nii',
    ...              'sub-01_task-LDT_echo-2_bold.nii',
    ...              'sub-01_task-MotLoc_echo-1_bold.nii',
    ...              'sub-01_task-MotLoc_echo-2_bold.nii']
    >>> group_multiecho(bold_sess) == bold_sess
    True

    >>> bold_sess += ['sub-01_task-MotLoc_echo-3_bold.nii']
    >>> groups = group_multiecho(bold_sess)
    >>> len(groups[:-1])
    6
    >>> [isinstance(g, list) for g in groups]
    [False, False, False, False, False, False, True]
    >>> len(groups[-1])
    3

    """
    from itertools import groupby

    def _echo_agnostic(fname):
        # Grouping key: the file name with its echo index masked out, so that
        # all echoes of one run map onto the same key.
        if "_echo-" not in fname:
            return fname
        echo_tag = re.search("_echo-\\d*", fname).group(0)
        return fname.replace(echo_tag, "_echo-?")

    grouped = []
    for _, run in groupby(bold_sess, key=_echo_agnostic):
        echoes = list(run)
        if len(echoes) > 2:
            # Three or more echoes: keep the run together as a nested list.
            grouped.append(echoes)
        else:
            # Single- or dual-echo: flatten into the output.
            grouped.extend(echoes)
    return grouped
[docs]
def relative_to_root(path):
    """
    Express a file path relative to the BIDS root it belongs to.

    The BIDS root is taken to be the folder containing the closest ``sub-*``
    ancestor of the file. If the file name starts with ``sub-`` but no such
    ancestor exists, the path is returned as given.

    Parameters
    ----------
    path : :obj:`str` or path-like
        Path to a file whose name starts with ``sub-``.

    Returns
    -------
    relpath : :obj:`pathlib.Path`
        The path starting at the closest ``sub-*`` folder, or the input path
        if there is no such folder.

    Raises
    ------
    ValueError
        If the file name does not start with ``sub-``.

    Examples
    --------
    >>> str(relative_to_root(
    ...     "/sub-03/sourcedata/sub-01/anat/sub-01_T1.nii.gz"
    ... ))
    'sub-01/anat/sub-01_T1.nii.gz'

    >>> str(relative_to_root(
    ...     "/sub-03/anat/sourcedata/sub-01/ses-preop/anat/sub-01_ses-preop_T1.nii.gz"
    ... ))
    'sub-01/ses-preop/anat/sub-01_ses-preop_T1.nii.gz'

    >>> str(relative_to_root(
    ...     "sub-01/anat/sub-01_T1.nii.gz"
    ... ))
    'sub-01/anat/sub-01_T1.nii.gz'

    >>> str(relative_to_root("anat/sub-01_T1.nii.gz"))
    'anat/sub-01_T1.nii.gz'

    """
    path = Path(path)
    if not path.name.startswith("sub-"):
        raise ValueError(
            f"Could not determine the BIDS root of <{path}>. "
            "Only files under a subject directory are currently supported."
        )

    # Collect names from the file upwards; stop at the first ``sub-*`` folder.
    upward = [path.name]
    for ancestor in path.parents:
        upward.append(ancestor.name)
        if ancestor.name.startswith("sub-"):
            return Path(*reversed(upward))

    # No subject folder among the ancestors: leave the path untouched.
    return path
[docs]
def check_pipeline_version(cvers, data_desc):
    """
    Compare the current pipeline version against that of existing outputs.

    The version that generated previous outputs is read from the
    ``PipelineDescription`` / ``Version`` field of a ``dataset_description.json``
    file (``"0+unknown"`` if absent). Only the public part of the versions is
    compared, so local version labels (e.g., ``+129.gbe0e5158``) are ignored.

    .. testsetup::

    >>> import json
    >>> data = {"PipelineDescription": {"Version": "1.1.1rc5"}}
    >>> desc_file = Path('sample_dataset_description.json')
    >>> _ = desc_file.write_text(json.dumps(data))

    Parameters
    ----------
    cvers : :obj:`str`
        Current pipeline version
    data_desc : :obj:`str` or :obj:`os.PathLike`
        Path to pipeline output's ``dataset_description.json``

    Examples
    --------
    >>> check_pipeline_version('1.1.1rc5', 'sample_dataset_description.json') is None
    True
    >>> check_pipeline_version('1.1.1rc5+129.gbe0e5158', 'sample_dataset_description.json')
    >>> check_pipeline_version('1.2', 'sample_dataset_description.json')  # doctest: +ELLIPSIS
    'Previous output generated ...'

    Returns
    -------
    message : :obj:`str` or :obj:`None`
        A warning string if there is a difference between versions, otherwise ``None``.
        ``None`` is also returned when ``data_desc`` does not exist.

    """
    desc_path = Path(data_desc)
    if not desc_path.exists():
        # No previous outputs to compare against.
        return None

    pipeline_info = json.loads(desc_path.read_text()).get("PipelineDescription", {})
    previous = pipeline_info.get("Version", "0+unknown")

    current_public = Version(cvers).public
    previous_public = Version(previous).public
    if current_public == previous_public:
        return None
    return "Previous output generated by version {} found.".format(previous)