Source code for astrophot.fit.lm

# Levenberg-Marquardt algorithm
from typing import Sequence, Optional

import numpy as np

from .base import BaseOptimizer
from .. import config
from ..backend_obj import backend, ArrayLike
from . import func
from ..errors import OptimizeStopFail, OptimizeStopSuccess
from ..param import ValidContext, Module, forward

__all__ = ("LM", "LMConstraint")


[docs] class LMConstraint(Module): def __init__(self, model, constraint, sigma, name=None): super().__init__(name=name) self.model = model self.constraint = constraint self.weight = 1 / sigma**2
[docs] def jacobian(self, model, params): return backend.jacobian(lambda p: self(model, params=p), params)
@forward def __call__(self, model): return self.constraint(model)
class DummyConstraint: valid_context = False
[docs] class LM(BaseOptimizer): """The LM class is an implementation of the Levenberg-Marquardt optimization algorithm. This method is used to solve non-linear least squares problems and is known for its robustness and efficiency. The Levenberg-Marquardt (LM) algorithm is an iterative method used for solving non-linear least squares problems. It can be seen as a combination of the Gauss-Newton method and the gradient descent method: it works like the Gauss-Newton method when the parameters are approximately near a minimum, and like the gradient descent method when the parameters are far from their optimal values. The cost function that the LM algorithm tries to minimize is of the form: $$f(\\boldsymbol{\\beta}) = \\frac{1}{2}\\sum_{i=1}^{N} r_i(\\boldsymbol{\\beta})^2$$ where $\\boldsymbol{\\beta}$ is the vector of parameters, $r_i$ are the residuals, and $N$ is the number of observations. The LM algorithm iteratively performs the following update to the parameters: $$\\boldsymbol{\\beta}_{n+1} = \\boldsymbol{\\beta}_{n} - (J^T J + \\lambda diag(J^T J))^{-1} J^T \\boldsymbol{r}$$ where: - $J$ is the Jacobian matrix whose elements are $J_{ij} = \\frac{\\partial r_i}{\\partial \\beta_j}$, - $\\boldsymbol{r}$ is the vector of residuals $r_i(\\boldsymbol{\\beta})$, - $\\lambda$ is a damping factor which is adjusted at each iteration. When $\\lambda = 0$ this can be seen as the Gauss-Newton method. In the limit that $\\lambda$ is large, the $J^T J$ matrix (an approximation of the Hessian) becomes subdominant and the update essentially points along $J^T \\boldsymbol{r}$ which is the gradient. In this scenario the gradient descent direction is also modified by the $\\lambda diag(J^T J)$ scaling which in some sense makes each gradient unitless and further improves the step. Note as well that as $\\lambda$ gets larger the step taken will be smaller, which helps to ensure convergence when the initial guess of the parameters are far from the optimal solution. Note that the residuals $r_i$ are typically also scaled by the variance of the pixels, but this does not change the equations above. For a detailed explanation of the LM method see the article by Henri Gavin on which much of the AstroPhot LM implementation is based:: .. code-block:: text @article{Gavin2019, title={The Levenberg-Marquardt algorithm for nonlinear least squares curve-fitting problems}, author={Gavin, Henri P}, journal={Department of Civil and Environmental Engineering, Duke University}, volume={19}, year={2019} } as well as the paper on LM geodesic acceleration by Mark Transtrum:: .. code-block:: text @article{Tanstrum2012, author = {{Transtrum}, Mark K. and {Sethna}, James P.}, title = "{Improvements to the Levenberg-Marquardt algorithm for nonlinear least-squares minimization}", year = 2012, doi = {10.48550/arXiv.1201.5885}, adsurl = {https://ui.adsabs.harvard.edu/abs/2012arXiv1201.5885T}, } The damping factor $\\lambda$ is adjusted at each iteration: it is effectively increased when we are far from the solution, and decreased when we are close to it. In practice, the algorithm attempts to pick the smallest $\\lambda$ that is can while making sure that the $\\chi^2$ decreases at each step. The main advantage of the LM algorithm is its adaptability. When the current estimate is far from the optimum, the algorithm behaves like a gradient descent to ensure global convergence. However, when it is close to the optimum, it behaves like Gauss-Newton, which provides fast local convergence. In practice, the algorithm is typically implemented with various enhancements to improve its performance. For example, the Jacobian may be approximated with finite differences, geodesic acceleration can be used to speed up convergence, and more sophisticated strategies can be used to adjust the damping factor $\\lambda$. The exact performance of the LM algorithm will depend on the nature of the problem, including the complexity of the function f(x), the quality of the initial estimate x0, and the distribution of the data. The LM class implemented for AstroPhot takes a model, initial state, and various other optional parameters as inputs and seeks to find the parameters that minimize the cost function. """ def __init__( self, model, initial_state: Sequence = None, max_iter: int = 100, relative_tolerance: float = 1e-5, Lup=11.0, Ldn=9.0, L0=1.0, max_step_iter: int = 10, ndf=None, likelihood="gaussian", constraint: Optional[LMConstraint] = None, forward=None, jacobian=None, **kwargs, ): super().__init__( model, initial_state, max_iter=max_iter, relative_tolerance=relative_tolerance, **kwargs, ) # Maximum number of steps while searching for chi^2 improvement on a single jacobian evaluation self.max_step_iter = max_step_iter self.Lup = Lup self.Ldn = Ldn self.L = L0 self.likelihood = likelihood if self.likelihood not in ["gaussian", "poisson"]: raise ValueError( f"Unsupported likelihood: {self.likelihood}, should be one of: 'gaussian' or 'poisson'" ) self.constraint = constraint # mask mask = self.model.target[self.model.window].flatten("mask") self.mask = ~mask if backend.sum(self.mask).item() == 0: raise OptimizeStopSuccess("No data to fit. All pixels are masked") # Initialize optimizer attributes self.Y = self.model.target[self.model.window].flatten("data")[self.mask] # 1 / (sigma^2) kW = kwargs.get("W", None) if kW is not None: self.W = backend.as_array(kW, dtype=config.DTYPE, device=config.DEVICE).flatten()[ self.mask ] else: self.W = self.model.target[self.model.window].flatten("weight")[self.mask] if forward is None: forward = lambda x: self.model(params=x).flatten("data") if jacobian is None: jacobian = lambda x: self.model.jacobian(params=x).flatten("data") if self.constraint is None: # The forward model which computes the output image given input parameters self.forward = lambda x: forward(x)[self.mask] # Compute the jacobian self.jacobian = lambda x: jacobian(x)[self.mask] self.constraint = DummyConstraint() else: self.Y = backend.concatenate( ( self.Y, backend.zeros( self.constraint.weight.shape, dtype=config.DTYPE, device=config.DEVICE ), ) ) self.W = backend.concatenate( ( self.W, backend.as_array( self.constraint.weight, dtype=config.DTYPE, device=config.DEVICE ), ) ) self.forward = lambda x: backend.concatenate( (forward(x)[self.mask], self.constraint(model, params=x)) ) self.jacobian = lambda x: backend.concatenate( (jacobian(x)[self.mask], self.constraint.jacobian(model, params=x)) ) # variable to store covariance matrix if it is ever computed self._covariance_matrix = None # Degrees of freedom if ndf is None: self.ndf = max(1.0, len(self.Y) - len(self.current_state)) else: self.ndf = ndf
[docs] def chi2_ndf(self): return backend.sum(self.W * (self.Y - self.forward(self.current_state)) ** 2) / self.ndf
[docs] def poisson_2nll_ndf(self): M = self.forward(self.current_state) return 2 * backend.sum(M - self.Y * backend.log(M + 1e-10)) / self.ndf
[docs] def fit(self, update_uncertainty=True) -> BaseOptimizer: """This performs the fitting operation. It iterates the LM step function until convergence is reached. Includes a message after fitting to indicate how the fitting exited. Typically if the message returns a "success" then the algorithm found a minimum. This may be the desired solution, or a pathological local minimum, this often depends on the initial conditions. """ if len(self.current_state) == 0: if self.verbose > 0: config.logger.warning("No parameters to optimize. Exiting fit") self.message = "No parameters to optimize. Exiting fit" return self if self.likelihood == "gaussian": quantity = "Chi^2/DoF" self.loss_history = [self.chi2_ndf().item()] elif self.likelihood == "poisson": quantity = "2NLL/DoF" self.loss_history = [self.poisson_2nll_ndf().item()] self._covariance_matrix = None self.L_history = [self.L] self.lambda_history = [backend.to_numpy(backend.copy(self.current_state))] if self.verbose > 0: config.logger.info( f"==Starting LM fit for '{self.model.name}' with {len(self.current_state)} dynamic parameters and {len(self.Y)} pixels==" ) for _ in range(self.max_iter): if self.verbose > 0: config.logger.info(f"{quantity}: {self.loss_history[-1]:.6g}, L: {self.L:.3g}") try: if self.fit_valid: with ValidContext(self.model), ValidContext(self.constraint): res = func.lm_step( x=self.model.to_valid(self.current_state), data=self.Y, model=self.forward, weight=self.W, jacobian=self.jacobian, L=self.L, Lup=self.Lup, Ldn=self.Ldn, likelihood=self.likelihood, ) self.current_state = self.model.from_valid(backend.copy(res["x"])) else: res = func.lm_step( x=self.current_state, data=self.Y, model=self.forward, weight=self.W, jacobian=self.jacobian, L=self.L, Lup=self.Lup, Ldn=self.Ldn, likelihood=self.likelihood, ) self.current_state = backend.copy(res["x"]) except OptimizeStopFail: if self.verbose > 0: config.logger.warning("Could not find step to improve Chi^2, stopping") self.message = ( self.message + "success by immobility. Could not find step to improve Chi^2. Convergence not guaranteed" ) break except OptimizeStopSuccess as e: if self.verbose > 0: config.logger.info(f"Optimization converged successfully: {e}") self.message = self.message + "success" break self.L = np.clip(res["L"], 1e-9, 1e9) self.L_history.append(res["L"]) self.loss_history.append(2 * res["nll"] / self.ndf) self.lambda_history.append(backend.to_numpy(backend.copy(self.current_state))) if self.check_convergence(): break else: self.message = self.message + "fail. Maximum iterations" if self.verbose > 0: config.logger.info( f"Final {quantity}: {np.nanmin(self.loss_history):.6g}, L: {self.L_history[np.nanargmin(self.loss_history)]:.3g}. Converged: {self.message}" ) self.model.set_values( backend.as_array(self.res(), dtype=config.DTYPE, device=config.DEVICE) ) if update_uncertainty: self.update_uncertainty() return self
[docs] def check_convergence(self) -> bool: """Check if the optimization has converged based on the last iteration's chi^2 and the relative tolerance. """ if len(self.loss_history) < 3: return False good_history = [self.loss_history[0]] for l in self.loss_history[1:]: if good_history[-1] > l: good_history.append(l) if len(self.loss_history) - len(good_history) >= 10: self.message = self.message + "success by immobility. Convergence not guaranteed" return True if len(good_history) < 3: return False if (good_history[-2] - good_history[-1]) / good_history[ -1 ] < self.relative_tolerance and self.L < 0.1: self.message = self.message + "success" return True if len(good_history) < 10: return False if (good_history[-10] - good_history[-1]) / good_history[-1] < self.relative_tolerance: self.message = self.message + "success by immobility. Convergence not guaranteed" return True return False
@property def covariance_matrix(self) -> ArrayLike: """The covariance matrix for the model at the current parameters. This can be used to construct a full Gaussian PDF for the parameters using: $\\mathcal{N}(\\mu,\\Sigma)$ where $\\mu$ is the optimized parameters and $\\Sigma$ is the covariance matrix. """ if self._covariance_matrix is not None: return self._covariance_matrix J = self.jacobian(self.current_state) if self.likelihood == "gaussian": hess = func.hessian(J, self.W) elif self.likelihood == "poisson": hess = func.hessian_poisson(J, self.Y, self.forward(self.current_state)) try: self._covariance_matrix = backend.linalg.inv(hess) except: config.logger.warning( "WARNING: Hessian is singular, likely at least one parameter is non-physical. Will use pseudo-inverse of Hessian to continue but results should be inspected." ) self._covariance_matrix = backend.linalg.pinv(hess) return self._covariance_matrix
[docs] def update_uncertainty(self) -> None: """Call this function after optimization to set the uncertainties for the parameters. This will use the diagonal of the covariance matrix to update the uncertainties. See the covariance_matrix function for the full representation of the uncertainties. """ # set the uncertainty for each parameter cov = self.covariance_matrix if backend.all(backend.isfinite(cov)): try: self.model.set_values( backend.sqrt(backend.abs(backend.diag(cov))), attribute="uncertainty" ) except RuntimeError as e: config.logger.warning(f"Unable to update uncertainty due to: {e}") else: config.logger.warning( "Unable to update uncertainty due to non finite covariance matrix" )