__all__ = [
"IRLS",
"OMP",
"ISTA",
"FISTA",
"SPGL1",
"SplitBregman",
]
import logging
import time
from math import sqrt
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
import numpy as np
from scipy.sparse.linalg import lsqr
from pylops import LinearOperator
from pylops.basicoperators import Diagonal, Identity, VStack
from pylops.optimization.basesolver import Solver, _units
from pylops.optimization.basic import cgls
from pylops.optimization.callback import _callback_stop
from pylops.optimization.eigs import power_iteration
from pylops.optimization.leastsquares import regularized_inversion
from pylops.utils import deps
from pylops.utils.backend import (
get_array_module,
get_module_name,
get_real_dtype,
inplace_set,
)
from pylops.utils.typing import InputDimsLike, NDArray, SamplingLike
spgl1_message = deps.spgl1_import("the spgl1 solver")
if spgl1_message is None:
from spgl1 import spgl1 as ext_spgl1
logger = logging.getLogger(__name__)
def _hardthreshold(x: NDArray, thresh: float) -> NDArray:
r"""Hard thresholding.
Applies hard thresholding to vector ``x`` (equal to the proximity
operator for :math:`\|\mathbf{x}\|_0`) as shown in [1]_.
.. [1] Chen, F., Shen, L., Suter, B.W., “Computing the proximity
operator of the ℓp norm with 0 < p < 1”,
IET Signal Processing, vol. 10. 2016.
Parameters
----------
x : :obj:`numpy.ndarray`
Vector
thresh : :obj:`float`
Threshold
Returns
-------
x1 : :obj:`numpy.ndarray`
Tresholded vector
"""
x1 = x.copy()
x1[np.abs(x) <= sqrt(2 * thresh)] = 0.0
return x1
def _softthreshold(x: NDArray, thresh: float) -> NDArray:
r"""Soft thresholding.
Applies soft thresholding to vector ``x`` (equal to the proximity
operator for :math:`\|\mathbf{x}\|_1`) as shown in [1]_.
.. [1] Chen, F., Shen, L., Suter, B.W., “Computing the proximity
operator of the ℓp norm with 0 < p < 1”,
IET Signal Processing, vol. 10. 2016.
Parameters
----------
x : :obj:`numpy.ndarray`
Vector
thresh : :obj:`float`
Threshold
Returns
-------
x1 : :obj:`numpy.ndarray`
Tresholded vector
"""
if np.iscomplexobj(x):
# https://stats.stackexchange.com/questions/357339/soft-thresholding-
# for-the-lasso-with-complex-valued-data
x1 = np.maximum(np.abs(x) - thresh, 0.0) * np.exp(1j * np.angle(x))
else:
x1 = np.maximum(np.abs(x) - thresh, 0.0) * np.sign(x)
return x1
def _halfthreshold(x: NDArray, thresh: float) -> NDArray:
r"""Half thresholding.
Applies half thresholding to vector ``x`` (equal to the proximity
operator for :math:`\|\mathbf{x}\|_{1/2}^{1/2}`) as shown in [1]_.
.. [1] Chen, F., Shen, L., Suter, B.W., “Computing the proximity
operator of the ℓp norm with 0 < p < 1”,
IET Signal Processing, vol. 10. 2016.
Parameters
----------
x : :obj:`numpy.ndarray`
Vector
thresh : :obj:`float`
Threshold
Returns
-------
x1 : :obj:`numpy.ndarray`
Tresholded vector
.. warning::
Since version 1.17.0 does not produce ``np.nan`` on bad input.
"""
ncp = get_array_module(x)
arg = ncp.ones_like(x)
arg[x != 0] = (thresh / 8.0) * (ncp.abs(x[x != 0.0]) / 3.0) ** (-1.5)
if ncp.iscomplexobj(arg):
arg.real = ncp.clip(arg.real, -1.0, 1.0)
arg.imag = ncp.clip(arg.imag, -1.0, 1.0)
else:
arg = ncp.clip(arg, -1.0, 1.0)
phi = 2.0 / 3.0 * ncp.arccos(arg)
x1 = 2.0 / 3.0 * x * (1.0 + ncp.cos(2.0 * np.pi / 3.0 - phi))
x1[ncp.abs(x) <= (54.0 ** (1.0 / 3.0) / 4.0) * thresh ** (2.0 / 3.0)] = 0
return x1
def _hardthreshold_percentile(x: NDArray, perc: float) -> NDArray:
r"""Percentile Hard thresholding.
Applies hard thresholding to vector ``x`` using a percentile to define
the amount of values in the input vector to be preserved as shown in [1]_.
.. [1] Chen, Y., Chen, K., Shi, P., Wang, Y., “Irregular seismic
data reconstruction using a percentile-half-thresholding algorithm”,
Journal of Geophysics and Engineering, vol. 11. 2014.
Parameters
----------
x : :obj:`numpy.ndarray`
Vector
thresh : :obj:`float`
Threshold
Returns
-------
x1 : :obj:`numpy.ndarray`
Tresholded vector
"""
thresh = np.percentile(np.abs(x), perc)
return _hardthreshold(x, 0.5 * thresh**2)
def _softthreshold_percentile(x: NDArray, perc: float) -> NDArray:
r"""Percentile Soft thresholding.
Applies soft thresholding to vector ``x`` using a percentile to define
the amount of values in the input vector to be preserved as shown in [1]_.
.. [1] Chen, Y., Chen, K., Shi, P., Wang, Y., “Irregular seismic
data reconstruction using a percentile-half-thresholding algorithm”,
Journal of Geophysics and Engineering, vol. 11. 2014.
Parameters
----------
x : :obj:`numpy.ndarray`
Vector
perc : :obj:`float`
Percentile
Returns
-------
x : :obj:`numpy.ndarray`
Tresholded vector
"""
thresh = np.percentile(np.abs(x), perc)
return _softthreshold(x, thresh)
def _halfthreshold_percentile(x: NDArray, perc: float) -> NDArray:
r"""Percentile Half thresholding.
Applies half thresholding to vector ``x`` using a percentile to define
the amount of values in the input vector to be preserved as shown in [1]_.
.. [1] Xu, Z., Xiangyu, C., Xu, F. and Zhang, H., “L1/2 Regularization: A
Thresholding Representation Theory and a Fast Solver”, IEEE Transactions
on Neural Networks and Learning Systems, vol. 23. 2012.
Parameters
----------
x : :obj:`numpy.ndarray`
Vector
perc : :obj:`float`
Percentile
Returns
-------
x : :obj:`numpy.ndarray`
Thresholded vector
"""
thresh = np.percentile(np.abs(x), perc)
# return _halfthreshold(x, (2. / 3. * thresh) ** (1.5))
return _halfthreshold(x, (4.0 / 54 ** (1.0 / 3.0) * thresh) ** 1.5)
[docs]class IRLS(Solver):
r"""Iteratively reweighted least squares.
Solve an optimization problem with :math:`L_1` cost function (data IRLS)
or :math:`L_1` regularization term (model IRLS) given the operator ``Op``
and data ``y``.
In the *data IRLS*, the cost function is minimized by iteratively solving a
weighted least squares problem with the weight at iteration :math:`i`
being based on the data residual at iteration :math:`i-1`. This IRLS solver
is robust to *outliers* since the L1 norm given less weight to large
residuals than L2 norm does.
Similarly in the *model IRLS*, the weight at at iteration :math:`i`
is based on the model at iteration :math:`i-1`. This IRLS solver inverts
for a sparse model vector.
Parameters
----------
Op : :obj:`pylops.LinearOperator`
Operator to invert
Attributes
----------
ncp : :obj:`module`
Array module used by the solver (obtained via
:func:`pylops.utils.backend.get_array_module`)
). Available only after ``setup`` is called.
isjax : :obj:`bool`
Whether the input data is a JAX array or not.
r : :obj:`numpy.ndarray`
Residual vector of size :math:`[N \times 1]` used in the
solver when ``preallocate=True``. Available only after ``setup``
is called.
rw : :obj:`numpy.ndarray`
Weigthing vector of size :math:`[N \times 1]` for ``kind=data``
or ``kind=datamodel`` or of size :math:`[M \times 1]` for ``kind=model``
used in the solver when ``preallocate=True``. Available only
after ``setup`` is called and updated at each call to ``step``.
cost : :obj:`list`
History of the L2 norm of the residual. Available only after
``setup`` is called and updated at each call to ``step``.
iiter : :obj:`int`
Current iteration number. Available only after
``setup`` is called and updated at each call to ``step``.
Raises
------
NotImplementedError
If ``kind`` is different from model or data
Notes
-----
*Data IRLS* solves the following optimization problem for the operator
:math:`\mathbf{Op}` and the data :math:`\mathbf{y}`:
.. math::
J = \|\mathbf{y} - \mathbf{Op}\,\mathbf{x}\|_1
by a set of outer iterations which require to repeatedly solve a
weighted least squares problem of the form:
.. math::
\DeclareMathOperator*{\argmin}{arg\,min}
\mathbf{x}^{(i+1)} = \argmin_\mathbf{x} \|\mathbf{y} -
\mathbf{Op}\,\mathbf{x}\|_{2, \mathbf{R}^{(i)}}^2 +
\epsilon_\mathbf{I}^2 \|\mathbf{x}\|_2^2
where :math:`\mathbf{R}^{(i)}` is a diagonal weight matrix
whose diagonal elements at iteration :math:`i` are equal to the absolute
inverses of the residual vector :math:`\mathbf{r}^{(i)} =
\mathbf{y} - \mathbf{Op}\,\mathbf{x}^{(i)}` at iteration :math:`i`.
More specifically the :math:`j`-th element of the diagonal of
:math:`\mathbf{R}^{(i)}` is
.. math::
R^{(i)}_{j,j} = \frac{1}{\left| r^{(i)}_j \right| + \epsilon_\mathbf{R}}
or
.. math::
R^{(i)}_{j,j} = \frac{1}{\max\{\left|r^{(i)}_j\right|, \epsilon_\mathbf{R}\}}
depending on the choice ``threshR``. In either case,
:math:`\epsilon_\mathbf{R}` is the user-defined stabilization/thresholding
factor [1]_.
Similarly *model IRLS* solves the following optimization problem for the
operator :math:`\mathbf{Op}` and the data :math:`\mathbf{y}`:
.. math::
J = \|\mathbf{x}\|_1 \quad \text{subject to} \quad
\mathbf{y} = \mathbf{Op}\,\mathbf{x}
by a set of outer iterations which require to repeatedly solve a
weighted least squares problem of the form [2]_:
.. math::
\mathbf{x}^{(i+1)} = \operatorname*{arg\,min}_\mathbf{x}
\|\mathbf{x}\|_{2, \mathbf{R}^{(i)}}^2 \quad \text{subject to} \quad
\mathbf{y} = \mathbf{Op}\,\mathbf{x}
where :math:`\mathbf{R}^{(i)}` is a diagonal weight matrix
whose diagonal elements at iteration :math:`i` are equal to the absolutes
of the model vector :math:`\mathbf{x}^{(i)}` at iteration
:math:`i`. More specifically the :math:`j`-th element of the diagonal of
:math:`\mathbf{R}^{(i)}` is
.. math::
R^{(i)}_{j,j} = \left|x^{(i)}_j\right|.
.. [1] https://en.wikipedia.org/wiki/Iteratively_reweighted_least_squares
.. [2] Chartrand, R., and Yin, W. "Iteratively reweighted algorithms for
compressive sensing", IEEE. 2008.
"""
def _print_setup(self, xcomplex: bool = False) -> None:
self._print_solver(f" ({self.kind})")
strpar = f"threshR = {self.threshR}\tepsR = {self.epsR}\tepsI = {self.epsI}"
if self.nouter is not None:
strpar1 = f"tolIRL = {self.nouter}\tnouter = {self.nouter}"
else:
strpar1 = f"tolIRL = {self.nouter}"
print(strpar)
print(strpar1)
print("-" * 80)
if not xcomplex:
head1 = " Itn x[0] r2norm"
else:
head1 = " Itn x[0] r2norm"
print(head1)
def _print_step(self, x: NDArray) -> None:
strx = f"{x[0]:1.2e} " if np.iscomplexobj(x) else f"{x[0]:11.4e}"
str1 = f"{self.iiter:6g} " + strx
str2 = f" {self.rnorm:10.3e}"
print(str1 + str2)
def memory_usage(
self,
kind: str = "data",
show: bool = False,
unit: str = "B",
) -> float:
"""Compute memory usage of the solver
Parameters
----------
kind : :obj:`str`, optional
Kind of solver (``model``, ``data`` or ``datamodel``)
show : :obj:`bool`, optional
Display memory usage
unit: :obj:`str`, optional
Unit used to display memory usage (
``B``, ``KB``, ``MB`` or ``GB``)
Returns
-------
memuse :obj:`float`
Memory usage in Bytes
"""
# Get number of bytes of dtype used in the solver
nbytes = np.dtype(self.Op.dtype).itemsize
# Setup: y + augmented y if kind=datamodel
memuse = self.Op.shape[0] * nbytes
if kind == "datamodel":
memuse += self.Op.shape[1] * nbytes
# Step (additional variables to those in setup): rw
if kind == "data":
memuse += self.Op.shape[0] * nbytes
elif kind == "model":
memuse += self.Op.shape[1] * nbytes
elif kind == "datamodel":
memuse += 2 * self.Op.shape[0] * nbytes
if show:
print(f"IRLS predicted memory usage: {memuse / _units[unit]:.2f} {unit}")
return memuse
def setup(
self,
y: NDArray,
nouter: Optional[int] = None,
threshR: bool = False,
epsR: float = 1e-10,
epsI: float = 1e-10,
tolIRLS: float = 1e-10,
warm: bool = False,
kind: str = "data",
preallocate: bool = False,
show: bool = False,
) -> None:
r"""Setup solver
Parameters
----------
y : :obj:`numpy.ndarray`
Data of size :math:`[N \times 1]`
nouter : :obj:`int`, optional
Number of outer iterations
threshR : :obj:`bool`, optional
Apply thresholding in creation of weight (``True``)
or damping (``False``)
epsR : :obj:`float`, optional
Damping to be applied to residuals for weighting term
epsI : :obj:`float`, optional
Tikhonov damping
tolIRLS : :obj:`float`, optional
Tolerance. Stop outer iterations if difference between inverted model
at subsequent iterations is smaller than ``tolIRLS``
warm : :obj:`bool`, optional
Warm start each inversion inner step with previous estimate (``True``)
or not (``False``). This only applies to ``kind="data"`` and
``kind="datamodel"``
kind : :obj:`str`, optional
Kind of solver (``model``, ``data`` or ``datamodel``)
preallocate : :obj:`bool`, optional
.. versionadded:: 2.6.0
Pre-allocate all variables used by the solver. Note that if ``y``
is a JAX array, this option is ignored and variables are not
pre-allocated since JAX does not support in-place operations.
show : :obj:`bool`, optional
Display setup log
"""
self.y = y
self.nouter = nouter
self.threshR = threshR
self.epsR = epsR
self.epsI = epsI
self.tolIRLS = tolIRLS
self.warm = warm
self.kind = kind
self.ncp = get_array_module(y)
self.isjax = get_module_name(self.ncp) == "jax"
self._setpreallocate(preallocate)
# choose step to use
if self.kind == "data":
self._step = self._step_data
elif self.kind == "model":
self._step = self._step_model
self.Iop = Identity(y.size, dtype=y.dtype)
elif self.kind == "datamodel":
self._step = self._step_data
# augment Op and y
self.Op = VStack([self.Op, epsI * Identity(self.Op.shape[1])])
self.epsI = 0.0 # as epsI is added to the augmented system already
self.y = self.ncp.hstack([self.y, self.ncp.zeros(self.Op.shape[1])])
else:
raise NotImplementedError("kind must be model, data or datamodel")
if self.preallocate:
self.r = self.ncp.empty_like(self.y)
if "data" in self.kind:
self.rw = self.ncp.empty_like(self.y)
else:
self.rw = self.ncp.empty(self.Op.shape[1], dtype=self.Op.dtype)
# create variables to track the residual norm and iterations
self.cost = [
float(np.linalg.norm(self.y)),
]
self.iiter = 0
# print setup
if show:
self._print_setup()
def _step_data(self, x: NDArray, engine: str = "scipy", **kwargs_solver) -> NDArray:
r"""Run one step of solver with L1 data term"""
# add preallocate to keywords of solver
if self.preallocate and (engine == "pylops" or self.ncp != np):
kwargs_solver["preallocate"] = True
if self.iiter == 0:
# first iteration (standard least-squares)
x = regularized_inversion(
self.Op,
self.y,
None,
x0=x if self.warm else None,
damp=self.epsI,
engine=engine,
**kwargs_solver,
)[0]
else:
# other iterations (weighted least-squares)
if self.preallocate and self.iiter == 1:
self.rw = self.ncp.zeros_like(self.y)
if not self.preallocate:
if self.threshR:
self.rw = 1.0 / self.ncp.maximum(self.ncp.abs(self.r), self.epsR)
else:
self.rw = 1.0 / (self.ncp.abs(self.r) + self.epsR)
self.rw = self.rw / self.rw.max()
else:
if self.threshR:
self.ncp.divide(
1.0,
self.ncp.maximum(self.ncp.abs(self.r), self.epsR),
out=self.rw,
)
else:
self.ncp.divide(
1.0, (self.ncp.abs(self.r) + self.epsR), out=self.rw
)
self.ncp.divide(self.rw, self.rw.max(), out=self.rw)
R = Diagonal(self.ncp.sqrt(self.rw))
x = regularized_inversion(
self.Op,
self.y,
None,
Weight=R,
x0=x if self.warm else None,
damp=self.epsI,
engine=engine,
**kwargs_solver,
)[0]
return x
def _step_model(
self, x: NDArray, engine: str = "scipy", **kwargs_solver
) -> NDArray:
r"""Run one step of solver with L1 model term"""
# add preallocate to keywords of solver
if self.preallocate and (engine == "pylops" or self.ncp != np):
kwargs_solver["preallocate"] = True
if self.iiter == 0:
# first iteration (unweighted least-squares)
if engine == "scipy" and self.ncp == np:
x = self.Op.rmatvec(
lsqr(
self.Op @ self.Op.H + (self.epsI**2) * self.Iop,
self.y,
**kwargs_solver,
)[0]
)
elif engine == "pylops" or self.ncp != np:
x = self.Op.rmatvec(
cgls(
self.Op @ self.Op.H + (self.epsI**2) * self.Iop,
self.y,
self.ncp.zeros(int(self.Op.shape[0]), dtype=self.Op.dtype),
**kwargs_solver,
)[0]
)
else:
# other iterations (weighted least-squares)
if self.preallocate and self.iiter == 1:
self.rw = self.ncp.zeros_like(x)
if not self.preallocate:
self.rw = self.ncp.abs(x)
self.rw = self.rw / self.rw.max()
else:
self.ncp.abs(x, out=self.rw)
self.ncp.divide(self.rw, self.rw.max(), out=self.rw)
R = Diagonal(self.rw, dtype=self.rw.dtype)
if engine == "scipy" and self.ncp == np:
x = R.matvec(
self.Op.rmatvec(
lsqr(
self.Op @ R @ self.Op.H + self.epsI**2 * self.Iop,
self.y,
**kwargs_solver,
)[0]
)
)
elif engine == "pylops" or self.ncp != np:
x = R.matvec(
self.Op.rmatvec(
cgls(
self.Op @ R @ self.Op.H + self.epsI**2 * self.Iop,
self.y,
self.ncp.zeros(int(self.Op.shape[0]), dtype=self.Op.dtype),
**kwargs_solver,
)[0]
)
)
return x
def step(
self,
x: NDArray,
engine: str = "scipy",
show: bool = False,
**kwargs_solver,
) -> NDArray:
r"""Run one step of solver
Parameters
----------
x : :obj:`numpy.ndarray`
Current model vector to be updated by a step of ISTA
engine : :obj:`str`, optional
.. versionadded:: 2.6.0
Solver to use (``scipy`` or ``pylops``)
show : :obj:`bool`, optional
Display iteration log
**kwargs_solver
Arbitrary keyword arguments for
:py:func:`scipy.sparse.linalg.cg` solver for data IRLS and
:py:func:`scipy.sparse.linalg.lsqr` solver for model IRLS when using
numpy data and ``engine='scipy'`` (or
:py:func:`pylops.optimization.solver.cg` and
:py:func:`pylops.optimization.solver.cgls` when using cupy data or
``engine='pylops'``)
Returns
-------
x : :obj:`numpy.ndarray`
Updated model vector
"""
# update model
x = self._step(x, engine=engine, **kwargs_solver)
# compute residual
if not self.preallocate:
self.r: NDArray = self.y - self.Op.matvec(x)
else:
self.ncp.subtract(self.y, self.Op.matvec(x), out=self.r)
self.rnorm = self.ncp.linalg.norm(self.r)
self.iiter += 1
self.cost.append(float(self.rnorm))
if show:
self._print_step(x)
return x
def run(
self,
x: Optional[NDArray],
nouter: int = 10,
engine: str = "scipy",
show: bool = False,
itershow: Tuple[int, int, int] = (10, 10, 10),
**kwargs_solver,
) -> NDArray:
r"""Run solver
Parameters
----------
x : :obj:`numpy.ndarray`
Current model vector to be updated by multiple steps of IRLS. Provide
``None`` to initialize internally as zero vector
nouter : :obj:`int`, optional
Number of outer iterations.
engine : :obj:`str`, optional
.. versionadded:: 2.6.0
Solver to use (``scipy`` or ``pylops``)
show : :obj:`bool`, optional
Display logs
itershow : :obj:`tuple`, optional
Display set log for the first N1 steps, last N2 steps,
and every N3 steps in between where N1, N2, N3 are the
three element of the list.
**kwargs_solver
Arbitrary keyword arguments for
:py:func:`scipy.sparse.linalg.cg` solver for data IRLS and
:py:func:`scipy.sparse.linalg.lsqr` solver for model IRLS when using
numpy data and ``engine='scipy'`` (or
:py:func:`pylops.optimization.solver.cg` and
:py:func:`pylops.optimization.solver.cgls` when using cupy data or
``engine='pylops'``)
Returns
-------
x : :obj:`numpy.ndarray`
Estimated model of size :math:`[M \times 1]`
"""
nouter = nouter if self.nouter is None else self.nouter
if x is not None:
self.x0 = x.copy()
self.y = self.y - self.Op.matvec(x)
# choose xold to ensure tolerance test is passed initially
xold = x.copy() + np.inf
while self.iiter < nouter and self.ncp.linalg.norm(x - xold) >= self.tolIRLS:
showstep = (
True
if show
and (
self.iiter < itershow[0]
or nouter - self.iiter < itershow[1]
or self.iiter % itershow[2] == 0
)
else False
)
xold = x.copy()
x = self.step(x, engine, showstep, **kwargs_solver)
self.callback(x)
# check if any callback has raised a stop flag
stop = _callback_stop(self.callbacks)
if stop:
break
# adding initial guess
if hasattr(self, "x0"):
x = self.x0 + x
return x
def finalize(self, show: bool = False) -> None:
r"""Finalize solver
Parameters
----------
show : :obj:`bool`, optional
Display finalize log
"""
self.tend = time.time()
self.telapsed = self.tend - self.tstart
self.nouter = self.iiter
if show:
self._print_finalize()
def solve(
self,
y: NDArray,
x0: Optional[NDArray] = None,
nouter: int = 10,
threshR: bool = False,
epsR: float = 1e-10,
epsI: float = 1e-10,
tolIRLS: float = 1e-10,
kind: str = "data",
warm: bool = False,
engine: str = "scipy",
preallocate: bool = False,
show: bool = False,
itershow: Tuple[int, int, int] = (10, 10, 10),
**kwargs_solver,
) -> NDArray:
r"""Run entire solver
Parameters
----------
y : :obj:`numpy.ndarray`
Data of size :math:`[N \times 1]`
x0 : :obj:`numpy.ndarray`, optional
Initial guess of size :math:`[N \times 1]`. If ``None``, initialize
internally as zero vector
nouter : :obj:`int`, optional
Number of outer iterations
threshR : :obj:`bool`, optional
Apply thresholding in creation of weight (``True``)
or damping (``False``)
epsR : :obj:`float`, optional
Damping to be applied to residuals for weighting term
epsI : :obj:`float`, optional
Tikhonov damping
tolIRLS : :obj:`float`, optional
Tolerance. Stop outer iterations if difference between inverted model
at subsequent iterations is smaller than ``tolIRLS``
warm : :obj:`bool`, optional
Warm start each inversion inner step with previous estimate (``True``) or not (``False``).
This only applies to ``kind="data"`` and ``kind="datamodel"``
kind : :obj:`str`, optional
Kind of solver (``data`` or ``model``)
engine : :obj:`str`, optional
.. versionadded:: 2.6.0
Solver to use (``scipy`` or ``pylops``)
preallocate : :obj:`bool`, optional
.. versionadded:: 2.6.0
Pre-allocate all variables used by the solver. Note that if ``y``
is a JAX array, this option is ignored and variables are not
pre-allocated since JAX does not support in-place operations.
show : :obj:`bool`, optional
Display setup log
itershow : :obj:`tuple`, optional
Display set log for the first N1 steps, last N2 steps,
and every N3 steps in between where N1, N2, N3 are the
three element of the list.
**kwargs_solver
Arbitrary keyword arguments for
:py:func:`scipy.sparse.linalg.cg` solver for data IRLS and
:py:func:`scipy.sparse.linalg.lsqr` solver for model IRLS when using
numpy data(or :py:func:`pylops.optimization.solver.cg` and
:py:func:`pylops.optimization.solver.cgls` when using cupy data)
Returns
-------
x : :obj:`numpy.ndarray`
Estimated model of size :math:`[N \times 1]`
"""
self.setup(
y=y,
threshR=threshR,
epsR=epsR,
epsI=epsI,
tolIRLS=tolIRLS,
warm=warm,
kind=kind,
preallocate=preallocate,
show=show,
)
if x0 is None:
x0 = self.ncp.zeros(self.Op.shape[1], dtype=self.y.dtype)
x = self.run(
x0,
nouter=nouter,
engine=engine,
show=show,
itershow=itershow,
**kwargs_solver,
)
self.finalize(show)
return x, self.nouter
[docs]class OMP(Solver):
r"""Orthogonal Matching Pursuit (OMP).
Solve an optimization problem with :math:`L_0` regularization function given
the operator ``Op`` and data ``y``. The operator can be real or complex,
and should ideally be either square :math:`N=M` or underdetermined
:math:`N<M`.
Parameters
----------
Op : :obj:`pylops.LinearOperator`
Operator to invert
Attributes
----------
ncp : :obj:`module`
Array module used by the solver (obtained via
:func:`pylops.utils.backend.get_array_module`)
). Available only after ``setup`` is called.
isjax : :obj:`bool`
Whether the input data is a JAX array or not.
norms : :obj:`numpy.ndarray`
Vector of size :math:`[Nbasis \times 1]` containing the
norms of each column of the ``Opbasis`` operator. Available
only after ``setup`` is called.
res : :obj:`numpy.ndarray`
Residual vector of size :math:`[N \times 1]`. Available only
after ``setup`` is called and updated at each call to ``step``.
cost : :obj:`list`
History of the L2 norm of the residual. Available only after
``setup`` is called and updated at each call to ``step``.
iiter : :obj:`int`
Current iteration number. Available only after
``setup`` is called and updated at each call to ``step``.
See Also
--------
ISTA: Iterative Shrinkage-Thresholding Algorithm (ISTA).
FISTA: Fast Iterative Shrinkage-Thresholding Algorithm (FISTA).
SPGL1: Spectral Projected-Gradient for L1 norm (SPGL1).
SplitBregman: Split Bregman for mixed L2-L1 norms.
Notes
-----
Solves the following optimization problem for the operator
:math:`\mathbf{Op}` and the data :math:`\mathbf{y}`:
.. math::
\|\mathbf{x}\|_0 \quad \text{subject to} \quad
\|\mathbf{Op}\,\mathbf{x}-\mathbf{y}\|_2^2 \leq \sigma^2,
using Orthogonal Matching Pursuit (OMP). This is a very
simple iterative algorithm which applies the following step:
.. math::
\DeclareMathOperator*{\argmin}{arg\,min}
\DeclareMathOperator*{\argmax}{arg\,max}
\Lambda_k = \Lambda_{k-1} \cup \left\{\argmax_j
\left|\mathbf{Op}^{j H}\,\mathbf{r}_k\right| \right\} \\
\mathbf{x}_k = \argmin_{\mathbf{x}}
\left\|\mathbf{Op}_{\Lambda_k}\,\mathbf{x} - \mathbf{y}\right\|_2^2
where :math:`\mathbf{Op}^j` is the :math:`j`-th column of the operator,
:math:`\mathbf{r}_k` is the residual at iteration :math:`k`, and
:math:`\mathbf{Op}_{\Lambda_k}` is the operator restricted to the columns
in the set :math:`\Lambda_k`.
Note that by choosing ``niter_inner=0`` the basic Matching Pursuit (MP)
algorithm is implemented instead. In other words, instead of solving an
optimization at each iteration to find the best :math:`\mathbf{x}` for the
currently selected basis functions, either the vector :math:`\mathbf{x}`
is just updated at the new basis function by adding the value from
the inner product :math:`\mathbf{Op}_j^H\,\mathbf{r}_k` to the current value
(``optimal_coeff=False``) or the optimal coefficient that minimizes the norm
of the residual :math:`\mathbf{r} - c * \mathbf{Op}^j` is estimated
(``optimal_coeff=True``) and added to the current value.
In the case the MP solver is used, it is highly recommended to provide a
normalized basis function. If different basis have different norms, the
solver is likely to diverge. Similar observations apply to OMP, even
though mild unbalancing between the basis is generally properly handled.
Two possible ways to handle the scenario fo non-normalized basis functions
are:
- Find the normalization factor of the the basis functions before
running the solver (this is done by choosing ``normalizecols=True``);
- Find the optimal coefficient that minimizes the norm of the residual
:math:`\mathbf{r} - c * \mathbf{Op}^j` at every iteration (this is
done by choosing ``optimal_coeff=True``).
Finally, when the operator is a chain of operators, with the rigth-most
representing the basis function, if the operator of the basis function is
provided in the ``Opbasis`` parameter, the solver will use this operator
to find the normalization factor for each column of the operator.
"""
def _print_setup(self, xcomplex: bool = False) -> None:
self._print_solver("(Only MP)" if self.niter_inner == 0 else "", nbar=55)
strpar = (
f"sigma = {self.sigma:.2e}\tniter_outer = {self.niter_outer}\n"
f"niter_inner = {self.niter_inner}\tnormalization={self.normalizecols}"
)
print(strpar)
print("-" * 55)
if not xcomplex:
head1 = " Itn x[0] r2norm"
else:
head1 = " Itn x[0] r2norm"
print(head1)
def _print_step(self, x: NDArray) -> None:
strx = f"{x[0]:1.2e} " if np.iscomplexobj(x) else f"{x[0]:11.4e}"
str1 = f"{self.iiter:6g} " + strx
str2 = f" {self.cost[-1]:10.3e}"
print(str1 + str2)
def memory_usage(
self,
show: bool = False,
unit: str = "B",
) -> float:
"""Compute memory usage of the solver
Parameters
----------
show : :obj:`bool`, optional
Display memory usage
unit: :obj:`str`, optional
Unit used to display memory usage (
``B``, ``KB``, ``MB`` or ``GB``)
Returns
-------
memuse :obj:`float`
Memory usage in Bytes
"""
# Get number of bytes of dtype used in the solver
nbytes = np.dtype(self.Op.dtype).itemsize
# Setup: y, res
memuse = (2 * self.Op.shape[0]) * nbytes
# Step (additional variables to those in setup): cres, cres_abs
memuse += (2 * self.Op.shape[0]) * nbytes
if show:
print(f"OMP predicted memory usage: {memuse / _units[unit]:.2f} {unit}")
return memuse
def setup(
self,
y: NDArray,
niter_outer: int = 10,
niter_inner: int = 40,
sigma: float = 1e-4,
normalizecols: bool = False,
Opbasis: Optional["LinearOperator"] = None,
optimal_coeff: bool = False,
preallocate: bool = False,
show: bool = False,
) -> None:
r"""Setup solver
Parameters
----------
y : :obj:`numpy.ndarray`
Data of size :math:`[N \times 1]`
niter_outer : :obj:`int`, optional
Number of iterations of outer loop
niter_inner : :obj:`int`, optional
Number of iterations of inner loop. By choosing ``niter_inner=0``, the
Matching Pursuit (MP) algorithm is implemented.
sigma : :obj:`list`
Maximum :math:`L^2` norm of residual. When smaller stop iterations.
normalizecols : :obj:`list`, optional
Normalize columns (``True``) or not (``False``). Note that this can be
expensive as it requires applying the forward operator
:math:`n_{cols}` times to unit vectors (i.e., containing 1 at
position j and zero otherwise); use only when the columns of the
operator are expected to have highly varying norms.
Opbasis : :obj:`pylops.LinearOperator`
Operator representing the basis functions. If ``None``, the entire
operator used for inversion `Op` is used.
optimal_coeff : :obj:`bool`, optional
Estimate optimal coefficient that minimizes the norm of the residual
:math:`\mathbf{r} - c * \mathbf{Op}^j) norm (``True``) or use the
directly the value from the inner product
:math:`\mathbf{Op}_j^H\,\mathbf{r}_k`.
preallocate : :obj:`bool`, optional
.. versionadded:: 2.6.0
Pre-allocate all variables used by the solver. Note that if ``y``
is a JAX array, this option is ignored and variables are not
pre-allocated since JAX does not support in-place operations.
show : :obj:`bool`, optional
Display setup log
"""
self.y = y
self.niter_outer = niter_outer
self.niter_inner = niter_inner
self.sigma = sigma
self.normalizecols = normalizecols
self.Opbasis = Opbasis if Opbasis is not None else self.Op
self.optimal_coeff = optimal_coeff
self.ncp = get_array_module(y)
self.isjax = get_module_name(self.ncp) == "jax"
self._setpreallocate(preallocate)
# find normalization factor for each column
if self.normalizecols:
ncols = self.Opbasis.shape[1]
self.norms = self.ncp.zeros(ncols)
for icol in range(ncols):
unit = self.ncp.zeros(ncols, dtype=self.Opbasis.dtype)
unit[icol] = 1
self.norms[icol] = self.ncp.linalg.norm(self.Opbasis.matvec(unit))
# create variables to track the residual norm and iterations
self.res = self.y.copy()
self.cost = [
float(np.linalg.norm(self.res)),
]
self.iiter = 0
if show:
self._print_setup()
def step(
self,
x: NDArray,
cols: InputDimsLike,
engine: str = "scipy",
show: bool = False,
**kwargs_solver,
) -> NDArray:
r"""Run one step of solver
Parameters
----------
x : :obj:`list` or :obj:`numpy.ndarray`
Current model vector to be updated by a step of OMP
cols : :obj:`list`
Current list of chosen elements of vector x to be updated by a step of OMP
engine : :obj:`str`, optional
.. versionadded:: 2.6.0
Solver to use (``scipy`` or ``pylops``)
show : :obj:`bool`, optional
Display iteration log
**kwargs_solver
Arbitrary keyword arguments for
:py:func:`scipy.sparse.linalg.lsqr` solver when using
numpy data and ``engine='scipy'`` (or
:py:func:`pylops.optimization.solver.cgls` when using cupy
data or ``engine='pylops'``)
Returns
-------
x : :obj:`numpy.ndarray`
Updated model vector
cols : :obj:`list`
Current list of chosen elements
"""
# add preallocate to keywords of solver
if self.preallocate and (engine == "pylops" or self.ncp != np):
kwargs_solver["preallocate"] = True
# compute inner products
cres = self.Op.rmatvec(self.res)
if self.normalizecols:
cres = cres / self.norms
cres_abs = self.ncp.abs(cres)
# choose column with max cres
cres_max = self.ncp.max(cres_abs)
imax = self.ncp.argwhere(cres_abs == cres_max).ravel()
nimax = len(imax)
if nimax > 0:
imax = imax[np.random.permutation(nimax)[0]]
else:
imax = imax[0]
# update active set
if imax not in cols:
addnew = True
cols.append(int(imax))
else:
addnew = False
imax_in_cols = cols.index(imax)
# estimate model for current set of columns
if self.niter_inner == 0:
# MP update
Opcol = self.Op.apply_columns(
[
int(imax),
]
)
if not self.optimal_coeff:
# update with coefficient that maximizes the inner product
if not self.preallocate:
self.res -= Opcol.matvec(cres[imax] * self.ncp.ones(1))
else:
self.ncp.subtract(
self.res,
Opcol.matvec(cres[imax] * self.ncp.ones(1)),
out=self.res,
)
if addnew:
x.append(cres[imax])
else:
x[imax_in_cols] += cres[imax]
else:
# find optimal coefficient that minimizes the residual (r - cres * col)
col = Opcol.matvec(self.ncp.ones(1, dtype=Opcol.dtype))
cresopt = (Opcol.rmatvec(self.res) / Opcol.rmatvec(col))[0]
if not self.preallocate:
self.res -= Opcol.matvec(cresopt * self.ncp.ones(1))
else:
self.ncp.subtract(
self.res, Opcol.matvec(cresopt * self.ncp.ones(1)), out=self.res
)
if addnew:
x.append(cresopt)
else:
x[imax_in_cols] += cresopt
else:
# OMP update
Opcol = self.Op.apply_columns(cols)
if engine == "scipy" and self.ncp == np:
x = lsqr(Opcol, self.y, iter_lim=self.niter_inner, **kwargs_solver)[0]
elif engine == "pylops" or self.ncp != np:
x = cgls(
Opcol,
self.y,
self.ncp.zeros(int(Opcol.shape[1]), dtype=Opcol.dtype),
niter=self.niter_inner,
**kwargs_solver,
)[0]
if not self.preallocate:
self.res = self.y - Opcol.matvec(x)
else:
self.res = Opcol.matvec(x)
self.ncp.subtract(self.res, self.y, out=self.res)
self.iiter += 1
self.cost.append(float(self.ncp.linalg.norm(self.res)))
if show:
self._print_step(x)
return x, cols
def run(
self,
x: NDArray,
cols: InputDimsLike,
engine: str = "scipy",
show: bool = False,
itershow: Tuple[int, int, int] = (10, 10, 10),
) -> Tuple[NDArray, InputDimsLike]:
r"""Run solver
Parameters
----------
x : :obj:`numpy.ndarray`
Current model vector to be updated by multiple steps of IRLS
cols : :obj:`list`
Current list of chosen elements of vector x to be updated by a step of OMP
engine : :obj:`str`, optional
.. versionadded:: 2.6.0
Solver to use (``scipy`` or ``pylops``)
show : :obj:`bool`, optional
Display logs
itershow : :obj:`tuple`, optional
Display set log for the first N1 steps, last N2 steps,
and every N3 steps in between where N1, N2, N3 are the
three element of the list.
Returns
-------
x : :obj:`numpy.ndarray`
Estimated model of size :math:`[M \times 1]`
cols : :obj:`list`
Current list of chosen elements
"""
while self.iiter < self.niter_outer and self.cost[self.iiter] > self.sigma:
showstep = (
True
if show
and (
self.iiter < itershow[0]
or self.niter_outer - self.iiter < itershow[1]
or self.iiter % itershow[2] == 0
)
else False
)
x, cols = self.step(x, cols, engine, showstep)
self.callback(x, cols)
# check if any callback has raised a stop flag
stop = _callback_stop(self.callbacks)
if stop:
break
return x, cols
def finalize(
self,
x: NDArray,
cols: InputDimsLike,
show: bool = False,
) -> NDArray:
r"""Finalize solver
Parameters
----------
x : :obj:`list` or :obj:`numpy.ndarray`
Current model vector to be updated by a step of OMP
cols : :obj:`list`
Current list of chosen elements of vector x to be updated by a step of OMP
show : :obj:`bool`, optional
Display finalize log
Returns
-------
xfin : :obj:`numpy.ndarray`
Estimated model of size :math:`[M \times 1]`
"""
self.tend = time.time()
self.telapsed = self.tend - self.tstart
self.cost = np.array(self.cost)
self.nouter = self.iiter
xfin = self.ncp.zeros(int(self.Op.shape[1]), dtype=self.Op.dtype)
xfin = inplace_set(self.ncp.array(x), xfin, self.ncp.array(cols))
if show:
self._print_finalize(nbar=55)
return xfin
def solve(
self,
y: NDArray,
niter_outer: int = 10,
niter_inner: int = 40,
sigma: float = 1e-4,
normalizecols: bool = False,
Opbasis: Optional["LinearOperator"] = None,
optimal_coeff: bool = False,
engine: str = "scipy",
preallocate: bool = False,
show: bool = False,
itershow: Tuple[int, int, int] = (10, 10, 10),
) -> Tuple[NDArray, int, NDArray]:
r"""Run entire solver
Parameters
----------
y : :obj:`numpy.ndarray`
Data of size :math:`[N \times 1]`
niter_outer : :obj:`int`, optional
Number of iterations of outer loop
niter_inner : :obj:`int`, optional
Number of iterations of inner loop. By choosing ``niter_inner=0``, the
Matching Pursuit (MP) algorithm is implemented.
sigma : :obj:`list`
Maximum :math:`L^2` norm of residual. When smaller stop iterations.
normalizecols : :obj:`list`, optional
Normalize columns (``True``) or not (``False``). Note that this can be
expensive as it requires applying the forward operator
:math:`n_{cols}` times to unit vectors (i.e., containing 1 at
position j and zero otherwise); use only when the columns of the
operator are expected to have highly varying norms.
Opbasis : :obj:`pylops.LinearOperator`
Operator representing the basis functions. If ``None``, the entire
operator used for inversion `Op` is used.
optimal_coeff : :obj:`bool`, optional
Estimate optimal coefficient that minimizes the norm of the residual
:math:`\mathbf{r} - c * \mathbf{Op}^j) norm (``True``) or use the
directly the value from the inner product
:math:`\mathbf{Op}_j^H\,\mathbf{r}_k`.
engine : :obj:`str`, optional
.. versionadded:: 2.6.0
Solver to use (``scipy`` or ``pylops``)
preallocate : :obj:`bool`, optional
.. versionadded:: 2.6.0
Pre-allocate all variables used by the solver. Note that if ``y``
is a JAX array, this option is ignored and variables are not
pre-allocated since JAX does not support in-place operations.
show : :obj:`bool`, optional
Display logs
itershow : :obj:`tuple`, optional
Display set log for the first N1 steps, last N2 steps,
and every N3 steps in between where N1, N2, N3 are the
three element of the list.
Returns
-------
x : :obj:`numpy.ndarray`
Estimated model of size :math:`[M \times 1]`
niter_outer : :obj:`int`
Number of effective outer iterations
cost : :obj:`numpy.ndarray`, optional
History of cost function
"""
self.setup(
y,
niter_outer=niter_outer,
niter_inner=niter_inner,
sigma=sigma,
normalizecols=normalizecols,
Opbasis=Opbasis,
optimal_coeff=optimal_coeff,
preallocate=preallocate,
show=show,
)
x: List[NDArray] = []
cols: List[InputDimsLike] = []
x, cols = self.run(x, cols, engine=engine, show=show, itershow=itershow)
x = self.finalize(x, cols, show)
return x, self.nouter, self.cost
[docs]class ISTA(Solver):
r"""Iterative Shrinkage-Thresholding Algorithm (ISTA).
Solve an optimization problem with :math:`L_p, \; p=0, 0.5, 1`
regularization, given the operator ``Op`` and data ``y``. The operator
can be real or complex, and should ideally be either square :math:`N=M`
or underdetermined :math:`N<M`.
Parameters
----------
Op : :obj:`pylops.LinearOperator`
Operator to invert
Attributes
----------
ncp : :obj:`module`
Array module used by the solver (obtained via
:func:`pylops.utils.backend.get_array_module`)
). Available only after ``setup`` is called.
isjax : :obj:`bool`
Whether the input data is a JAX array or not.
Opmatvec : :obj:`callable`
Function handle to ``Op.matvec`` or ``Op.matmat``
depending on the number of dimensions of ``y``.
Oprmatvec : :obj:`callable`
Function handle to ``Op.rmatvec`` or ``Op.rmatmat``
depending on the number of dimensions of ``y``.
SOpmatvec : :obj:`callable`
Function handle to ``SOp.matvec`` or ``SOp.matmat``
depending on the number of dimensions of ``y``.
SOprmatvec : :obj:`callable`
Function handle to ``SOp.rmatvec`` or ``SOp.rmatmat``
depending on the number of dimensions of ``y``.
threshf : :obj:`callable`
Function handle to the chosen thresholding method.
thresh : :obj:`float`
Threshold.
res : :obj:`numpy.ndarray`
Residual vector of size :math:`[N \times 1]` used in the
solver when ``preallocate=True``. Available only after ``setup``
is called and updated at each call to ``step``.
grad : :obj:`numpy.ndarray`
Gradient vector of size :math:`[M \times 1]` used in the
solver when ``preallocate=True``. Available only after ``setup``
is called and updated at each call to ``step``.
x_unthesh : :obj:`numpy.ndarray`
Unthresholded model vector of size :math:`[M \times 1]` used in the
solver when ``preallocate=True``. Available only after ``setup``
is called and updated at each call to ``step``.
xold : :obj:`numpy.ndarray`
Old model vector of size :math:`[M \times 1]` used in the
solver when ``preallocate=True``. Available only after ``setup``
is called and updated at each call to ``step``.
SOpx_unthesh : :obj:`numpy.ndarray`
Old model vector pre-multiplied by the regularization operator
of size :math:`[M_S \times 1]` used in the solver when ``preallocate=True``.
Available only after ``setup`` is called and updated at each call to ``step``.
normresold : :obj:`float`
Old norm of the residual.
t : :obj:`float`
FISTA auxiliary coefficient (not used in ISTA).
cost : :obj:`list`
History of the L2 norm of the total objectiv function. Available
only after ``setup`` is called and updated at each call to ``step``.
iiter : :obj:`int`
Current iteration number. Available only after
``setup`` is called and updated at each call to ``step``.
Raises
------
NotImplementedError
If ``threshkind`` is different from hard, soft, half, soft-percentile,
or half-percentile
ValueError
If ``perc=None`` when ``threshkind`` is soft-percentile or
half-percentile
ValueError
If ``monitorres=True`` and residual increases
See Also
--------
OMP: Orthogonal Matching Pursuit (OMP).
FISTA: Fast Iterative Shrinkage-Thresholding Algorithm (FISTA).
SPGL1: Spectral Projected-Gradient for L1 norm (SPGL1).
SplitBregman: Split Bregman for mixed L2-L1 norms.
Notes
-----
Solves the following synthesis problem for the operator
:math:`\mathbf{Op}` and the data :math:`\mathbf{y}`:
.. math::
J = \|\mathbf{y} - \mathbf{Op}\,\mathbf{x}\|_2^2 +
\epsilon \|\mathbf{x}\|_p
or the analysis problem:
.. math::
J = \|\mathbf{y} - \mathbf{Op}\,\mathbf{x}\|_2^2 +
\epsilon \|\mathbf{SOp}^H\,\mathbf{x}\|_p
if ``SOp`` is provided. Note that in the first case, ``SOp`` should be
assimilated in the modelling operator (i.e., ``Op=GOp * SOp``).
The Iterative Shrinkage-Thresholding Algorithms (ISTA) [1]_ is used, where
:math:`p=0, 0.5, 1`. This is a very simple iterative algorithm which
applies the following step:
.. math::
\mathbf{x}^{(i+1)} = T_{(\epsilon \alpha /2, p)} \left(\mathbf{x}^{(i)} +
\alpha\,\mathbf{Op}^H \left(\mathbf{y} - \mathbf{Op}\,\mathbf{x}^{(i)}\right)\right)
or
.. math::
\mathbf{x}^{(i+1)} = \mathbf{SOp}\,\left\{T_{(\epsilon \alpha /2, p)}
\mathbf{SOp}^H\,\left(\mathbf{x}^{(i)} + \alpha\,\mathbf{Op}^H \left(\mathbf{y} -
\mathbf{Op} \,\mathbf{x}^{(i)}\right)\right)\right\}
where :math:`\epsilon \alpha /2` is the threshold and :math:`T_{(\tau, p)}`
is the thresholding rule. The most common variant of ISTA uses the
so-called soft-thresholding rule :math:`T(\tau, p=1)`. Alternatively an
hard-thresholding rule is used in the case of :math:`p=0` or a half-thresholding
rule is used in the case of :math:`p=1/2`. Finally, percentile bases thresholds
are also implemented: the damping factor is not used anymore an the
threshold changes at every iteration based on the computed percentile.
.. [1] Daubechies, I., Defrise, M., and De Mol, C., “An iterative
thresholding algorithm for linear inverse problems with a sparsity
constraint”, Communications on pure and applied mathematics, vol. 57,
pp. 1413-1457. 2004.
"""
def _print_setup(self) -> None:
self._print_solver(f" ({self.threshkind} thresholding)")
if self.niter is not None:
strpar = f"eps = {self.eps:10e}\ttol = {self.tol:10e}\tniter = {self.niter}"
else:
strpar = f"eps = {self.eps:10e}\ttol = {self.tol:10e}"
if self.perc is None:
strpar1 = f"alpha = {self.alpha:10e}\tthresh = {self.thresh:10e}"
else:
strpar1 = f"alpha = {self.alpha:10e}\tperc = {self.perc:.1f}"
head1 = " Itn x[0] r2norm r12norm xupdate"
print(strpar)
print(strpar1)
print("-" * 80)
print(head1)
def _print_step(
self,
x: NDArray,
costdata: float,
costreg: float,
xupdate: float,
) -> None:
strx = (
f" {x[0]:1.2e} " if np.iscomplexobj(x) else f" {x[0]:11.4e} "
)
msg = (
f"{self.iiter:6g} "
+ strx
+ f"{costdata:10.3e} {costdata + costreg:9.3e} {xupdate:10.3e}"
)
print(msg)
def memory_usage(
self,
show: bool = False,
unit: str = "B",
) -> float:
"""Compute memory usage of the solver
Parameters
----------
show : :obj:`bool`, optional
Display memory usage
unit: :obj:`str`, optional
Unit used to display memory usage (
``B``, ``KB``, ``MB`` or ``GB``)
Returns
-------
memuse :obj:`float`
Memory usage in Bytes
"""
# Get number of bytes of dtype used in the solver
nbytes = np.dtype(self.Op.dtype).itemsize
# Setup: x0 - y
memuse = (self.Op.shape[1] + self.Op.shape[0]) * nbytes
# Step (additional variables to those in setup): xold, grad, x_unthesh - res
memuse += (3 * self.Op.shape[1] + self.Op.shape[0]) * nbytes
if show:
print(f"ISTA predicted memory usage: {memuse / _units[unit]:.2f} {unit}")
return memuse
def setup(
self,
y: NDArray,
x0: Optional[NDArray] = None,
niter: Optional[int] = None,
SOp: Optional[LinearOperator] = None,
eps: float = 0.1,
alpha: Optional[float] = None,
eigsdict: Optional[Dict[str, Any]] = None,
tol: float = 1e-10,
threshkind: str = "soft",
perc: Optional[float] = None,
decay: Optional[NDArray] = None,
monitorres: bool = False,
preallocate: bool = False,
show: bool = False,
) -> NDArray:
r"""Setup solver
Parameters
----------
y : :obj:`numpy.ndarray`
Data of size :math:`[N \times 1]` or :math:`[N \times R]` where
a solution for multiple right-hand-side is found when ``R>1``.
x0: :obj:`numpy.ndarray`, optional
Initial guess
niter : :obj:`int`
Number of iterations
SOp : :obj:`pylops.LinearOperator`, optional
Regularization operator (use when solving the analysis problem)
eps : :obj:`float`, optional
Sparsity damping
alpha : :obj:`float`, optional
Step size. To guarantee convergence, ensure
:math:`\alpha \le 1/\lambda_\text{max}`, where :math:`\lambda_\text{max}`
is the largest eigenvalue of :math:`\mathbf{Op}^H\mathbf{Op}`.
If ``None``, the maximum eigenvalue is estimated and the optimal step size
is chosen as :math:`1/\lambda_\text{max}`. If provided, the
convergence criterion will not be checked internally.
eigsdict : :obj:`dict`, optional
Dictionary of parameters to be passed to :func:`pylops.LinearOperator.eigs` method
when computing the maximum eigenvalue
tol : :obj:`float`, optional
Tolerance. Stop iterations if difference between inverted model
at subsequent iterations is smaller than ``tol``
threshkind : :obj:`str`, optional
Kind of thresholding ('hard', 'soft', 'half', 'hard-percentile',
'soft-percentile', or 'half-percentile' - 'soft' used as default)
perc : :obj:`float`, optional
Percentile, as percentage of values to be kept by thresholding (to be
provided when thresholding is soft-percentile or half-percentile)
decay : :obj:`numpy.ndarray`, optional
Decay factor to be applied to thresholding during iterations
monitorres : :obj:`bool`, optional
Monitor that residual is decreasing
preallocate : :obj:`bool`, optional
.. versionadded:: 2.6.0
Pre-allocate all variables used by the solver. Note that if ``y``
is a JAX array, this option is ignored and variables are not
pre-allocated since JAX does not support in-place operations.
show : :obj:`bool`, optional
Display setup log
Returns
-------
x : :obj:`numpy.ndarray`
Initial model vector
"""
self.y = y
self.SOp = SOp
self.niter = niter
self.eps = eps
self.eigsdict = {} if eigsdict is None else eigsdict
self.tol = tol
self.threshkind = threshkind
self.perc = perc
self.decay = decay
self.monitorres = monitorres
self.ncp = get_array_module(y)
self.isjax = get_module_name(self.ncp) == "jax"
self._setpreallocate(preallocate)
# choose matvec/rmatvec or matmat/rmatmat based on R
if y.ndim > 1 and y.shape[1] > 1:
self.Opmatvec = self.Op.matmat
self.Oprmatvec = self.Op.rmatmat
if self.SOp is not None:
self.SOpmatvec = self.SOp.matmat
self.SOprmatvec = self.SOp.rmatmat
else:
self.Opmatvec = self.Op.matvec
self.Oprmatvec = self.Op.rmatvec
if self.SOp is not None:
self.SOpmatvec = self.SOp.matvec
self.SOprmatvec = self.SOp.rmatvec
# choose thresholding function
if threshkind not in [
"hard",
"soft",
"half",
"hard-percentile",
"soft-percentile",
"half-percentile",
]:
raise NotImplementedError(
"threshkind should be hard, soft, half,"
"hard-percentile, soft-percentile, "
"or half-percentile"
)
if (
threshkind in ["hard-percentile", "soft-percentile", "half-percentile"]
and perc is None
):
raise ValueError(
"Provide a percentile when choosing hard-percentile,"
"soft-percentile, or half-percentile thresholding"
)
self.threshf: Callable[[NDArray, float], NDArray]
if threshkind == "soft":
self.threshf = _softthreshold
elif threshkind == "hard":
self.threshf = _hardthreshold
elif threshkind == "half":
self.threshf = _halfthreshold
elif threshkind == "hard-percentile":
self.threshf = _hardthreshold_percentile
elif threshkind == "soft-percentile":
self.threshf = _softthreshold_percentile
else:
self.threshf = _halfthreshold_percentile
# prepare decay (if not passed)
if perc is None and decay is None:
self.decay = self.ncp.ones(niter, dtype=get_real_dtype(self.Op.dtype))
# step size
if alpha is not None:
self.alpha = alpha
elif not hasattr(self, "alpha"):
# compute largest eigenvalues of Op^H * Op
Op1 = self.Op.H @ self.Op
if get_module_name(self.ncp) == "numpy":
maxeig: float = np.abs(
Op1.eigs(
neigs=1,
symmetric=True,
**self.eigsdict,
)[0]
)
else:
maxeig = np.abs(
power_iteration(
Op1,
dtype=Op1.dtype,
backend="cupy",
**self.eigsdict,
)[0]
)
self.alpha = 1.0 / maxeig
# define threshold
self.thresh = eps * self.alpha * 0.5
# initialize model and cost function
if x0 is None:
if y.ndim == 1:
x = self.ncp.zeros(int(self.Op.shape[1]), dtype=self.Op.dtype)
else:
x = self.ncp.zeros(
(int(self.Op.shape[1]), y.shape[1]), dtype=self.Op.dtype
)
else:
if y.ndim != x0.ndim:
# error for wrong dimensions
raise ValueError("Number of columns of x0 and data are not the same")
elif x0.shape[0] != self.Op.shape[1]:
# error for wrong dimensions
raise ValueError("Operator and input vector have different dimensions")
else:
x = x0.copy()
# initialize other internal variabled
if self.preallocate:
self.res = self.ncp.empty_like(y)
self.grad = self.ncp.empty_like(x)
self.x_unthesh = self.ncp.empty_like(x)
self.xold = self.ncp.empty_like(x)
if self.SOp is not None:
self.SOpx_unthesh: NDArray = self.ncp.zeros(
self.SOp.shape[1], dtype=self.SOp.dtype
)
# create variable to track residual
if monitorres:
self.normresold = np.inf
# for fista
self.t = 1.0
# create variables to track the residual norm and iterations
self.cost: List[float] = []
self.iiter = 0
# print setup
if show:
self._print_setup()
return x
def step(self, x: NDArray, show: bool = False) -> Tuple[NDArray, float]:
r"""Run one step of solver
Parameters
----------
x : :obj:`numpy.ndarray`
Current model vector to be updated by a step of ISTA
show : :obj:`bool`, optional
Display iteration log
Returns
-------
x : :obj:`numpy.ndarray`
Updated model vector
xupdate : :obj:`float`
Norm of the update
"""
# store old vector
if self.preallocate:
self.xold[:] = x[:]
else:
xold = x.copy()
# compute residual
if not self.preallocate:
res: NDArray = self.y - self.Opmatvec(x)
else:
self.ncp.subtract(self.y, self.Opmatvec(x), out=self.res)
if self.monitorres:
self.normres = np.linalg.norm(self.res if self.preallocate else res)
if self.normres > self.normresold:
raise ValueError(
f"ISTA stopped at iteration {self.iiter} due to "
"residual increasing, consider modifying "
"eps and/or alpha..."
)
else:
self.normresold = self.normres
# compute gradient
if not self.preallocate:
grad: NDArray = self.alpha * (self.Oprmatvec(res))
else:
self.ncp.multiply(
self.Oprmatvec(self.res),
self.alpha,
out=self.grad,
)
# update inverted model
if not self.preallocate:
x_unthesh: NDArray = x + grad
else:
self.ncp.add(
x,
self.grad,
out=self.x_unthesh,
)
# apply SOp.H to current x
if self.SOp is not None:
if self.preallocate:
self.SOpx_unthesh[:] = self.SOprmatvec(self.x_unthesh)
else:
SOpx_unthesh = self.SOprmatvec(x_unthesh)
# threshold current solution or current solution projected onto SOp.H space
if self.SOp is None:
x_unthesh_or_SOpx_unthesh = (
self.x_unthesh if self.preallocate else x_unthesh
)
else:
x_unthesh_or_SOpx_unthesh = (
self.SOpx_unthesh if self.preallocate else SOpx_unthesh
)
if self.perc is None:
x = self.threshf(
x_unthesh_or_SOpx_unthesh,
self.decay[self.iiter] * self.thresh,
)
else:
x = self.threshf(x_unthesh_or_SOpx_unthesh, 100 - self.perc)
# apply SOp to thresholded x
if self.SOp is not None:
x = self.SOpmatvec(x)
# compute model update norm
if not self.preallocate:
xupdate = np.linalg.norm(x - xold)
else:
self.ncp.subtract(
x,
self.xold,
out=self.xold,
)
xupdate = np.linalg.norm(self.xold)
# compute cost functions
costdata = 0.5 * np.linalg.norm(self.res if self.preallocate else res) ** 2
costreg = self.eps * np.linalg.norm(x, ord=1)
self.cost.append(float(costdata + costreg))
self.iiter += 1
if show:
self._print_step(x, costdata, costreg, xupdate)
return x, xupdate
def run(
self,
x: NDArray,
niter: Optional[int] = None,
show: bool = False,
itershow: Tuple[int, int, int] = (10, 10, 10),
) -> NDArray:
r"""Run solver
Parameters
----------
x : :obj:`numpy.ndarray`
Current model vector to be updated by multiple steps of CG
niter : :obj:`int`, optional
Number of iterations. Can be set to ``None`` if already
provided in the setup call
show : :obj:`bool`, optional
Display logs
itershow : :obj:`tuple`, optional
Display set log for the first N1 steps, last N2 steps,
and every N3 steps in between where N1, N2, N3 are the
three element of the list.
Returns
-------
x : :obj:`numpy.ndarray`
Estimated model of size :math:`[M \times 1]`
"""
xupdate = np.inf
niter = self.niter if niter is None else niter
if niter is None:
raise ValueError("niter must not be None")
while self.iiter < niter and xupdate > self.tol:
showstep = (
True
if show
and (
self.iiter < itershow[0]
or niter - self.iiter < itershow[1]
or self.iiter % itershow[2] == 0
)
else False
)
x, xupdate = self.step(x, showstep)
self.callback(x)
# check if any callback has raised a stop flag
stop = _callback_stop(self.callbacks)
if stop:
break
if xupdate <= self.tol:
logger.info("Update smaller that tolerance for iteration %d", self.iiter)
return x
def finalize(self, show: bool = False) -> None:
r"""Finalize solver
Parameters
----------
show : :obj:`bool`, optional
Display finalize log
"""
self.tend = time.time()
self.telapsed = self.tend - self.tstart
self.cost = np.array(self.cost)
if show:
self._print_finalize()
def solve(
self,
y: NDArray,
x0: Optional[NDArray] = None,
niter: Optional[int] = None,
SOp: Optional[LinearOperator] = None,
eps: float = 0.1,
alpha: Optional[float] = None,
eigsdict: Optional[Dict[str, Any]] = None,
tol: float = 1e-10,
threshkind: str = "soft",
perc: Optional[float] = None,
decay: Optional[NDArray] = None,
monitorres: bool = False,
preallocate: bool = False,
show: bool = False,
itershow: Tuple[int, int, int] = (10, 10, 10),
) -> Tuple[NDArray, int, NDArray]:
r"""Run entire solver
Parameters
----------
y : :obj:`numpy.ndarray`
Data of size :math:`[N \times 1]`
x0: :obj:`numpy.ndarray`, optional
Initial guess
niter : :obj:`int`
Number of iterations
SOp : :obj:`pylops.LinearOperator`, optional
Regularization operator (use when solving the analysis problem)
eps : :obj:`float`, optional
Sparsity damping
alpha : :obj:`float`, optional
Step size. To guarantee convergence, ensure
:math:`\alpha \le 1/\lambda_\text{max}`, where :math:`\lambda_\text{max}`
is the largest eigenvalue of :math:`\mathbf{Op}^H\mathbf{Op}`.
If ``None``, the maximum eigenvalue is estimated and the optimal step size
is chosen as :math:`1/\lambda_\text{max}`. If provided, the
convergence criterion will not be checked internally.
eigsdict : :obj:`dict`, optional
Dictionary of parameters to be passed to :func:`pylops.LinearOperator.eigs` method
when computing the maximum eigenvalue
tol : :obj:`float`, optional
Tolerance. Stop iterations if difference between inverted model
at subsequent iterations is smaller than ``tol``
threshkind : :obj:`str`, optional
Kind of thresholding ('hard', 'soft', 'half', 'hard-percentile',
'soft-percentile', or 'half-percentile' - 'soft' used as default)
perc : :obj:`float`, optional
Percentile, as percentage of values to be kept by thresholding (to be
provided when thresholding is soft-percentile or half-percentile)
decay : :obj:`numpy.ndarray`, optional
Decay factor to be applied to thresholding during iterations
monitorres : :obj:`bool`, optional
Monitor that residual is decreasing
preallocate : :obj:`bool`, optional
.. versionadded:: 2.6.0
Pre-allocate all variables used by the solver. Note that if ``y``
is a JAX array, this option is ignored and variables are not
pre-allocated since JAX does not support in-place operations.
show : :obj:`bool`, optional
Display logs
itershow : :obj:`tuple`, optional
Display set log for the first N1 steps, last N2 steps,
and every N3 steps in between where N1, N2, N3 are the
three element of the list.
Returns
-------
x : :obj:`numpy.ndarray`
Estimated model of size :math:`[M \times 1]`
niter : :obj:`int`
Number of effective iterations
cost : :obj:`numpy.ndarray`, optional
History of cost function
"""
x = self.setup(
y=y,
x0=x0,
niter=niter,
SOp=SOp,
eps=eps,
alpha=alpha,
eigsdict=eigsdict,
tol=tol,
threshkind=threshkind,
perc=perc,
decay=decay,
monitorres=monitorres,
preallocate=preallocate,
show=show,
)
x = self.run(x, niter, show=show, itershow=itershow)
self.finalize(show)
return x, self.iiter, self.cost
[docs]class FISTA(ISTA):
r"""Fast Iterative Shrinkage-Thresholding Algorithm (FISTA).
Solve an optimization problem with :math:`L_p, \; p=0, 0.5, 1`
regularization, given the operator ``Op`` and data ``y``.
The operator can be real or complex, and should ideally be either square
:math:`N=M` or underdetermined :math:`N<M`.
Parameters
----------
Op : :obj:`pylops.LinearOperator`
Operator to invert
Attributes
----------
ncp : :obj:`module`
Array module used by the solver (obtained via
:func:`pylops.utils.backend.get_array_module`)
). Available only after ``setup`` is called.
isjax : :obj:`bool`
Whether the input data is a JAX array or not.
Opmatvec : :obj:`callable`
Function handle to ``Op.matvec`` or ``Op.matmat``
depending on the number of dimensions of ``y``.
Oprmatvec : :obj:`callable`
Function handle to ``Op.rmatvec`` or ``Op.rmatmat``
depending on the number of dimensions of ``y``.
SOpmatvec : :obj:`callable`
Function handle to ``SOp.matvec`` or ``SOp.matmat``
depending on the number of dimensions of ``y``.
SOprmatvec : :obj:`callable`
Function handle to ``SOp.rmatvec`` or ``SOp.rmatmat``
depending on the number of dimensions of ``y``.
threshf : :obj:`callable`
Function handle to the chosen thresholding method.
thresh : :obj:`float`
Threshold.
res : :obj:`numpy.ndarray`
Residual vector of size :math:`[N \times 1]` used in the
solver when ``preallocate=True``. Available only after ``setup``
is called and updated at each call to ``step``.
grad : :obj:`numpy.ndarray`
Gradient vector of size :math:`[M \times 1]` used in the
solver when ``preallocate=True``. Available only after ``setup``
is called and updated at each call to ``step``.
x_unthesh : :obj:`numpy.ndarray`
Unthresholded model vector of size :math:`[M \times 1]` used in the
solver when ``preallocate=True``. Available only after ``setup``
is called and updated at each call to ``step``.
xold : :obj:`numpy.ndarray`
Old model vector of size :math:`[M \times 1]` used in the
solver when ``preallocate=True``. Available only after ``setup``
is called and updated at each call to ``step``.
SOpx_unthesh : :obj:`numpy.ndarray`
Old model vector pre-multiplied by the regularization operator
of size :math:`[M_S \times 1]` used in the solver when ``preallocate=True``.
Available only after ``setup`` is called and updated at each call to ``step``.
normresold : :obj:`float`
Old norm of the residual.
t : :obj:`float`
FISTA auxiliary coefficient (not used in ISTA).
cost : :obj:`list`
History of the L2 norm of the total objectiv function. Available
only after ``setup`` is called and updated at each call to ``step``.
iiter : :obj:`int`
Current iteration number. Available only after
``setup`` is called and updated at each call to ``step``.
Raises
------
NotImplementedError
If ``threshkind`` is different from hard, soft, half, soft-percentile,
or half-percentile
ValueError
If ``perc=None`` when ``threshkind`` is soft-percentile or
half-percentile
See Also
--------
OMP: Orthogonal Matching Pursuit (OMP).
ISTA: Iterative Shrinkage-Thresholding Algorithm (ISTA).
SPGL1: Spectral Projected-Gradient for L1 norm (SPGL1).
SplitBregman: Split Bregman for mixed L2-L1 norms.
Notes
-----
Solves the following synthesis problem for the operator
:math:`\mathbf{Op}` and the data :math:`\mathbf{y}`:
.. math::
J = \|\mathbf{y} - \mathbf{Op}\,\mathbf{x}\|_2^2 +
\epsilon \|\mathbf{x}\|_p
or the analysis problem:
.. math::
J = \|\mathbf{y} - \mathbf{Op}\,\mathbf{x}\|_2^2 +
\epsilon \|\mathbf{SOp}^H\,\mathbf{x}\|_p
if ``SOp`` is provided.
The Fast Iterative Shrinkage-Thresholding Algorithm (FISTA) [1]_ is used,
where :math:`p=0, 0.5, 1`. This is a modified version of ISTA solver with
improved convergence properties and limited additional computational cost.
Similarly to the ISTA solver, the choice of the thresholding algorithm to
apply at every iteration is based on the choice of :math:`p`.
.. [1] Beck, A., and Teboulle, M., “A Fast Iterative Shrinkage-Thresholding
Algorithm for Linear Inverse Problems”, SIAM Journal on
Imaging Sciences, vol. 2, pp. 183-202. 2009.
"""
def memory_usage(
self,
show: bool = False,
unit: str = "B",
) -> float:
"""Compute memory usage of the solver
Parameters
----------
show : :obj:`bool`, optional
Display memory usage
unit: :obj:`str`, optional
Unit used to display memory usage (
``B``, ``KB``, ``MB`` or ``GB``)
Returns
-------
memuse :obj:`float`
Memory usage in Bytes
"""
# Get number of bytes of dtype used in the solver
nbytes = np.dtype(self.Op.dtype).itemsize
# Setup: x0 - y
memuse = (self.Op.shape[1] + self.Op.shape[0]) * nbytes
# Step (additional variables to those in setup): xold, grad, x_unthesh, z - res
memuse += (4 * self.Op.shape[1] + self.Op.shape[0]) * nbytes
if show:
print(f"FISTA predicted memory usage: {memuse / _units[unit]:.2f} {unit}")
return memuse
def step(self, x: NDArray, z: NDArray, show: bool = False) -> NDArray:
r"""Run one step of solver
Parameters
----------
x : :obj:`numpy.ndarray`
Current model vector to be updated by a step of ISTA
z : :obj:`numpy.ndarray`
Current auxiliary model vector to be updated by a step of ISTA
show : :obj:`bool`, optional
Display iteration log
Returns
-------
x : :obj:`numpy.ndarray`
Updated model vector
z : :obj:`numpy.ndarray`
Updated auxiliary model vector
xupdate : :obj:`float`
Norm of the update
"""
# store old vector
if self.preallocate:
self.xold[:] = x[:]
else:
xold = x.copy()
# compute residual
if not self.preallocate:
res: NDArray = self.y - self.Opmatvec(z)
else:
self.ncp.subtract(self.y, self.Opmatvec(z), out=self.res)
if self.monitorres:
self.normres = np.linalg.norm(self.res if self.preallocate else res)
if self.normres > self.normresold:
raise ValueError(
f"FISTA stopped at iteration {self.iiter} due to "
"residual increasing, consider modifying "
"eps and/or alpha..."
)
else:
self.normresold = self.normres
# compute gradient and update inverted model
if not self.preallocate:
grad: NDArray = self.alpha * (self.Oprmatvec(res))
x_unthesh: NDArray = z + grad
else:
self.ncp.multiply(
self.Oprmatvec(self.res),
self.alpha,
out=self.grad,
)
self.ncp.add(
z,
self.grad,
out=self.x_unthesh,
)
# apply SOp.H to current x
if self.SOp is not None:
if self.preallocate:
self.SOpx_unthesh[:] = self.SOprmatvec(self.x_unthesh)
else:
SOpx_unthesh = self.SOprmatvec(x_unthesh)
# threshold current solution or current solution projected onto SOp.H space
if self.SOp is None:
x_unthesh_or_SOpx_unthesh = (
self.x_unthesh if self.preallocate else x_unthesh
)
else:
x_unthesh_or_SOpx_unthesh = (
self.SOpx_unthesh if self.preallocate else SOpx_unthesh
)
if self.perc is None:
x = self.threshf(
x_unthesh_or_SOpx_unthesh,
self.decay[self.iiter] * self.thresh,
)
else:
x = self.threshf(x_unthesh_or_SOpx_unthesh, 100 - self.perc)
# apply SOp to thresholded x
if self.SOp is not None:
x = self.SOpmatvec(x)
# update auxiliary coefficients
told = self.t
self.t = (1.0 + sqrt(1.0 + 4.0 * self.t**2)) / 2.0
# model update
if not self.preallocate:
z = x + ((told - 1.0) / self.t) * (x - xold)
else:
self.ncp.subtract(
x,
self.xold,
out=self.xold,
)
self.ncp.multiply(self.xold, ((told - 1.0) / self.t), out=z)
self.ncp.add(x, z, out=z)
# check model update
if not self.preallocate:
xupdate = np.linalg.norm(x - xold)
else:
# note that x - xold has been already computed as part of the
# intermediate calculation of x in model update step
xupdate = np.linalg.norm(self.xold)
# cost functions
costdata = 0.5 * np.linalg.norm(self.y - self.Op @ x) ** 2
costreg = self.eps * np.linalg.norm(x, ord=1)
self.cost.append(float(costdata + costreg))
self.iiter += 1
if show:
self._print_step(x, costdata, costreg, xupdate)
return x, z, xupdate
def run(
self,
x: NDArray,
niter: Optional[int] = None,
show: bool = False,
itershow: Tuple[int, int, int] = (10, 10, 10),
) -> NDArray:
r"""Run solver
Parameters
----------
x : :obj:`numpy.ndarray`
Current model vector to be updated by multiple steps of CG
niter : :obj:`int`, optional
Number of iterations. Can be set to ``None`` if already
provided in the setup call
show : :obj:`bool`, optional
Display logs
itershow : :obj:`tuple`, optional
Display set log for the first N1 steps, last N2 steps,
and every N3 steps in between where N1, N2, N3 are the
three element of the list.
Returns
-------
x : :obj:`numpy.ndarray`
Estimated model of size :math:`[M \times 1]`
"""
z = x.copy()
xupdate = np.inf
niter = self.niter if niter is None else niter
if niter is None:
raise ValueError("niter must not be None")
while self.iiter < niter and xupdate > self.tol:
showstep = (
True
if show
and (
self.iiter < itershow[0]
or niter - self.iiter < itershow[1]
or self.iiter % itershow[2] == 0
)
else False
)
x, z, xupdate = self.step(x, z, showstep)
self.callback(x)
# check if any callback has raised a stop flag
stop = _callback_stop(self.callbacks)
if stop:
break
if xupdate <= self.tol:
logger.warning(
"Update smaller that tolerance for " "iteration %d", self.iiter
)
return x
[docs]class SPGL1(Solver):
r"""Spectral Projected-Gradient for L1 norm.
Solve a constrained system of equations given the operator ``Op``
and a sparsyfing transform ``SOp`` aiming to retrive a model that
is sparse in the sparsifying domain.
This is a simple wrapper to :py:func:`spgl1.spgl1`
which is a porting of the well-known
`SPGL1 <https://www.cs.ubc.ca/~mpf/spgl1/>`_ MATLAB solver into Python.
In order to be able to use this solver you need to have installed the
``spgl1`` library.
Parameters
----------
Op : :obj:`pylops.LinearOperator`
Operator to invert of size :math:`[N \times M]`.
Attributes
----------
ncp : :obj:`module`
Array module used by the solver (obtained via
:func:`pylops.utils.backend.get_array_module`)
). Available only after ``setup`` is called.
Raises
------
ModuleNotFoundError
If the ``spgl1`` library is not installed
Notes
-----
Solve different variations of sparsity-promoting inverse problem by
imposing sparsity in the retrieved model [1]_.
The first problem is called *basis pursuit denoise (BPDN)* and
its cost function is
.. math::
\|\mathbf{x}\|_1 \quad \text{subject to} \quad
\left\|\mathbf{Op}\,\mathbf{S}^H\mathbf{x}-\mathbf{y}\right\|_2^2
\leq \sigma,
while the second problem is the *ℓ₁-regularized least-squares or LASSO*
problem and its cost function is
.. math::
\left\|\mathbf{Op}\,\mathbf{S}^H\mathbf{x}-\mathbf{y}\right\|_2^2
\quad \text{subject to} \quad \|\mathbf{x}\|_1 \leq \tau
.. [1] van den Berg E., Friedlander M.P., "Probing the Pareto frontier
for basis pursuit solutions", SIAM J. on Scientific Computing,
vol. 31(2), pp. 890-912. 2008.
"""
def _print_setup(self, xcomplex: bool = False) -> None:
self._print_solver()
strprec = f"SOp={self.SOp}"
strreg = f"tau={self.tau} sigma={self.sigma}"
print(strprec)
print(strreg)
print("-" * 80)
def _print_finalize(self) -> None:
print(f"\nTotal time (s) = {self.telapsed:.2f}")
print("-" * 80 + "\n")
def memory_usage(
self,
show: bool = False,
unit: str = "B",
) -> float:
pass
def setup(
self,
y: NDArray,
SOp: Optional[LinearOperator] = None,
tau: int = 0,
sigma: int = 0,
show: bool = False,
) -> None:
r"""Setup solver
Parameters
----------
y : :obj:`numpy.ndarray`
Data of size :math:`[N \times 1]`
SOp : :obj:`pylops.LinearOperator`, optional
Sparsifying transform
tau : :obj:`float`, optional
Non-negative LASSO scalar. If different from ``0``,
SPGL1 will solve LASSO problem
sigma : :obj:`list`, optional
BPDN scalar. If different from ``0``,
SPGL1 will solve BPDN problem
show : :obj:`bool`, optional
Display setup log
"""
if spgl1_message is not None:
raise ModuleNotFoundError(spgl1_message)
self.y = y
self.SOp = SOp
self.tau = tau
self.sigma = sigma
self.ncp = get_array_module(y)
# print setup
if show:
self._print_setup()
def step(self) -> None:
raise NotImplementedError(
"SPGL1 uses as default the"
"spgl1.spgl1 solver, therefore the "
"step method is not implemented. Use directly run or solve."
)
def run(
self,
x: NDArray,
show: bool = False,
**kwargs_spgl1,
) -> Tuple[NDArray, NDArray, Dict[str, Any]]:
r"""Run solver
Parameters
----------
x : :obj:`numpy.ndarray`
Current model vector to be updated by multiple steps of the solver.
If ``None``, x is assumed to be a zero vector
show : :obj:`bool`, optional
Display iterations log
**kwargs_spgl1
Arbitrary keyword arguments for
:py:func:`spgl1.spgl1` solver
Returns
-------
xinv : :obj:`numpy.ndarray`
Inverted model in original domain.
pinv : :obj:`numpy.ndarray`
Inverted model in sparse domain.
info : :obj:`dict`
Dictionary with the following information:
- ``tau``, final value of tau (see sigma above)
- ``rnorm``, two-norm of the optimal residual
- ``rgap``, relative duality gap (an optimality measure)
- ``gnorm``, Lagrange multiplier of (LASSO)
- ``stat``, final status of solver
* ``1``: found a BPDN solution,
* ``2``: found a BP solution; exit based on small gradient,
* ``3``: found a BP solution; exit based on small residual,
* ``4``: found a LASSO solution,
* ``5``: error, too many iterations,
* ``6``: error, linesearch failed,
* ``7``: error, found suboptimal BP solution,
* ``8``: error, too many matrix-vector products.
- ``niters``, number of iterations
- ``nProdA``, number of multiplications with A
- ``nProdAt``, number of multiplications with A'
- ``n_newton``, number of Newton steps
- ``time_project``, projection time (seconds)
- ``time_matprod``, matrix-vector multiplications time (seconds)
- ``time_total``, total solution time (seconds)
- ``niters_lsqr``, number of lsqr iterations (if ``subspace_min=True``)
- ``xnorm1``, L1-norm model solution history through iterations
- ``rnorm2``, L2-norm residual history through iterations
- ``lambdaa``, Lagrange multiplier history through iterations
"""
pinv, _, _, info = ext_spgl1(
self.Op if self.SOp is None else self.Op @ self.SOp.H,
self.y,
tau=self.tau,
sigma=self.sigma,
x0=x,
**kwargs_spgl1,
)
xinv = pinv.copy() if self.SOp is None else self.SOp.rmatvec(pinv)
return xinv, pinv, info
def solve(
self,
y: NDArray,
x0: Optional[NDArray] = None,
SOp: Optional[LinearOperator] = None,
tau: float = 0.0,
sigma: float = 0,
show: bool = False,
**kwargs_spgl1,
) -> Tuple[NDArray, NDArray, Dict[str, Any]]:
r"""Run entire solver
Parameters
----------
y : :obj:`numpy.ndarray`
Data of size :math:`[N \times 1]`
x0 : :obj:`numpy.ndarray`, optional
Initial guess
SOp : :obj:`pylops.LinearOperator`, optional
Sparsifying transform
tau : :obj:`float`, optional
Non-negative LASSO scalar. If different from ``0``,
SPGL1 will solve LASSO problem
sigma : :obj:`list`, optional
BPDN scalar. If different from ``0``,
SPGL1 will solve BPDN problem
show : :obj:`bool`, optional
Display log
**kwargs_spgl1
Arbitrary keyword arguments for
:py:func:`spgl1.spgl1` solver
Returns
-------
xinv : :obj:`numpy.ndarray`
Inverted model in original domain.
pinv : :obj:`numpy.ndarray`
Inverted model in sparse domain.
info : :obj:`dict`
Dictionary with the following information:
- ``tau``, final value of tau (see sigma above)
- ``rnorm``, two-norm of the optimal residual
- ``rgap``, relative duality gap (an optimality measure)
- ``gnorm``, Lagrange multiplier of (LASSO)
- ``stat``, final status of solver
* ``1``: found a BPDN solution,
* ``2``: found a BP solution; exit based on small gradient,
* ``3``: found a BP solution; exit based on small residual,
* ``4``: found a LASSO solution,
* ``5``: error, too many iterations,
* ``6``: error, linesearch failed,
* ``7``: error, found suboptimal BP solution,
* ``8``: error, too many matrix-vector products.
- ``niters``, number of iterations
- ``nProdA``, number of multiplications with A
- ``nProdAt``, number of multiplications with A'
- ``n_newton``, number of Newton steps
- ``time_project``, projection time (seconds)
- ``time_matprod``, matrix-vector multiplications time (seconds)
- ``time_total``, total solution time (seconds)
- ``niters_lsqr``, number of lsqr iterations (if ``subspace_min=True``)
- ``xnorm1``, L1-norm model solution history through iterations
- ``rnorm2``, L2-norm residual history through iterations
- ``lambdaa``, Lagrange multiplier history through iterations
"""
self.setup(y=y, SOp=SOp, tau=tau, sigma=sigma, show=show)
xinv, pinv, info = self.run(x0, show=show, **kwargs_spgl1)
self.finalize(show)
return xinv, pinv, info
[docs]class SplitBregman(Solver):
r"""Split Bregman for mixed L2-L1 norms.
Solve an unconstrained system of equations with mixed :math:`L_2` and :math:`L_1`
regularization terms given the operator ``Op``, a list of :math:`L_1`
regularization terms ``RegsL1``, and an optional list of :math:`L_2`
regularization terms ``RegsL2``.
Parameters
----------
Op : :obj:`pylops.LinearOperator`
Operator to invert
Attributes
----------
ncp : :obj:`module`
Array module used by the solver (obtained via
:func:`pylops.utils.backend.get_array_module`)
). Available only after ``setup`` is called.
isjax : :obj:`bool`
Whether the input data is a JAX array or not.
nregsL1 : :obj:`int`
Number of L1 regularization terms.
b : :obj:`numpy.ndarray`
Bregman update vector.
d : :obj:`numpy.ndarray`
Shrinked vector.
nregsL1 : :obj:`int`
Number of L2 regularization terms.
Regs : :obj:`list`
List of L1 and L2 regularization terms.
epsRs : :obj:`list`
List of L1 and L2 regularization dampings.
cost : :obj:`numpy.ndarray`, optional
History of total cost function through iterations.
iiter : :obj:`int`
Current iteration number. Available only after
``setup`` is called and updated at each call to ``step``.
Notes
-----
Solve the following system of unconstrained, regularized equations
given the operator :math:`\mathbf{Op}` and a set of mixed norm
(:math:`L^2` and :math:`L_1`)
regularization terms :math:`\mathbf{R}_{2,i}` and
:math:`\mathbf{R}_{1,i}`, respectively:
.. math::
J = \frac{\mu}{2} \|\textbf{y} - \textbf{Op}\,\textbf{x} \|_2^2 +
\frac{1}{2}\sum_i \epsilon_{\mathbf{R}_{2,i}} \|\mathbf{y}_{\mathbf{R}_{2,i}} -
\mathbf{R}_{2,i} \textbf{x} \|_2^2 +
\sum_i \epsilon_{\mathbf{R}_{1,i}} \| \mathbf{R}_{1,i} \textbf{x} \|_1
where :math:`\mu` is the reconstruction damping, :math:`\epsilon_{\mathbf{R}_{2,i}}`
are the damping factors used to weight the different :math:`L^2` regularization
terms of the cost function and :math:`\epsilon_{\mathbf{R}_{1,i}}`
are the damping factors used to weight the different :math:`L_1` regularization
terms of the cost function.
The generalized Split-Bergman algorithm [1]_ is used to solve such cost
function: the algorithm is composed of a sequence of unconstrained
inverse problems and Bregman updates.
The original system of equations is initially converted into a constrained
problem:
.. math::
J = \frac{\mu}{2} \|\textbf{y} - \textbf{Op}\,\textbf{x}\|_2^2 +
\frac{1}{2}\sum_i \epsilon_{\mathbf{R}_{2,i}} \|\mathbf{y}_{\mathbf{R}_{2,i}} -
\mathbf{R}_{2,i} \textbf{x}\|_2^2 +
\sum_i \| \textbf{y}_i \|_1 \quad \text{subject to} \quad
\textbf{y}_i = \mathbf{R}_{1,i} \textbf{x} \quad \forall i
and solved as follows:
.. math::
\DeclareMathOperator*{\argmin}{arg\,min}
\begin{align}
(\textbf{x}^{k+1}, \textbf{y}_i^{k+1}) =
\argmin_{\mathbf{x}, \mathbf{y}_i}
\|\textbf{y} - \textbf{Op}\,\textbf{x}\|_2^2
&+ \frac{1}{2}\sum_i \epsilon_{\mathbf{R}_{2,i}} \|\mathbf{y}_{\mathbf{R}_{2,i}} -
\mathbf{R}_{2,i} \textbf{x}\|_2^2 \\
&+ \frac{1}{2}\sum_i \epsilon_{\mathbf{R}_{1,i}} \|\textbf{y}_i -
\mathbf{R}_{1,i} \textbf{x} - \textbf{b}_i^k\|_2^2 \\
&+ \sum_i \| \textbf{y}_i \|_1
\end{align}
.. math::
\textbf{b}_i^{k+1}=\textbf{b}_i^k +
(\mathbf{R}_{1,i} \textbf{x}^{k+1} - \textbf{y}^{k+1})
The :py:func:`scipy.sparse.linalg.lsqr` solver and a fast shrinkage
algorithm are used within a inner loop to solve the first step. The entire
procedure is repeated ``niter_outer`` times until convergence.
.. [1] Goldstein T. and Osher S., "The Split Bregman Method for
L1-Regularized Problems", SIAM J. on Scientific Computing, vol. 2(2),
pp. 323-343. 2008.
"""
def _print_setup(self, xcomplex: bool = False) -> None:
self._print_solver(nbar=65)
strpar = (
f"niter_outer = {self.niter_outer:3d} niter_inner = {self.niter_inner:3d} tol = {self.tol:2.2e}\n"
f"mu = {self.mu:2.2e} epsL1 = {self.epsRL1s}\t epsL2 = {self.epsRL2s}"
)
print(strpar)
print("-" * 65)
if not xcomplex:
head1 = " Itn x[0] r2norm r12norm"
else:
head1 = " Itn x[0] r2norm r12norm"
print(head1)
def _print_step(self, x: NDArray) -> None:
strx = f"{x[0]:1.2e} " if np.iscomplexobj(x) else f"{x[0]:11.4e} "
str1 = f"{self.iiter:6g} " + strx
str2 = f"{self.costdata:10.3e} {self.costtot:9.3e}"
print(str1 + str2)
def memory_usage(
self,
nopRegsL1: Optional[Tuple[int]] = None,
nopRegsL2: Optional[Tuple[int]] = None,
show: bool = False,
unit: str = "B",
) -> float:
"""Compute memory usage of the solver
Parameters
----------
nopRegsL1 : :obj:`tuple`, optional
Number of data elements of ``RegsL1`` operators
nopRegsL2 : :obj:`tuple`, optional
Number of data elements of ``RegsL2`` operators
show : :obj:`bool`, optional
Display memory usage
unit: :obj:`str`, optional
Unit used to display memory usage (
``B``, ``KB``, ``MB`` or ``GB``)
Returns
-------
memuse :obj:`float`
Memory usage in Bytes
"""
# Convert nopRegsL1 and nopRegsL2 if None
if nopRegsL1 is None:
nopRegsL1 = 0
if nopRegsL2 is None:
nopRegsL2 = 0
# Get number of bytes of dtype used in the solver
nbytes = np.dtype(self.Op.dtype).itemsize
# Setup: x0 - y - b, d dataregsL1 - dataregsL2
memuse = (
self.Op.shape[1]
+ self.Op.shape[0]
+ 3 * np.prod(nopRegsL1)
+ np.prod(nopRegsL2)
) * nbytes
# Step (additional variables to those in setup): dataregs
memuse += np.prod(nopRegsL1) * nbytes
if show:
print(
f"Split-Bregman predicted memory usage: {memuse / _units[unit]:.2f} {unit}"
)
return memuse
def setup(
self,
y: NDArray,
RegsL1: List[LinearOperator],
x0: Optional[NDArray] = None,
niter_outer: int = 3,
niter_inner: int = 5,
RegsL2: Optional[List[LinearOperator]] = None,
dataregsL2: Optional[Sequence[NDArray]] = None,
mu: float = 1.0,
epsRL1s: Optional[SamplingLike] = None,
epsRL2s: Optional[SamplingLike] = None,
tol: float = 1e-10,
tau: float = 1.0,
restart: bool = False,
preallocate: bool = False,
show: bool = False,
) -> NDArray:
r"""Setup solver
Parameters
----------
y : :obj:`numpy.ndarray`
Data of size :math:`[N \times 1]`
RegsL1 : :obj:`list`
:math:`L_1` regularization operators
x0 : :obj:`numpy.ndarray`, optional
Initial guess of size :math:`[M \times 1]`. If ``None``, initialize
internally as zero vector
niter_outer : :obj:`int`, optional
Number of iterations of outer loop
niter_inner : :obj:`int`, optional
Number of iterations of inner loop of first step of the Split Bregman
algorithm. A small number of iterations is generally sufficient and
for many applications optimal efficiency is obtained when only one
iteration is performed.
RegsL2 : :obj:`list`, optional
Additional :math:`L^2` regularization operators
(if ``None``, :math:`L^2` regularization is not added to the problem)
dataregsL2 : :obj:`list`, optional
:math:`L^2` Regularization data (must have the same number of elements
of ``RegsL2`` or equal to ``None`` to use a zero data for every
regularization operator in ``RegsL2``)
mu : :obj:`float`, optional
Data term damping
epsRL1s : :obj:`list`
:math:`L_1` Regularization dampings (must have the same number of elements
as ``RegsL1``)
epsRL2s : :obj:`list`
:math:`L^2` Regularization dampings (must have the same number of elements
as ``RegsL2``)
tol : :obj:`float`, optional
Tolerance. Stop outer iterations if difference between inverted model
at subsequent iterations is smaller than ``tol``
tau : :obj:`float`, optional
Scaling factor in the Bregman update (must be close to 1)
restart : :obj:`bool`, optional
Initialize the unconstrained inverse problem in inner loop with
the initial guess (``True``) or with the last estimate (``False``).
Note that when this is set to ``True``, the ``x0`` provided in the setup will
be used in all iterations.
preallocate : :obj:`bool`, optional
.. versionadded:: 2.6.0
Pre-allocate all variables used by the solver. Note that if ``y``
is a JAX array, this option is ignored and variables are not
pre-allocated since JAX does not support in-place operations.
show : :obj:`bool`, optional
Display setup log
Returns
-------
x : :obj:`numpy.ndarray`
Initial guess of size :math:`[N \times 1]`
"""
self.y = y
self.RegsL1 = RegsL1
self.niter_outer = niter_outer
self.niter_inner = niter_inner
self.RegsL2 = RegsL2
self.dataregsL2 = list(dataregsL2) if dataregsL2 is not None else []
self.mu = mu
self.epsRL1s = list(epsRL1s) if epsRL1s is not None else []
self.epsRL2s = list(epsRL2s) if epsRL2s is not None else []
self.tol = tol
self.tau = tau
self.restart = restart
self.ncp = get_array_module(y)
self.isjax = get_module_name(self.ncp) == "jax"
self._setpreallocate(preallocate)
# L1 regularizations
self.nregsL1 = len(RegsL1)
self.b = [
self.ncp.zeros(RegL1.shape[0], dtype=self.Op.dtype) for RegL1 in RegsL1
]
self.d = [
self.ncp.zeros(RegL1.shape[0], dtype=self.Op.dtype) for RegL1 in RegsL1
]
# L2 regularizations
self.nregsL2 = 0 if RegsL2 is None else len(RegsL2)
if self.nregsL2 > 0 and RegsL2 is not None:
self.Regs = RegsL2 + RegsL1
if dataregsL2 is None:
self.dataregsL2 = [
self.ncp.zeros(Reg.shape[0], dtype=self.Op.dtype) for Reg in RegsL2
]
else:
self.Regs = RegsL1
self.dataregsL2 = []
# Rescale dampings
self.epsRs: List[float] = []
if epsRL2s is not None:
self.epsRs += [
sqrt(epsRL2s[ireg] / 2) / sqrt(mu / 2) for ireg in range(self.nregsL2)
]
if epsRL1s is not None:
self.epsRs += [
sqrt(epsRL1s[ireg] / 2) / sqrt(mu / 2) for ireg in range(self.nregsL1)
]
self.x0 = x0
x = self.ncp.zeros(self.Op.shape[1], dtype=self.Op.dtype) if x0 is None else x0
# create variables to track the residual norm and iterations
self.cost: List[float] = []
self.iiter = 0
if show:
self._print_setup(np.iscomplexobj(x))
return x
def step(
self,
x: NDArray,
engine: str = "scipy",
show: bool = False,
show_inner: bool = False,
**kwargs_solver,
) -> NDArray:
r"""Run one step of solver
Parameters
----------
x : :obj:`list` or :obj:`numpy.ndarray`
Current model vector to be updated by a step of OMP
engine : :obj:`str`, optional
Solver to use (``scipy`` or ``pylops``)
show : :obj:`bool`, optional
Display iteration log
show_inner : :obj:`bool`, optional
Display inner iteration logs of lsqr
**kwargs_solver
Arbitrary keyword arguments for chosen solver
used to solve the first subproblem in the first step of the
Split Bregman algorithm (:py:func:`scipy.sparse.linalg.lsqr` and
:py:func:`pylops.optimization.solver.cgls` are used as default
for numpy and cupy `data`, respectively).
Returns
-------
x : :obj:`numpy.ndarray`
Updated model vector
"""
# add preallocate to keywords of solver
if self.preallocate and (engine == "pylops" or self.ncp != np):
kwargs_solver["preallocate"] = True
for _ in range(self.niter_inner):
# regularized problem
if not self.preallocate:
dataregs = self.dataregsL2 + [
self.d[ireg] - self.b[ireg] for ireg in range(self.nregsL1)
]
else:
for ireg in range(self.nregsL1):
self.ncp.subtract(self.d[ireg], self.b[ireg], out=self.d[ireg])
dataregs = self.dataregsL2 + [
self.d[ireg] for ireg in range(self.nregsL1)
]
x = regularized_inversion(
self.Op,
self.y,
self.Regs,
dataregs=dataregs,
epsRs=self.epsRs,
x0=self.x0 if self.restart else x,
show=show_inner,
engine=engine,
**kwargs_solver,
)[0]
# shrinkage
if not self.preallocate:
for ireg in range(self.nregsL1):
self.d[ireg] = _softthreshold(
self.RegsL1[ireg].matvec(x) + self.b[ireg], self.epsRL1s[ireg]
)
else:
for ireg in range(self.nregsL1):
self.ncp.add(
self.RegsL1[ireg].matvec(x), self.b[ireg], out=self.d[ireg]
)
self.d[ireg] = _softthreshold(self.d[ireg], self.epsRL1s[ireg])
# Bregman update
for ireg in range(self.nregsL1):
self.b[ireg] += self.tau * (self.RegsL1[ireg].matvec(x) - self.d[ireg])
# compute residual norms
self.costdata = (
self.mu / 2.0 * self.ncp.linalg.norm(self.y - self.Op.matvec(x)) ** 2
)
self.costregL2 = (
0
if self.RegsL2 is None
else [
epsRL2 * self.ncp.linalg.norm(dataregL2 - RegL2.matvec(x)) ** 2
for epsRL2, RegL2, dataregL2 in zip(
self.epsRL2s, self.RegsL2, self.dataregsL2
)
]
)
self.costregL1 = [
self.ncp.linalg.norm(RegL1.matvec(x), ord=1)
for _, RegL1 in zip(self.epsRL1s, self.RegsL1)
]
self.costtot = (
self.costdata
+ self.ncp.sum(self.ncp.array(self.costregL2))
+ self.ncp.sum(self.ncp.array(self.costregL1))
)
# update history parameters
self.iiter += 1
self.cost.append(float(self.costtot))
if show:
self._print_step(x)
return x
def run(
self,
x: NDArray,
engine: str = "scipy",
show: bool = False,
itershow: Tuple[int, int, int] = (10, 10, 10),
show_inner: bool = False,
**kwargs_lsqr,
) -> NDArray:
r"""Run solver
Parameters
----------
x : :obj:`numpy.ndarray`
Current model vector to be updated by multiple steps of IRLS
engine : :obj:`str`, optional
Solver to use (``scipy`` or ``pylops``)
show : :obj:`bool`, optional
Display logs
itershow : :obj:`tuple`, optional
Display set log for the first N1 steps, last N2 steps,
and every N3 steps in between where N1, N2, N3 are the
three element of the list.
show_inner : :obj:`bool`, optional
Display inner iteration logs of lsqr
**kwargs_lsqr
Arbitrary keyword arguments for
:py:func:`scipy.sparse.linalg.lsqr` solver used to solve the first
subproblem in the first step of the Split Bregman algorithm.
Returns
-------
x : :obj:`numpy.ndarray`
Estimated model of size :math:`[M \times 1]`
"""
xold = x.copy() + 1.1 * self.tol
while (
self.ncp.linalg.norm(x - xold) > self.tol and self.iiter < self.niter_outer
):
xold = x.copy()
showstep = (
True
if show
and (
self.iiter < itershow[0]
or self.niter_outer - self.iiter < itershow[1]
or self.iiter % itershow[2] == 0
)
else False
)
x = self.step(x, engine, showstep, show_inner, **kwargs_lsqr)
self.callback(x)
# check if any callback has raised a stop flag
stop = _callback_stop(self.callbacks)
if stop:
break
return x
def finalize(self, show: bool = False) -> NDArray:
r"""Finalize solver
Parameters
----------
show : :obj:`bool`, optional
Display finalize log
Returns
-------
xfin : :obj:`numpy.ndarray`
Estimated model of size :math:`[M \times 1]`
"""
self.tend = time.time()
self.telapsed = self.tend - self.tstart
self.cost = np.array(self.cost)
if show:
self._print_finalize(nbar=65)
def solve(
self,
y: NDArray,
RegsL1: List[LinearOperator],
x0: Optional[NDArray] = None,
niter_outer: int = 3,
niter_inner: int = 5,
RegsL2: Optional[List[LinearOperator]] = None,
dataregsL2: Optional[List[NDArray]] = None,
mu: float = 1.0,
epsRL1s: Optional[SamplingLike] = None,
epsRL2s: Optional[SamplingLike] = None,
tol: float = 1e-10,
tau: float = 1.0,
restart: bool = False,
engine: str = "scipy",
preallocate: bool = False,
show: bool = False,
itershow: Tuple[int, int, int] = (10, 10, 10),
show_inner: bool = False,
**kwargs_lsqr,
) -> Tuple[NDArray, int, NDArray]:
r"""Run entire solver
Parameters
----------
y : :obj:`numpy.ndarray`
Data of size :math:`[N \times 1]`
RegsL1 : :obj:`list`
:math:`L_1` regularization operators
x0 : :obj:`numpy.ndarray`, optional
Initial guess of size :math:`[M \times 1]`. If ``None``, initialize
internally as zero vector
niter_outer : :obj:`int`, optional
Number of iterations of outer loop
niter_inner : :obj:`int`, optional
Number of iterations of inner loop of first step of the Split Bregman
algorithm. A small number of iterations is generally sufficient and
for many applications optimal efficiency is obtained when only one
iteration is performed.
RegsL2 : :obj:`list`, optional
Additional :math:`L^2` regularization operators
(if ``None``, :math:`L^2` regularization is not added to the problem)
dataregsL2 : :obj:`list`, optional
:math:`L^2` Regularization data (must have the same number of elements
of ``RegsL2`` or equal to ``None`` to use a zero data for every
regularization operator in ``RegsL2``)
mu : :obj:`float`, optional
Data term damping
epsRL1s : :obj:`list`
:math:`L_1` Regularization dampings (must have the same number of elements
as ``RegsL1``)
epsRL2s : :obj:`list`
:math:`L^2` Regularization dampings (must have the same number of elements
as ``RegsL2``)
tol : :obj:`float`, optional
Tolerance. Stop outer iterations if difference between inverted model
at subsequent iterations is smaller than ``tol``
tau : :obj:`float`, optional
Scaling factor in the Bregman update (must be close to 1)
restart : :obj:`bool`, optional
Initialize the unconstrained inverse problem in inner loop with
the initial guess (``True``) or with the last estimate (``False``).
Note that when this is set to ``True``, the ``x0`` provided in the setup will
be used in all iterations.
engine : :obj:`str`, optional
Solver to use (``scipy`` or ``pylops``)
preallocate : :obj:`bool`, optional
.. versionadded:: 2.6.0
Pre-allocate all variables used by the solver. Note that if ``y``
is a JAX array, this option is ignored and variables are not
pre-allocated since JAX does not support in-place operations.
show : :obj:`bool`, optional
Display logs
itershow : :obj:`tuple`, optional
Display set log for the first N1 steps, last N2 steps,
and every N3 steps in between where N1, N2, N3 are the
three element of the list.
show_inner : :obj:`bool`, optional
Display inner iteration logs of lsqr
**kwargs_lsqr
Arbitrary keyword arguments for
:py:func:`scipy.sparse.linalg.lsqr` solver used to solve the first
subproblem in the first step of the Split Bregman algorithm.
Returns
-------
x : :obj:`numpy.ndarray`
Estimated model of size :math:`[M \times 1]`
iiter : :obj:`int`
Iteration number of outer loop upon termination
cost : :obj:`numpy.ndarray`
History of total cost function through iterations
"""
x = self.setup(
y,
RegsL1,
x0=x0,
niter_outer=niter_outer,
niter_inner=niter_inner,
RegsL2=RegsL2,
dataregsL2=dataregsL2,
mu=mu,
epsRL1s=epsRL1s,
epsRL2s=epsRL2s,
tol=tol,
tau=tau,
restart=restart,
preallocate=preallocate,
show=show,
)
x = self.run(
x,
engine=engine,
show=show,
itershow=itershow,
show_inner=show_inner,
**kwargs_lsqr,
)
self.finalize(show)
return x, self.iiter, self.cost