__all__ = [
"MDC",
"MDD",
]
import logging
import numpy as np
from scipy.signal import filtfilt
from scipy.sparse.linalg import lsqr
from pylops import Diagonal, Identity, LinearOperator, Transpose
from pylops.optimization.basic import cgls
from pylops.optimization.leastsquares import preconditioned_inversion
from pylops.signalprocessing import FFT, Fredholm1
from pylops.utils import dottest as Dottest
from pylops.utils.backend import (
get_array_module,
get_fftconvolve,
get_module_name,
to_cupy_conditional,
)
from pylops.utils.typing import NDArray, Tfftengine_nsf
logger = logging.getLogger(__name__)
def _MDC(
G: NDArray,
nt: int,
nv: int,
dt: float = 1.0,
dr: float = 1.0,
twosided: bool = True,
saveGt: bool = True,
conj: bool = False,
prescaled: bool = False,
_Identity=Identity,
_Transpose=Transpose,
_FFT=FFT,
_Fredholm1=Fredholm1,
args_Identity: dict | None = None,
args_FFT: dict | None = None,
args_Identity1: dict | None = None,
args_FFT1: dict | None = None,
args_Fredholm1: dict | None = None,
) -> LinearOperator:
r"""Multi-dimensional convolution.
Used to be able to provide operators from different libraries (e.g., pylops-distributed) to
MDC. It operates in the same way as public method
(MDC) but has additional input parameters allowing
passing a different operator and additional arguments to be passed to such
operator.
"""
if args_Identity is None:
args_Identity = {}
if args_FFT is None:
args_FFT = {}
if args_Identity1 is None:
args_Identity1 = {}
if args_FFT1 is None:
args_FFT1 = {}
if args_Fredholm1 is None:
args_Fredholm1 = {}
if twosided and nt % 2 == 0:
msg = f"nt must be a odd number when twosided=True, got nt={nt}"
raise ValueError(msg)
# find out dtype of G
dtype = G[0, 0, 0].dtype
rdtype = np.real(np.ones(1, dtype=dtype)).dtype
# create Fredholm operator
if prescaled:
Frop = _Fredholm1(G, nv, saveGt=saveGt, dtype=dtype, **args_Fredholm1)
else:
Frop = _Fredholm1(
dr * dt * np.sqrt(nt) * G, nv, saveGt=saveGt, dtype=dtype, **args_Fredholm1
)
if conj:
Frop = Frop.conj()
# create FFT operators
nfmax, ns, nr = G.shape
# ensure that nfmax is not bigger than allowed
nfft = int(np.ceil((nt + 1) / 2))
if nfmax > nfft:
nfmax = nfft
logger.warning("nfmax set equal to ceil[(nt+1)/2=%d]", nfmax)
Fop = _FFT(
dims=(nt, nr, nv),
axis=0,
real=True,
ifftshift_before=twosided,
dtype=rdtype,
**args_FFT,
)
F1op = _FFT(
dims=(nt, ns, nv),
axis=0,
real=True,
ifftshift_before=False,
dtype=rdtype,
**args_FFT1,
)
# create Identity operator to extract only relevant frequencies
Iop = _Identity(
N=nfmax * nr * nv, M=nfft * nr * nv, inplace=True, dtype=dtype, **args_Identity
)
I1op = _Identity(
N=nfmax * ns * nv, M=nfft * ns * nv, inplace=True, dtype=dtype, **args_Identity1
)
F1opH = F1op.H
I1opH = I1op.H
# create MDC operator
MDCop = F1opH * I1opH * Frop * Iop * Fop
# force dtype to be real (as FFT operators assume real inputs and outputs)
MDCop.dtype = rdtype
return MDCop
[docs]
def MDC(
G: NDArray,
nt: int,
nv: int,
dt: float = 1.0,
dr: float = 1.0,
twosided: bool = True,
fftengine: Tfftengine_nsf = "numpy",
saveGt: bool = True,
conj: bool = False,
usematmul: bool = False,
prescaled: bool = False,
name: str = "M",
**kwargs_fft,
) -> LinearOperator:
r"""Multi-dimensional convolution.
Apply multi-dimensional convolution between two datasets.
Model and data should be provided after flattening 2- or 3-dimensional arrays
of size :math:`[n_t \times n_r \;(\times n_{vs})]` and
:math:`[n_t \times n_s \;(\times n_{vs})]` (or :math:`2n_t-1` for
``twosided=True``), respectively.
Parameters
----------
G : :obj:`numpy.ndarray`
Multi-dimensional convolution kernel in frequency domain of size
:math:`[n_{f_\text{max}} \times n_s \times n_r]`
nt : :obj:`int`
Number of samples along time axis for model and data (note that this
must be equal to :math:`2n_t-1` when working with ``twosided=True``.
nv : :obj:`int`
Number of samples along virtual source axis
dt : :obj:`float`, optional
Sampling of time integration axis :math:`\Delta t`
dr : :obj:`float`, optional
Sampling of receiver integration axis :math:`\Delta r`
twosided : :obj:`bool`, optional
MDC operator has both negative and positive time (``True``) or
only positive (``False``)
fftengine : :obj:`str`, optional
Engine used for fft computation (``numpy``, ``scipy`` or ``fftw``)
saveGt : :obj:`bool`, optional
Save ``G`` and ``G.H`` to speed up the computation of adjoint of
:class:`pylops.signalprocessing.Fredholm1` (``True``) or create
``G.H`` on-the-fly (``False``) Note that ``saveGt=True`` will be
faster but double the amount of required memory
conj : :obj:`str`, optional
Perform Fredholm integral computation with complex conjugate of ``G``
usematmul : :obj:`bool`, optional
Use :func:`numpy.matmul` (``True``) or for-loop with :func:`numpy.dot`
(``False``) in :py:class:`pylops.signalprocessing.Fredholm1` operator.
Refer to Fredholm1 documentation for details.
prescaled : :obj:`bool`, optional
Apply scaling to kernel (``False``) or not (``False``) when performing
spatial and temporal summations. In case ``prescaled=True``, the
kernel is assumed to have been pre-scaled when passed to the MDC
routine.
name : :obj:`str`, optional
.. versionadded:: 2.0.0
Name of operator (to be used by :func:`pylops.utils.describe.describe`)
**kwargs_fft
.. versionadded:: 2.6.0
Arbitrary keyword arguments to be passed to the selected fft method
Returns
-------
MOp : :obj:`pylops.LinearOperator`
Multi-dimensional convolution operator.
Raises
------
ValueError
If ``nt`` is even and ``twosided=True``
See Also
--------
MDD : Multi-dimensional deconvolution
Notes
-----
The so-called multi-dimensional convolution (MDC) is a chained
operator [1]_. It is composed of a forward Fourier transform,
a multi-dimensional integration, and an inverse Fourier transform:
.. math::
y(t, s, v) = \mathscr{F}^{-1} \Big( \int_S G(f, s, r)
\mathscr{F}(x(t, r, v))\,\mathrm{d}r \Big)
which is discretized as follows:
.. math::
y(t, s, v) = \sqrt{n_t} \Delta t \Delta r\mathscr{F}^{-1} \Big( \sum_{i_r=0}^{n_r}
G(f, s, i_r) \mathscr{F}(x(t, i_r, v)) \Big)
where :math:`\sqrt{n_t} \Delta t \Delta r` is not applied if ``prescaled=True``.
This operation can be discretized and performed by means of a
linear operator
.. math::
\mathbf{D}= \mathbf{F}^H \mathbf{G} \mathbf{F}
where :math:`\mathbf{F}` is the Fourier transform applied along
the time axis and :math:`\mathbf{G}` is the multi-dimensional
convolution kernel.
.. [1] Wapenaar, K., van der Neut, J., Ruigrok, E., Draganov, D., Hunziker,
J., Slob, E., Thorbecke, J., and Snieder, R., "Seismic interferometry
by crosscorrelation and by multi-dimensional deconvolution: a
systematic comparison", Geophysical Journal International, vol. 185,
pp. 1335-1364. 2011.
"""
MOp = _MDC(
G,
nt,
nv,
dt=dt,
dr=dr,
twosided=twosided,
saveGt=saveGt,
conj=conj,
prescaled=prescaled,
args_FFT={**{"engine": fftengine}, **kwargs_fft},
args_FFT1={**{"engine": fftengine}, **kwargs_fft},
args_Fredholm1={"usematmul": usematmul},
)
MOp.name = name
return MOp
[docs]
def MDD(
G: NDArray,
d: NDArray,
dt: float = 0.004,
dr: float = 1.0,
nfmax: int | None = None,
wav: NDArray | None = None,
twosided: bool = True,
causality_precond: bool = False,
adjoint: bool = False,
psf: bool = False,
dottest: bool = False,
saveGt: bool = True,
add_negative: bool = True,
smooth_precond: int = 0,
fftengine: Tfftengine_nsf = "numpy",
**kwargs_solver,
) -> (
tuple[NDArray, NDArray]
| tuple[NDArray, NDArray, NDArray]
| tuple[NDArray, NDArray, NDArray, NDArray]
):
r"""Multi-dimensional deconvolution.
Solve multi-dimensional deconvolution problem using
:py:func:`scipy.sparse.linalg.lsqr` iterative solver.
Parameters
----------
G : :obj:`numpy.ndarray`
Multi-dimensional convolution kernel in time domain of size
:math:`[n_s \times n_r \times n_t]` for ``twosided=False`` or
``twosided=True`` and ``add_negative=True``
(with only positive times) or size
:math:`[n_s \times n_r \times 2n_t-1]` for ``twosided=True`` and
``add_negative=False``
(with both positive and negative times)
d : :obj:`numpy.ndarray`
Data in time domain :math:`[n_s \,(\times n_{vs}) \times n_t]` if
``twosided=False`` or ``twosided=True`` and ``add_negative=True``
(with only positive times) or size
:math:`[n_s \,(\times n_{vs}) \times 2n_t-1]` if ``twosided=True``
dt : :obj:`float`, optional
Sampling of time integration axis
dr : :obj:`float`, optional
Sampling of receiver integration axis
nfmax : :obj:`int`, optional
Index of max frequency to include in deconvolution process
wav : :obj:`numpy.ndarray`, optional
Wavelet to convolve to the inverted model and psf
(must be centered around its index in the middle of the array).
If ``None``, the outputs of the inversion are returned directly.
twosided : :obj:`bool`, optional
MDC operator and data both negative and positive time (``True``)
or only positive (``False``)
add_negative : :obj:`bool`, optional
Add negative side to MDC operator and data (``True``) or not
(``False``)- operator and data are already provided with both positive
and negative sides. To be used only with ``twosided=True``.
causality_precond : :obj:`bool`, optional
Apply causality mask (``True``) or not (``False``)
smooth_precond : :obj:`int`, optional
Lenght of smoothing to apply to causality preconditioner
adjoint : :obj:`bool`, optional
Compute and return adjoint(s)
psf : :obj:`bool`, optional
Compute and return Point Spread Function (PSF) and its inverse
dottest : :obj:`bool`, optional
Apply dot-test
saveGt : :obj:`bool`, optional
Save ``G`` and ``G.H`` to speed up the computation of adjoint of
:class:`pylops.signalprocessing.Fredholm1` (``True``) or create
``G.H`` on-the-fly (``False``) Note that ``saveGt=True`` will be
faster but double the amount of required memory
fftengine : :obj:`str`, optional
Engine used for fft computation (``numpy``, ``scipy`` or ``fftw``)
**kwargs_solver
Arbitrary keyword arguments for chosen solver
(:py:func:`scipy.sparse.linalg.cg` and
:py:func:`pylops.optimization.solver.cg` are used as default for numpy
and cupy `data`, respectively)
Returns
-------
minv : :obj:`numpy.ndarray`
Inverted model of size :math:`[n_r \,(\times n_{vs}) \times n_t]`
for ``twosided=False`` or
:math:`[n_r \,(\times n_vs) \times 2n_t-1]` for ``twosided=True``
madj : :obj:`numpy.ndarray`
Adjoint model of size :math:`[n_r \,(\times n_{vs}) \times n_t]`
for ``twosided=False`` or
:math:`[n_r \,(\times n_r) \times 2n_t-1]` for ``twosided=True``
psfinv : :obj:`numpy.ndarray`
Inverted psf of size :math:`[n_r \times n_r \times n_t]`
for ``twosided=False`` or
:math:`[n_r \times n_r \times 2n_t-1]` for ``twosided=True``
psfadj : :obj:`numpy.ndarray`
Adjoint psf of size :math:`[n_r \times n_r \times n_t]`
for ``twosided=False`` or
:math:`[n_r \times n_r \times 2n_t-1]` for ``twosided=True``
See Also
--------
MDC : Multi-dimensional convolution
Notes
-----
Multi-dimensional deconvolution (MDD) is a mathematical ill-solved problem,
well-known in the image processing and geophysical community [1]_.
MDD aims at removing the effects of a Multi-dimensional Convolution
(MDC) kernel or the so-called blurring operator or point-spread
function (PSF) from a given data. It can be written as
.. math::
\mathbf{d}= \mathbf{D} \mathbf{m}
or, equivalently, by means of its normal equation
.. math::
\mathbf{m}= (\mathbf{D}^H\mathbf{D})^{-1} \mathbf{D}^H\mathbf{d}
where :math:`\mathbf{D}^H\mathbf{D}` is the PSF.
.. [1] Wapenaar, K., van der Neut, J., Ruigrok, E., Draganov, D., Hunziker,
J., Slob, E., Thorbecke, J., and Snieder, R., "Seismic interferometry
by crosscorrelation and by multi-dimensional deconvolution: a
systematic comparison", Geophyscial Journal International, vol. 185,
pp. 1335-1364. 2011.
"""
ncp = get_array_module(d)
ns, nr, nt = G.shape
if len(d.shape) == 2:
nv = 1
else:
nv = d.shape[1]
if twosided:
if add_negative:
nt2 = 2 * nt - 1
else:
nt2 = nt
nt = (nt2 + 1) // 2
nfmax_allowed = int(np.ceil((nt2 + 1) / 2))
else:
nt2 = nt
nfmax_allowed = nt
# Fix nfmax to be at maximum equal to half of the size of fft samples
if nfmax is None or nfmax > nfmax_allowed:
nfmax = nfmax_allowed
logger.warning("nfmax set equal to ceil[(nt+1)/2=%d]", nfmax)
# Add negative part to data and model
if twosided and add_negative:
G = ncp.concatenate((ncp.zeros((ns, nr, nt - 1), dtype=G.dtype), G), axis=-1)
d = ncp.concatenate(
(ncp.squeeze(ncp.zeros((ns, nv, nt - 1), dtype=d.dtype)), d), axis=-1
)
# Bring kernel to frequency domain
Gfft = ncp.fft.rfft(G, nt2, axis=-1)
Gfft = Gfft[..., :nfmax]
# Bring frequency/time to first dimension
Gfft = ncp.moveaxis(Gfft, -1, 0)
d = ncp.moveaxis(d, -1, 0)
if psf:
G = ncp.moveaxis(G, -1, 0)
# Define MDC linear operator
MDCop = MDC(
Gfft,
nt2,
nv=nv,
dt=dt,
dr=dr,
twosided=twosided,
saveGt=saveGt,
fftengine=fftengine,
)
if psf:
PSFop = MDC(
Gfft,
nt2,
nv=nr,
dt=dt,
dr=dr,
twosided=twosided,
saveGt=saveGt,
fftengine=fftengine,
)
if dottest:
Dottest(
MDCop, nt2 * ns * nv, nt2 * nr * nv, verb=True, backend=get_module_name(ncp)
)
if psf:
Dottest(
PSFop,
nt2 * ns * nr,
nt2 * nr * nr,
verb=True,
backend=get_module_name(ncp),
)
# Adjoint
if adjoint:
madj = MDCop.H * d.ravel()
madj = ncp.squeeze(madj.reshape(nt2, nr, nv))
madj = ncp.moveaxis(madj, 0, -1)
if psf:
psfadj = PSFop.H * G.ravel()
psfadj = ncp.squeeze(psfadj.reshape(nt2, nr, nr))
psfadj = ncp.moveaxis(psfadj, 0, -1)
# Inverse
if twosided and causality_precond:
P = np.ones((nt2, nr, nv))
P[: nt - 1] = 0
if smooth_precond > 0:
P = filtfilt(np.ones(smooth_precond) / smooth_precond, 1, P, axis=0)
P = to_cupy_conditional(d, P)
Pop = Diagonal(P)
minv = preconditioned_inversion(MDCop, d.ravel(), Pop, **kwargs_solver)[0]
else:
if ncp == np and "callback" not in kwargs_solver:
minv = lsqr(MDCop, d.ravel(), **kwargs_solver)[0]
else:
minv = cgls(
MDCop,
d.ravel(),
ncp.zeros(int(MDCop.shape[1]), dtype=MDCop.dtype),
**kwargs_solver,
)[0]
minv = ncp.squeeze(minv.reshape(nt2, nr, nv))
minv = ncp.moveaxis(minv, 0, -1)
if wav is not None:
wav1 = wav.copy()
for _ in range(minv.ndim - 1):
wav1 = wav1[ncp.newaxis]
minv = get_fftconvolve(d)(minv, wav1, mode="same")
if psf:
if ncp == np:
psfinv = lsqr(PSFop, G.ravel(), **kwargs_solver)[0]
else:
psfinv = cgls(
PSFop,
G.ravel(),
ncp.zeros(int(PSFop.shape[1]), dtype=PSFop.dtype),
**kwargs_solver,
)[0]
psfinv = ncp.squeeze(psfinv.reshape(nt2, nr, nr))
psfinv = ncp.moveaxis(psfinv, 0, -1)
if wav is not None:
wav1 = wav.copy()
for _ in range(psfinv.ndim - 1):
wav1 = wav1[np.newaxis]
psfinv = get_fftconvolve(d)(psfinv, wav1, mode="same")
if adjoint and psf:
return minv, madj, psfinv, psfadj
elif adjoint:
return minv, madj
elif psf:
return minv, psfinv
else:
return minv