Source code for eddymotion.model.gpr

# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
#
# Copyright The NiPreps Developers <nipreps@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# We support and encourage derived works from this project, please read
# about our expectations at
#
#     https://www.nipreps.org/community/licensing/
#
"""Derivations from scikit-learn for Gaussian Processes."""

from __future__ import annotations

from numbers import Integral, Real
from typing import Callable, Mapping, Sequence

import numpy as np
from scipy import optimize
from scipy.optimize._minimize import Bounds
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import (
    Hyperparameter,
    Kernel,
)
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.utils._param_validation import Interval, StrOptions

BOUNDS_A: tuple[float, float] = (0.1, 2.35)
"""The limits for the parameter *a* (angular distance in rad)."""
BOUNDS_LAMBDA: tuple[float, float] = (1e-3, 1000)
"""The limits for the parameter λ (signal scaling factor)."""
THETA_EPSILON: float = 1e-5
"""Minimum nonzero angle."""
LBFGS_CONFIGURABLE_OPTIONS = {"disp", "maxiter", "ftol", "gtol"}
"""The set of extended options that can be set on the default BFGS."""
CONFIGURABLE_OPTIONS: Mapping[str, set] = {
    "Nelder-Mead": {"disp", "maxiter", "adaptive", "fatol"},
    "CG": {"disp", "maxiter", "gtol"},
}
"""
A mapping from optimizer names to the option set they allow.

Add new optimizers to this list, including what options may be
configured.
"""
NONGRADIENT_METHODS = {"Nelder-Mead"}
"""A set of gradients that do not allow analytical gradients."""
SUPPORTED_OPTIMIZERS = set(CONFIGURABLE_OPTIONS.keys()) | {"fmin_l_bfgs_b"}
"""A set of supported optimizers (automatically created)."""


[docs] class EddyMotionGPR(GaussianProcessRegressor): r""" A GP regressor specialized for eddymotion. This specialization of the default GP regressor is created to allow the following extended behaviors: * Pacify Scikit-learn's estimator parameter checker to allow optimizers given by name (as a string) other than the default BFGS. * Enable custom options of optimizers. See :obj:`~scipy.optimize.minimize` for the available options. Please note that only a few of them are currently supported. In the future, this specialization would be the right place for hyperparameter optimization using cross-validation and such. In principle, Scikit-Learn's implementation normalizes the training data as in [Andersson15]_ (see `FSL's souce code <https://git.fmrib.ox.ac.uk/fsl/eddy/-/blob/2480dda293d4cec83014454db3a193b87921f6b0/DiffusionGP.cpp#L218>`__). From their paper (p. 167, end of first column): Typically one just substracts the mean (:math:`\bar{\mathbf{f}}`) from :math:`\mathbf{f}` and then add it back to :math:`f^{*}`, which is analogous to what is often done in "traditional" regression. Finally, the parameter :math:`\sigma^2` maps on to Scikit-learn's ``alpha`` of the regressor. Because it is not a parameter of the kernel, hyperparameter selection through gradient-descent with analytical gradient calculations would not work (the derivative of the kernel w.r.t. alpha is zero). I believe this is overlooked in [Andersson15]_, or they actually did not use analytical gradient-descent: *A note on optimisation* It is suggested, for example in Rasmussen and Williams (2006), that an optimisation method that uses derivative information should be used when finding the hyperparameters that maximise Eq. (12). The reason for that is that such methods typically use fewer steps, and when the cost of calculating the derivatives is small/moderate compared to calculating the functions itself (as is the case for Eq. (12)) then execution time can be much shorter. However, we found that for the multi-shell case a heuristic optimisation method such as the Nelder-Mead simplex method (Nelder and Mead, 1965) was frequently better at avoiding local maxima. Hence, that was the method we used for all optimisations in the present paper. **Multi-shell regression (TODO).** For multi-shell modeling, the kernel :math:`k(\textbf{x}, \textbf{x'})` is updated following Eq. (14) in [Andersson15]_. .. math:: k(\textbf{x}, \textbf{x'}) = C_{\theta}(\mathbf{g}, \mathbf{g'}; a) C_{b}(|b - b'|; \ell) and :math:`C_{b}` is based the log of the b-values ratio, a measure of distance along the b-direction, according to Eq. (15) given by: .. math:: C_{b}(b, b'; \ell) = \exp\left( - \frac{(\log b - \log b')^2}{2 \ell^2} \right), :math:`b` and :math:`b'` being the b-values, and :math:`\mathbf{g}` and :math:`\mathbf{g'}` the unit diffusion-encoding gradient unit vectors of the shells; and :math:`{a, \ell}` some hyperparameters. The full GP regression kernel :math:`\mathbf{K}` is then updated for a 2-shell case as follows (Eq. (16) in [Andersson15]_): .. math:: \begin{equation} \mathbf{K} = \left[ \begin{matrix} \lambda C_{\theta}(\theta (\mathbf{G}_{1}); a) + \sigma_{1}^{2} \mathbf{I} & \lambda C_{\theta}(\theta (\mathbf{G}_{2}, \mathbf{G}_{1}); a) C_{b}(b_{2}, b_{1}; \ell) \\ \lambda C_{\theta}(\theta (\mathbf{G}_{1}, \mathbf{G}_{2}); a) C_{b}(b_{1}, b_{2}; \ell) & \lambda C_{\theta}(\theta (\mathbf{G}_{2}); a) + \sigma_{2}^{2} \mathbf{I} \\ \end{matrix} \right] \end{equation} References ---------- .. [Andersson15] J. L. R. Andersson. et al., An integrated approach to correction for off-resonance effects and subject movement in diffusion MR imaging, NeuroImage 125 (2016) 1063-11078 """ _parameter_constraints: dict = { "kernel": [None, Kernel], "alpha": [Interval(Real, 0, None, closed="left"), np.ndarray], "optimizer": [StrOptions(SUPPORTED_OPTIMIZERS), callable, None], "n_restarts_optimizer": [Interval(Integral, 0, None, closed="left")], "copy_X_train": ["boolean"], "normalize_y": ["boolean"], "n_targets": [Interval(Integral, 1, None, closed="left"), None], "random_state": ["random_state"], } def __init__( self, kernel: Kernel | None = None, *, alpha: float = 0.5, optimizer: str | Callable | None = "fmin_l_bfgs_b", n_restarts_optimizer: int = 0, copy_X_train: bool = True, normalize_y: bool = True, n_targets: int | None = None, random_state: int | None = None, eval_gradient: bool = True, tol: float | None = None, disp: bool | int | None = None, maxiter: int | None = None, ftol: float | None = None, gtol: float | None = None, adaptive: bool | int | None = None, fatol: float | None = None, ): super().__init__( kernel, alpha=alpha, optimizer=optimizer, n_restarts_optimizer=n_restarts_optimizer, normalize_y=normalize_y, copy_X_train=copy_X_train, n_targets=n_targets, random_state=random_state, ) self.tol = tol self.eval_gradient = eval_gradient if optimizer not in NONGRADIENT_METHODS else False self.maxiter = maxiter self.disp = disp self.ftol = ftol self.gtol = gtol self.adaptive = adaptive self.fatol = fatol def _constrained_optimization( self, obj_func: Callable, initial_theta: np.ndarray, bounds: Sequence[tuple[float, float]] | Bounds, ) -> tuple[float, float]: options = {} if self.optimizer == "fmin_l_bfgs_b": from sklearn.utils.optimize import _check_optimize_result for name in LBFGS_CONFIGURABLE_OPTIONS: if (value := getattr(self, name, None)) is not None: options[name] = value opt_res = optimize.minimize( obj_func, initial_theta, method="L-BFGS-B", bounds=bounds, jac=self.eval_gradient, options=options, args=(self.eval_gradient,), tol=self.tol, ) _check_optimize_result("lbfgs", opt_res) return opt_res.x, opt_res.fun if isinstance(self.optimizer, str) and self.optimizer in CONFIGURABLE_OPTIONS: for name in CONFIGURABLE_OPTIONS[self.optimizer]: if (value := getattr(self, name, None)) is not None: options[name] = value opt_res = optimize.minimize( obj_func, initial_theta, method=self.optimizer, bounds=bounds, jac=self.eval_gradient, options=options, args=(self.eval_gradient,), tol=self.tol, ) return opt_res.x, opt_res.fun if callable(self.optimizer): return self.optimizer(obj_func, initial_theta, bounds=bounds) raise ValueError(f"Unknown optimizer {self.optimizer}.")
[docs] class ExponentialKriging(Kernel): """A scikit-learn's kernel for DWI signals.""" def __init__( self, beta_a: float = 0.01, beta_l: float = 2.0, a_bounds: tuple[float, float] = BOUNDS_A, l_bounds: tuple[float, float] = BOUNDS_LAMBDA, ): r""" Initialize an exponential Kriging kernel. Parameters ---------- beta_a : :obj:`float`, optional Minimum angle in rads. beta_l : :obj:`float`, optional The :math:`\lambda` hyperparameter. a_bounds : :obj:`tuple`, optional Bounds for the a parameter. l_bounds : :obj:`tuple`, optional Bounds for the :math:`\lambda` hyperparameter. """ self.beta_a = beta_a self.beta_l = beta_l self.a_bounds = a_bounds self.l_bounds = l_bounds @property def hyperparameter_a(self) -> Hyperparameter: return Hyperparameter("beta_a", "numeric", self.a_bounds) @property def hyperparameter_beta_l(self) -> Hyperparameter: return Hyperparameter("beta_l", "numeric", self.l_bounds) def __call__( self, X: np.ndarray, Y: np.ndarray | None = None, eval_gradient: bool = False ) -> np.ndarray | tuple[np.ndarray, np.ndarray]: """ Return the kernel K(X, Y) and optionally its gradient. Parameters ---------- X : :obj:`~numpy.ndarray` Gradient table (X) Y : :obj:`~numpy.ndarray`, optional Gradient table (Y, optional) eval_gradient : :obj:`bool`, optional Determines whether the gradient with respect to the log of the kernel hyperparameter is computed. Only supported when Y is ``None``. Returns ------- K : ndarray of shape (n_samples_X, n_samples_Y) Kernel k(X, Y) K_gradient : ndarray of shape (n_samples_X, n_samples_X, n_dims),\ optional The gradient of the kernel k(X, X) with respect to the log of the hyperparameter of the kernel. Only returned when `eval_gradient` is True. """ thetas = compute_pairwise_angles(X, Y) C_theta = exponential_covariance(thetas, self.beta_a) if not eval_gradient: return self.beta_l * C_theta K_gradient = np.zeros((*thetas.shape, 2)) K_gradient[..., 0] = self.beta_l * C_theta * thetas / self.beta_a**2 # Derivative w.r.t. a K_gradient[..., 1] = C_theta return self.beta_l * C_theta, K_gradient
[docs] def diag(self, X: np.ndarray) -> np.ndarray: """Returns the diagonal of the kernel k(X, X). The result of this method is identical to np.diag(self(X)); however, it can be evaluated more efficiently since only the diagonal is evaluated. Parameters ---------- X : ndarray of shape (n_samples_X, n_features) Left argument of the returned kernel k(X, Y) Returns ------- K_diag : ndarray of shape (n_samples_X,) Diagonal of kernel k(X, X) """ return self.beta_l * np.ones(X.shape[0])
[docs] def is_stationary(self) -> bool: """Returns whether the kernel is stationary.""" return True
def __repr__(self) -> str: return f"ExponentialKriging (a={self.beta_a}, λ={self.beta_l})"
[docs] class SphericalKriging(Kernel): """A scikit-learn's kernel for DWI signals.""" def __init__( self, beta_a: float = 1.38, beta_l: float = 0.5, a_bounds: tuple[float, float] = BOUNDS_A, l_bounds: tuple[float, float] = BOUNDS_LAMBDA, ): r""" Initialize a spherical Kriging kernel. Parameters ---------- beta_a : :obj:`float`, optional Minimum angle in rads. beta_l : :obj:`float`, optional The :math:`\lambda` hyperparameter. a_bounds : :obj:`tuple`, optional Bounds for the ``a`` parameter. l_bounds : :obj:`tuple`, optional Bounds for the :math:`\lambda` hyperparameter. """ self.beta_a = beta_a self.beta_l = beta_l self.a_bounds = a_bounds self.l_bounds = l_bounds @property def hyperparameter_a(self) -> Hyperparameter: return Hyperparameter("beta_a", "numeric", self.a_bounds) @property def hyperparameter_beta_l(self) -> Hyperparameter: return Hyperparameter("beta_l", "numeric", self.l_bounds) def __call__( self, X: np.ndarray, Y: np.ndarray | None = None, eval_gradient: bool = False ) -> np.ndarray | tuple[np.ndarray, np.ndarray]: """ Return the kernel K(X, Y) and optionally its gradient. Parameters ---------- X : :obj:`~numpy.ndarray` Gradient table (X) Y : :obj:`~numpy.ndarray`, optional Gradient table (Y, optional) eval_gradient : :obj:`bool`, optional Determines whether the gradient with respect to the log of the kernel hyperparameter is computed. Only supported when Y is ``None``. Returns ------- K : ndarray of shape (n_samples_X, n_samples_Y) Kernel k(X, Y) K_gradient : ndarray of shape (n_samples_X, n_samples_X, n_dims),\ optional The gradient of the kernel k(X, X) with respect to the log of the hyperparameter of the kernel. Only returned when ``eval_gradient`` is True. """ thetas = compute_pairwise_angles(X, Y) C_theta = spherical_covariance(thetas, self.beta_a) if not eval_gradient: return self.beta_l * C_theta deriv_a = np.zeros_like(thetas) nonzero = thetas <= self.beta_a deriv_a[nonzero] = ( 1.5 * self.beta_l * (thetas[nonzero] / self.beta_a**2 - thetas[nonzero] ** 3 / self.beta_a**4) ) K_gradient = np.dstack((deriv_a, C_theta)) return self.beta_l * C_theta, K_gradient
[docs] def diag(self, X: np.ndarray) -> np.ndarray: """Returns the diagonal of the kernel k(X, X). The result of this method is identical to np.diag(self(X)); however, it can be evaluated more efficiently since only the diagonal is evaluated. Parameters ---------- X : ndarray of shape (n_samples_X, n_features) Left argument of the returned kernel k(X, Y) Returns ------- K_diag : ndarray of shape (n_samples_X,) Diagonal of kernel k(X, X) """ return self.beta_l * np.ones(X.shape[0])
[docs] def is_stationary(self) -> bool: """Returns whether the kernel is stationary.""" return True
def __repr__(self) -> str: return f"SphericalKriging (a={self.beta_a}, λ={self.beta_l})"
[docs] def exponential_covariance(theta: np.ndarray, a: float) -> np.ndarray: r""" Compute the exponential covariance for given distances and scale parameter. Implements :math:`C_{\theta}`, following Eq. (9) in [Andersson15]_: .. math:: \begin{equation} C(\theta) = e^{-\theta/a} \,\, \text{for} \, 0 \leq \theta \leq \pi, \end{equation} :math:`\theta` being computed as: .. math:: \theta(\mathbf{g}, \mathbf{g'}) = \arccos(|\langle \mathbf{g}, \mathbf{g'} \rangle|) Parameters ---------- theta : :obj:`~numpy.ndarray` Array of distances between points. a : :obj:`float` Scale parameter that controls the range of the covariance function. Returns ------- :obj:`~numpy.ndarray` Exponential covariance values for the input distances. """ return np.exp(-theta / a)
[docs] def spherical_covariance(theta: np.ndarray, a: float) -> np.ndarray: r""" Compute the spherical covariance for given distances and scale parameter. Implements :math:`C_{\theta}`, following Eq. (10) in [Andersson15]_: .. math:: \begin{equation} C(\theta) = \begin{cases} 1 - \frac{3 \theta}{2 a} + \frac{\theta^3}{2 a^3} & \textnormal{if} \; \theta \leq a \\ 0 & \textnormal{if} \; \theta > a \end{cases} \end{equation} :math:`\theta` being computed as: .. math:: \theta(\mathbf{g}, \mathbf{g'}) = \arccos(|\langle \mathbf{g}, \mathbf{g'} \rangle|) Parameters ---------- theta : :obj:`~numpy.ndarray` Array of distances between points. a : :obj:`float` Scale parameter that controls the range of the covariance function. Returns ------- :obj:`~numpy.ndarray` Spherical covariance values for the input distances. """ return np.where(theta <= a, 1 - 1.5 * theta / a + 0.5 * (theta**3) / (a**3), 0.0)
[docs] def compute_pairwise_angles( X: np.ndarray, Y: np.ndarray | None = None, closest_polarity: bool = True, dense_output: bool = True, ) -> np.ndarray: r"""Compute pairwise angles across diffusion gradient encoding directions. Following [Andersson15]_, it computes the smallest of the angles between each pair if ``closest_polarity`` is ``True``, i.e., .. math:: \theta(\mathbf{g}, \mathbf{g'}) = \arccos(|\langle \mathbf{g}, \mathbf{g'} \rangle|) Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples_X, n_features) Input data. Y : {array-like, sparse matrix} of shape (n_samples_Y, n_features), optional Input data. If ``None``, the output will be the pairwise similarities between all samples in ``X``. dense_output : :obj:`bool`, optional Whether to return dense output even when the input is sparse. If ``False``, the output is sparse if both input arrays are sparse. closest_polarity : :obj:`bool`, optional ``True`` to consider the smallest of the two angles between the crossing lines resulting from reversing each vector pair. Returns ------- :obj:`~numpy.ndarray` Pairwise angles across diffusion gradient encoding directions. Examples -------- >>> X = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]).T >>> compute_pairwise_angles(X, closest_polarity=False)[0, 1] # doctest: +ELLIPSIS 3.1415... >>> X = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]).T >>> Y = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]).T >>> compute_pairwise_angles(X, Y, closest_polarity=False)[0, 1] # doctest: +ELLIPSIS 3.1415... >>> X = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]).T >>> compute_pairwise_angles(X)[0, 1] 0.0 """ cosines = np.clip(cosine_similarity(X, Y, dense_output=dense_output), -1.0, 1.0) thetas = np.arccos(np.abs(cosines)) if closest_polarity else np.arccos(cosines) thetas[np.abs(thetas) < THETA_EPSILON] = 0.0 return thetas