__all__ = [
"NonStationaryConvolve1D",
"NonStationaryFilters1D",
]
from typing import Union
import numpy as np
from pylops import LinearOperator
from pylops.utils._internal import _value_or_sized_to_tuple
from pylops.utils.backend import get_array_module, inplace_add, inplace_set
from pylops.utils.decorators import reshaped
from pylops.utils.typing import DTypeLike, InputDimsLike, NDArray
[docs]class NonStationaryConvolve1D(LinearOperator):
r"""1D non-stationary convolution operator.
Apply non-stationary one-dimensional convolution. A varying compact filter
is provided on a coarser grid and on-the-fly interpolation is applied
in forward and adjoint modes.
Parameters
----------
dims : :obj:`list` or :obj:`int`
Number of samples for each dimension
hs : :obj:`numpy.ndarray`
Bank of 1d compact filters of size :math:`n_\text{filts} \times n_h`.
Filters must have odd number of samples and are assumed to be
centered in the middle of the filter support.
ih : :obj:`tuple`
Indices of the locations of the filters ``hs`` in the model (and data). Note
that the filters must be regularly sampled, i.e. :math:`dh=\text{diff}(ih)=\text{const.}`
axis : :obj:`int`, optional
Axis along which convolution is applied
dtype : :obj:`str`, optional
Type of elements in input array.
name : :obj:`str`, optional
Name of operator (to be used by :func:`pylops.utils.describe.describe`)
Attributes
----------
shape : :obj:`tuple`
Operator shape
explicit : :obj:`bool`
Operator contains a matrix that can be solved explicitly (``True``) or
not (``False``)
Raises
------
ValueError
If filters ``hs`` have even size
ValueError
If ``ih`` is not regularly sampled
ValueError
If ``ih`` is outside the bounds defined by ``dims[axis]``
Notes
-----
The NonStationaryConvolve1D operator applies non-stationary
one-dimensional convolution between the input signal :math:`d(t)`
and a bank of compact filter kernels :math:`h(t; t_i)`. Assuming
an input signal composed of :math:`N=5` samples, and filters at locations
:math:`t_1` and :math:`t_3`, the forward operator can be represented as follows:
.. math::
\mathbf{y} =
\begin{bmatrix}
\hat{h}_{0,0} & h_{1,0} & \hat{h}_{2,0} & h_{3,0} & \hat{h}_{4,0} \\
\hat{h}_{0,1} & h_{1,1} & \hat{h}_{2,1} & h_{3,1} & \hat{h}_{4,1} \\
\vdots & \vdots & \vdots & \vdots & \vdots \\
\hat{h}_{0,4} & h_{1,4} & \hat{h}_{2,4} & h_{3,4} & \hat{h}_{4,4} \\
\end{bmatrix}
\begin{bmatrix}
x_0 \\ x_1 \\ \vdots \\ x_4
\end{bmatrix}
where :math:`\mathbf{h}_1 = [h_{1,0}, h_{1,1}, \ldots, h_{1,N}]` and
:math:`\mathbf{h}_3 = [h_{3,0}, h_{3,1}, \ldots, h_{3,N}]` are the provided filter,
:math:`\hat{\mathbf{h}}_0 = \mathbf{h}_1` and :math:`\hat{\mathbf{h}}_4 = \mathbf{h}_3` are the
filters outside the range of the provided filters (which are extrapolated to be the same as
the nearest provided filter) and :math:`\hat{\mathbf{h}}_2 = 0.5 \mathbf{h}_1 + 0.5 \mathbf{h}_3`
is the filter within the range of the provided filters (which is linearly interpolated from the two nearest
provided filter on either side of its location).
In forward mode, each filter is weighted by the corresponding input and spread over the output.
In adjoint mode, the corresponding filter is element-wise multiplied with the input, all values
are summed together and put in the output.
"""
def __init__(
self,
dims: Union[int, InputDimsLike],
hs: NDArray,
ih: InputDimsLike,
axis: int = -1,
dtype: DTypeLike = "float64",
name: str = "C",
) -> None:
if hs.shape[1] % 2 == 0:
raise ValueError("filters hs must have odd length")
if len(np.unique(np.diff(ih))) > 1:
raise ValueError(
"the indices of filters 'ih' are must be regularly sampled"
)
dims = _value_or_sized_to_tuple(dims)
if min(ih) < 0 or max(ih) >= dims[axis]:
raise ValueError(
"the indices of filters 'ih' must be larger than 0 and smaller than `dims`"
)
self.hs = hs
self.hsize = hs.shape[1]
self.oh, self.dh, self.nh, self.eh = ih[0], ih[1] - ih[0], len(ih), ih[-1]
self.axis = axis
super().__init__(dtype=np.dtype(dtype), dims=dims, dimsd=dims, name=name)
@property
def hsinterp(self):
ncp = get_array_module(self.hs)
self._hsinterp = ncp.empty((self.dims[self.axis], self.hsize), dtype=self.dtype)
for ix in range(self.dims[self.axis]):
self._hsinterp[ix] = self._interpolate_h(
self.hs, ix, self.oh, self.dh, self.nh
)
return self._hsinterp
@hsinterp.deleter
def hsinterp(self):
del self._hsinterp
@staticmethod
def _interpolate_h(hs, ix, oh, dh, nh):
"""find closest filters and linearly interpolate between them and interpolate psf"""
ih_closest = int(np.floor((ix - oh) / dh))
if ih_closest < 0:
h = hs[0]
elif ih_closest >= nh - 1:
h = hs[nh - 1]
else:
dh_closest = (ix - oh) / dh - ih_closest
h = (1 - dh_closest) * hs[ih_closest] + dh_closest * hs[ih_closest + 1]
return h
@reshaped(swapaxis=True)
def _matvec(self, x: NDArray) -> NDArray:
ncp = get_array_module(x)
y = ncp.zeros_like(x)
for ix in range(self.dims[self.axis]):
h = self._interpolate_h(self.hs, ix, self.oh, self.dh, self.nh)
xextremes = (
max(0, ix - self.hsize // 2),
min(ix + self.hsize // 2 + 1, self.dims[self.axis]),
)
hextremes = (
max(0, -ix + self.hsize // 2),
min(self.hsize, self.hsize // 2 + (self.dims[self.axis] - ix)),
)
# y[..., xextremes[0] : xextremes[1]] += (
# x[..., ix : ix + 1] * h[hextremes[0] : hextremes[1]]
# )
sl = tuple(
[slice(None, None)] * (len(self.dimsd) - 1)
+ [slice(xextremes[0], xextremes[1])]
)
y = inplace_add(x[..., ix : ix + 1] * h[hextremes[0] : hextremes[1]], y, sl)
return y
@reshaped(swapaxis=True)
def _rmatvec(self, x: NDArray) -> NDArray:
ncp = get_array_module(x)
y = ncp.zeros_like(x)
for ix in range(self.dims[self.axis]):
h = self._interpolate_h(self.hs, ix, self.oh, self.dh, self.nh)
xextremes = (
max(0, ix - self.hsize // 2),
min(ix + self.hsize // 2 + 1, self.dims[self.axis]),
)
hextremes = (
max(0, -ix + self.hsize // 2),
min(self.hsize, self.hsize // 2 + (self.dims[self.axis] - ix)),
)
# y[..., ix] = ncp.sum(
# h[hextremes[0] : hextremes[1]] * x[..., xextremes[0] : xextremes[1]],
# axis=-1,
# )
sl = tuple([slice(None, None)] * (len(self.dimsd) - 1) + [ix])
y = inplace_set(
ncp.sum(
h[hextremes[0] : hextremes[1]]
* x[..., xextremes[0] : xextremes[1]],
axis=-1,
),
y,
sl,
)
return y
def todense(self):
ncp = get_array_module(self.hsinterp[0])
hs = self.hsinterp
H = ncp.array(
[
ncp.roll(ncp.pad(h, (0, self.dims[self.axis])), ix)
for ix, h in enumerate(hs)
]
)
H = H[:, int(self.hsize // 2) : -int(self.hsize // 2) - 1]
return H
[docs]class NonStationaryFilters1D(LinearOperator):
r"""1D non-stationary filter estimation operator.
Estimate a non-stationary one-dimensional filter by non-stationary convolution.
In forward mode, a varying compact filter on a coarser grid is on-the-fly linearly interpolated prior
to being convolved with a fixed input signal. In adjoint mode, the output signal is first weighted by the
fixed input signal and then spread across multiple filters (i.e., adjoint of linear interpolation).
Parameters
----------
inp : :obj:`numpy.ndarray`
Fixed input signal of size :math:`n_x`.
hsize : :obj:`int`
Size of the 1d compact filters (filters must have odd number of samples and are assumed to be
centered in the middle of the filter support).
ih : :obj:`tuple`
Indices of the locations of the filters ``hs`` in the model (and data). Note
that the filters must be regularly sampled, i.e. :math:`dh=\text{diff}(ih)=\text{const.}`
dtype : :obj:`str`, optional
Type of elements in input array.
name : :obj:`str`, optional
Name of operator (to be used by :func:`pylops.utils.describe.describe`)
Attributes
----------
shape : :obj:`tuple`
Operator shape
explicit : :obj:`bool`
Operator contains a matrix that can be solved explicitly (``True``) or
not (``False``)
Raises
------
ValueError
If filters ``hsize`` is a even number
ValueError
If ``ih`` is not regularly sampled
ValueError
If ``ih`` is outside the bounds defined by ``dims[axis]``
See Also
--------
NonStationaryConvolve1D : 1D non-stationary convolution operator.
NonStationaryFilters2D : 2D non-stationary filter estimation operator.
Notes
-----
The NonStationaryFilters1D operates in a similar fashion to the
:class:`pylops.signalprocessing.NonStationaryConvolve1D` operator. In practical applications,
this operator shall be used when interested to estimate a 1-dimensional non-stationary filter
given an input and output signal.
In forward mode, this operator uses the same implementation of the
:class:`pylops.signalprocessing.NonStationaryConvolve1D`, with the main difference that
the role of the filters and the input signal is swapped. Nevertheless, to understand how
to implement adjoint, mathematically we arrange the forward operator in a slightly different way.
Assuming once again an input signal composed of :math:`N=5` samples, and filters at locations
:math:`t_1` and :math:`t_3`, the forward operator can be represented as follows:
.. math::
\mathbf{y} =
\begin{bmatrix}
\mathbf{X}_0 & \mathbf{X}_1 & \vdots & \mathbf{X}_4
\end{bmatrix} \mathbf{L}
\begin{bmatrix}
\mathbf{h}_1 \\ \mathbf{h}_3
\end{bmatrix}
where :math:`\mathbf{L}` is an operator that linearly interpolates the filters from the available locations to
the entire input grid -- i.e., :math:`[\hat{\mathbf{h}}_0 \quad \mathbf{h}_1 \quad \hat{\mathbf{h}}_2 \quad
\mathbf{h}_3 \quad \hat{\mathbf{h}}_4]^T = \mathbf{L} [ \mathbf{h}_1 \quad \mathbf{h}_3]`. Finally,
:math:`\mathbf{X}_i` is a diagonal matrix containing the value :math:`x_i` along the
main diagonal. Note that in practice the filter may be shorter than the input and output signals and
the :math:`x_i` values are placed only at the effective positions of the filter along the diagonal matrices
:math:`\mathbf{X}_i`.
In adjoint mode, the output signal is first weighted by the fixed input signal and then spread across
multiple filters (i.e., adjoint of linear interpolation) as follows
.. math::
\mathbf{h} =
\mathbf{L}^H
\begin{bmatrix}
\mathbf{X}_0 \\ \mathbf{X}_1 \\ \vdots \\ \mathbf{X}_4
\end{bmatrix}
\mathbf{y}
"""
def __init__(
self,
inp: NDArray,
hsize: int,
ih: InputDimsLike,
dtype: DTypeLike = "float64",
name: str = "C",
) -> None:
if hsize % 2 == 0:
raise ValueError("filters hs must have odd length")
if len(np.unique(np.diff(ih))) > 1:
raise ValueError(
"the indices of filters 'ih' are must be regularly sampled"
)
if min(ih) < 0 or max(ih) >= inp.size:
raise ValueError(
"the indices of filters 'ih' must be larger than 0 and smaller than `dims`"
)
self.inp = inp
self.hsize = hsize
self.oh, self.dh, self.nh, self.eh = ih[0], ih[1] - ih[0], len(ih), ih[-1]
super().__init__(
dtype=np.dtype(dtype), dims=(len(ih), hsize), dimsd=inp.shape, name=name
)
# use same interpolation method as inNonStationaryConvolve1D
_interpolate_h = staticmethod(NonStationaryConvolve1D._interpolate_h)
@staticmethod
def _interpolate_hadj(htmp, hs, hextremes, ix, oh, dh, nh):
"""find closest filters and spread weighted psf"""
ih_closest = int(np.floor((ix - oh) / dh))
if ih_closest < 0:
# hs[0, hextremes[0] : hextremes[1]] += htmp
sl = tuple([0] + [slice(hextremes[0], hextremes[1])])
hs = inplace_add(htmp, hs, sl)
elif ih_closest >= nh - 1:
# hs[nh - 1, hextremes[0] : hextremes[1]] += htmp
sl = tuple([nh - 1] + [slice(hextremes[0], hextremes[1])])
hs = inplace_add(htmp, hs, sl)
else:
dh_closest = (ix - oh) / dh - ih_closest
# hs[ih_closest, hextremes[0] : hextremes[1]] += (1 - dh_closest) * htmp
sl = tuple([ih_closest] + [slice(hextremes[0], hextremes[1])])
hs = inplace_add((1 - dh_closest) * htmp, hs, sl)
# hs[ih_closest + 1, hextremes[0] : hextremes[1]] += dh_closest * htmp
sl = tuple([ih_closest + 1] + [slice(hextremes[0], hextremes[1])])
hs = inplace_add(dh_closest * htmp, hs, sl)
return hs
@reshaped
def _matvec(self, x: NDArray) -> NDArray:
ncp = get_array_module(x)
y = ncp.zeros(self.dimsd, dtype=self.dtype)
for ix in range(self.dimsd[0]):
h = self._interpolate_h(x, ix, self.oh, self.dh, self.nh)
xextremes = (
max(0, ix - self.hsize // 2),
min(ix + self.hsize // 2 + 1, self.dimsd[0]),
)
hextremes = (
max(0, -ix + self.hsize // 2),
min(self.hsize, self.hsize // 2 + (self.dimsd[0] - ix)),
)
# y[..., xextremes[0] : xextremes[1]] += (
# self.inp[..., ix : ix + 1] * h[hextremes[0] : hextremes[1]]
# )
sl = tuple(
[slice(None, None)] * (len(self.dimsd) - 1)
+ [slice(xextremes[0], xextremes[1])]
)
y = inplace_add(
self.inp[..., ix : ix + 1] * h[hextremes[0] : hextremes[1]], y, sl
)
return y
@reshaped
def _rmatvec(self, x: NDArray) -> NDArray:
ncp = get_array_module(x)
hs = ncp.zeros(self.dims, dtype=self.dtype)
for ix in range(self.dimsd[0]):
xextremes = (
max(0, ix - self.hsize // 2),
min(ix + self.hsize // 2 + 1, self.dimsd[0]),
)
hextremes = (
max(0, -ix + self.hsize // 2),
min(self.hsize, self.hsize // 2 + (self.dimsd[0] - ix)),
)
htmp = self.inp[ix] * x[..., xextremes[0] : xextremes[1]]
hs = self._interpolate_hadj(
htmp, hs, hextremes, ix, self.oh, self.dh, self.nh
)
return hs