# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
#
# Copyright 2022 The NiPreps Developers <nipreps@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# We support and encourage derived works from this project, please read
# about our expectations at
#
# https://www.nipreps.org/community/licensing/
#
"""PET data representation."""
from __future__ import annotations
from pathlib import Path
from typing import Any, Union
import attr
import h5py
import nibabel as nb
import numpy as np
from nifreeze.data.base import BaseDataset, _cmp, _data_repr
[docs]
@attr.s(slots=True)
class PET(BaseDataset):
"""Data representation structure for PET data."""
frame_time: np.ndarray | None = attr.ib(
default=None, repr=_data_repr, eq=attr.cmp_using(eq=_cmp)
)
"""A (N,) numpy array specifying the midpoint timing of each sample or frame."""
total_duration: float | None = attr.ib(default=None, repr=True)
"""A float representing the total duration of the dataset."""
def __getitem__(
self, idx: int | slice | tuple | np.ndarray
) -> tuple[np.ndarray, np.ndarray | None, np.ndarray | None]:
"""
Returns volume(s) and corresponding affine(s) and timing(s) through fancy indexing.
Parameters
----------
idx : :obj:`int` or :obj:`slice` or :obj:`tuple` or :obj:`~numpy.ndarray`
Indexer for the last dimension (or possibly other dimensions if extended).
Returns
-------
volumes : :obj:`~numpy.ndarray`
The selected data subset.
If ``idx`` is a single integer, this will have shape ``(X, Y, Z)``,
otherwise it may have shape ``(X, Y, Z, k)``.
motion_affine : :obj:`~numpy.ndarray` or ``None``
The corresponding per-volume motion affine(s) or ``None`` if identity transform(s).
time : :obj:`float` or ``None``
The frame time corresponding to the index(es).
"""
data, affine = super().__getitem__(idx)
return data, affine, self.frame_time[idx]
[docs]
@classmethod
def from_filename(cls, filename: Union[str, Path]) -> PET:
"""
Read an HDF5 file from disk and create a PET object.
Parameters
----------
filename : :obj:`os.pathlike`
The HDF5 file path to read.
Returns
-------
:obj:`~nifreeze.data.pet.PET`
A PET dataset with data loaded from the specified file.
"""
import attr
filename = Path(filename)
data: dict[str, Any] = {}
with h5py.File(filename, "r") as in_file:
root = in_file["/0"]
for f in attr.fields(cls):
# skip private attributes (start with '_')
if f.name.startswith("_"):
continue
if f.name in root:
data[f.name] = np.asanyarray(root[f.name])
else:
data[f.name] = None
return cls(**data)
[docs]
def to_filename(
self,
filename: Path | str,
compression: str | None = None,
compression_opts: Any = None,
) -> None:
"""
Write the PET dataset to an HDF5 file on disk.
Parameters
----------
filename : :obj:`os.pathlike`
The HDF5 file path to write to.
compression : :obj:`str`, optional
Compression strategy.
See :obj:`~h5py.Group.create_dataset` documentation.
compression_opts : :obj:`typing.Any`, optional
Parameters for compression
`filters <https://docs.h5py.org/en/stable/high/dataset.html#dataset-compression>`__.
"""
super().to_filename(filename, compression=compression, compression_opts=compression_opts)
# Overriding if you'd like to set a custom attribute, for example:
with h5py.File(filename, "r+") as out_file:
out_file.attrs["Type"] = "pet"
[docs]
def load(
filename: Path | str,
brainmask_file: Path | str | None = None,
motion_file: Path | str | None = None,
frame_time: np.ndarray | list[float] | None = None,
frame_duration: np.ndarray | list[float] | None = None,
) -> PET:
"""
Load PET data from HDF5 or NIfTI, creating a PET object with appropriate metadata.
Parameters
----------
filename : :obj:`os.pathlike`
The NIfTI or HDF5 file.
brainmask_file : :obj:`os.pathlike`, optional
A brainmask NIfTI file. If provided, will be loaded and
stored in the returned dataset.
motion_file : :obj:`os.pathlike`
A file containing head-motion affine matrices (linear).
frame_time : :obj:`numpy.ndarray` or :obj:`list` of :obj:`float`, optional
The start times of each frame relative to the beginning of the acquisition.
If ``None``, an error is raised (since BIDS requires ``FrameTimesStart``).
frame_duration : :obj:`numpy.ndarray` or :obj:`list` of :obj:`float`, optional
The duration of each frame.
If ``None``, it is derived by the difference of consecutive frame times,
defaulting the last frame to match the second-last.
Returns
-------
:obj:`~nifreeze.data.pet.PET`
A PET object storing the data, metadata, and any optional mask.
Raises
------
RuntimeError
If ``frame_time`` is not provided (BIDS requires it).
"""
if motion_file:
raise NotImplementedError
filename = Path(filename)
if filename.suffix == ".h5":
# Load from HDF5
pet_obj = PET.from_filename(filename)
else:
# Load from NIfTI
img = nb.load(str(filename))
data = img.get_fdata(dtype=np.float32)
pet_obj = PET(
dataobj=data,
affine=img.affine,
)
# Verify the user provided frame_time if not already in the PET object
if pet_obj.frame_time is None and frame_time is None:
raise RuntimeError(
"The `frame_time` is mandatory for PET data to comply with BIDS. "
"See https://bids-specification.readthedocs.io for details."
)
# If the user supplied new values, set them
if frame_time is not None:
# Convert to a float32 numpy array and zero out the earliest time
frame_time_arr = np.array(frame_time, dtype=np.float32)
frame_time_arr -= frame_time_arr[0]
pet_obj.frame_time = frame_time_arr
# If the user doesn't provide frame_duration, we derive it:
if frame_duration is None:
frame_time_arr = pet_obj.frame_time
# If shape is e.g. (N,), then we can do
durations = np.diff(frame_time_arr)
if len(durations) == (len(frame_time_arr) - 1):
durations = np.append(durations, durations[-1]) # last frame same as second-last
else:
durations = np.array(frame_duration, dtype=np.float32)
# Set total_duration and shift frame_time to the midpoint
pet_obj.total_duration = float(frame_time_arr[-1] + durations[-1])
pet_obj.frame_time = frame_time_arr + 0.5 * durations
# If a brain mask is provided, load and attach
if brainmask_file is not None:
mask_img = nb.load(str(brainmask_file))
pet_obj.brainmask = np.asanyarray(mask_img.dataobj, dtype=bool)
return pet_obj