Source code for pylops.signalprocessing.nonstatconvolve2d

__all__ = [
    "NonStationaryConvolve2D",
    "NonStationaryFilters2D",
]

import os
from typing import Tuple, Union

import numpy as np

from pylops import LinearOperator
from pylops.utils import deps
from pylops.utils._internal import _value_or_sized_to_tuple
from pylops.utils.backend import get_array_module
from pylops.utils.decorators import reshaped
from pylops.utils.typing import DTypeLike, InputDimsLike, NDArray

jit_message = deps.numba_import("the nonstatconvolve2d module")

if jit_message is None:
    from numba import jit, prange

    from ._nonstatconvolve2d_cuda import (
        _matvec_rmatvec_call as _matvec_rmatvec_cuda_call,
    )

    # detect whether to use parallel or not
    numba_threads = int(os.getenv("NUMBA_NUM_THREADS", "1"))
    parallel = True if numba_threads != 1 else False
else:
    prange = range


[docs]class NonStationaryConvolve2D(LinearOperator): r"""2D non-stationary convolution operator. Apply non-stationary two-dimensional convolution. A varying compact filter is provided on a coarser grid and on-the-fly interpolation is applied in forward and adjoint modes. Both input and output have size :math:`n_x \times n_z`. Parameters ---------- dims : :obj:`list` or :obj:`int` Number of samples for each dimension (which we refer to as :math:`n_x \times n_z`). hs : :obj:`numpy.ndarray` Bank of 2d compact filters of size :math:`n_{\text{filts},x} \times n_{\text{filts},z} \times n_{h,x} \times n_{h,z}`. Filters must have odd number of samples and are assumed to be centered in the middle of the filter support. ihx : :obj:`tuple` Indices of the x locations of the filters ``hs`` in the model (and data). Note that the filters must be regularly sampled, i.e. :math:`dh_x=\text{diff}(ihx)=\text{const.}` ihz : :obj:`tuple` Indices of the z locations of the filters ``hs`` in the model (and data). Note that the filters must be regularly sampled, i.e. :math:`dh_z=\text{diff}(ihz)=\text{const.}` engine : :obj:`str`, optional Engine used for spread computation (``numpy``, ``numba``, or ``cuda``) num_threads_per_blocks : :obj:`tuple`, optional Number of threads in each block (only when ``engine=cuda``) dtype : :obj:`str`, optional Type of elements in input array. name : :obj:`str`, optional Name of operator (to be used by :func:`pylops.utils.describe.describe`) Attributes ---------- shape : :obj:`tuple` Operator shape explicit : :obj:`bool` Operator contains a matrix that can be solved explicitly (``True``) or not (``False``) Raises ------ ValueError If filters ``hs`` have even size ValueError If ``ihx`` or ``ihz`` is not regularly sampled ValueError If ``ihx`` or ``ihz`` are outside the bounds defined by ``dims`` NotImplementedError If ``engine`` is neither ``numpy``, ``fftw``, nor ``scipy``. Notes ----- The NonStationaryConvolve2D operator applies non-stationary two-dimensional convolution between the input signal :math:`d(x, z)` and a bank of compact filter kernels :math:`h(x, z; x_i, z_i)`. Assuming an input signal composed of :math:`N \times M` samples (with :math:`N=4` and :math:`M=3`, and filters at locations :math:`x_1, x_3` and :math:`z_1, z_3`, the forward operator can be represented as follows: .. math:: \mathbf{y} = \begin{bmatrix} \hat{h}_{(0,0),(0,0)} & \cdots & h_{(1,1),(0,0)} & \cdots & \hat{h}_{(2,2),(0,0)} & \cdots \\ \hat{h}_{(0,0),(0,1)} & \cdots & h_{(1,1),(0,1)} & \cdots & \hat{h}_{(2,2),(0,0)} & \cdots \\ \vdots & \ddots & & \ddots & \vdots & \vdots \\ \hat{h}_{(0,0),(4,3)} & \cdots & h_{(1,1),(4,3)} & \cdots & \hat{h}_{(2,2),(0,0)} & \cdots \\ \end{bmatrix} \begin{bmatrix} x_{0,0} \\ \vdots \\ x_{0,N} \\ x_{1,0} \\ \vdots \\ x_{1,N} \\ x_{M,0} \\ \vdots \\ x_{M,N} \end{bmatrix} where :math:`\mathbf{h}_{(1,1)} = [h_{(1,1),(0,0)}, h_{(1,1),(0,1)}, \ldots, h_{(1,1),(4,3)}]` (and :math:`\mathbf{h}_{(1,1)}`, :math:`\mathbf{h}_{(1,3)}`, :math:`\mathbf{h}_{(3,1)}`, :math:`\mathbf{h}_{(3,3)}`) are the provided filter, :math:`\hat{\mathbf{h}}_{(0,0)} = \mathbf{h}_{(1,1)}` and similar are the filters outside the range of the provided filters (which are extrapolated to be the same as the nearest provided filter) and :math:`\hat{\mathbf{h}}_{(2,2)} = \text{bilinear}(\mathbf{h}_{(1,1)}, \mathbf{h}_{(3,1)}, \mathbf{h}_{(1,3)},\mathbf{h}_{(3,3)})` is the filter within the range of the provided filters (which is bilinearly interpolated from the four nearest provided filter on either side of its location). For more details on the numerical implementation of the forward and adjoint, see :class:`pylops.signalprocessing.NonStationaryConvolve1D`. """ def __init__( self, dims: Union[int, InputDimsLike], hs: NDArray, ihx: InputDimsLike, ihz: InputDimsLike, engine: str = "numpy", num_threads_per_blocks: Tuple[int, int] = (32, 32), dtype: DTypeLike = "float64", name: str = "C", ) -> None: if engine not in ["numpy", "numba", "cuda"]: raise NotImplementedError("engine must be numpy or numba or cuda") if hs.shape[2] % 2 == 0 or hs.shape[3] % 2 == 0: raise ValueError("filters hs must have odd length") if len(np.unique(np.diff(ihx))) > 1 or len(np.unique(np.diff(ihz))) > 1: raise ValueError( "the indices of filters 'ih' are must be regularly sampled" ) if min(ihx) < 0 or min(ihz) < 0 or max(ihx) >= dims[0] or max(ihz) >= dims[1]: raise ValueError( "the indices of filters 'ih' must be larger than 0 and smaller than `dims`" ) self.hs = hs self.hshape = hs.shape[2:] self.ohx, self.dhx, self.nhx = ihx[0], ihx[1] - ihx[0], len(ihx) self.ohz, self.dhz, self.nhz = ihz[0], ihz[1] - ihz[0], len(ihz) self.ehx, self.ehz = ihx[-1], ihz[-1] self.dims = _value_or_sized_to_tuple(dims) self.engine = engine super().__init__(dtype=np.dtype(dtype), dims=dims, dimsd=dims, name=name) # create additional input parameters for engine=cuda self.kwargs_cuda = {} if engine == "cuda": self.kwargs_cuda["num_threads_per_blocks"] = num_threads_per_blocks num_threads_per_blocks_x, num_threads_per_blocks_z = num_threads_per_blocks num_blocks_x = ( self.dims[0] + num_threads_per_blocks_x - 1 ) // num_threads_per_blocks_x num_blocks_z = ( self.dims[1] + num_threads_per_blocks_z - 1 ) // num_threads_per_blocks_z self.kwargs_cuda["num_blocks"] = (num_blocks_x, num_blocks_z) self._register_multiplications(engine) def _register_multiplications(self, engine: str) -> None: if engine == "numba": numba_opts = dict(nopython=True, fastmath=True, nogil=True, parallel=True) self._mvrmv = jit(**numba_opts)(self._matvec_rmatvec) elif engine == "cuda": self._mvrmv = _matvec_rmatvec_cuda_call else: self._mvrmv = self._matvec_rmatvec @staticmethod def _matvec_rmatvec( x: NDArray, y: NDArray, hs: NDArray, hshape: Tuple[int, int], xdims: Tuple[int, int], ohx: float, ohz: float, dhx: float, dhz: float, nhx: int, nhz: int, rmatvec: bool = False, ) -> NDArray: for ix in prange(xdims[0]): for iz in range(xdims[1]): # find closest filters and interpolate h ihx_l = int(np.floor((ix - ohx) / dhx)) ihz_t = int(np.floor((iz - ohz) / dhz)) dhx_r = (ix - ohx) / dhx - ihx_l dhz_b = (iz - ohz) / dhz - ihz_t if ihx_l < 0: ihx_l = ihx_r = 0 dhx_l = dhx_r = 0.5 elif ihx_l >= nhx - 1: ihx_l = ihx_r = nhx - 1 dhx_l = dhx_r = 0.5 else: ihx_r = ihx_l + 1 dhx_l = 1.0 - dhx_r if ihz_t < 0: ihz_t = ihz_b = 0 dhz_t = dhz_b = 0.5 elif ihz_t >= nhz - 1: ihz_t = ihz_b = nhz - 1 dhz_t = dhz_b = 0.5 else: ihz_b = ihz_t + 1 dhz_t = 1.0 - dhz_b h_tl = hs[ihx_l, ihz_t] h_bl = hs[ihx_l, ihz_b] h_tr = hs[ihx_r, ihz_t] h_br = hs[ihx_r, ihz_b] h = ( dhz_t * dhx_l * h_tl + dhz_b * dhx_l * h_bl + dhz_t * dhx_r * h_tr + dhz_b * dhx_r * h_br ) # find extremes of model where to apply h (in case h is going out of model) xextremes = ( max(0, ix - hshape[0] // 2), min(ix + hshape[0] // 2 + 1, xdims[0]), ) zextremes = ( max(0, iz - hshape[1] // 2), min(iz + hshape[1] // 2 + 1, xdims[1]), ) # find extremes of h (in case h is going out of model) hxextremes = ( max(0, -ix + hshape[0] // 2), min(hshape[0], hshape[0] // 2 + (xdims[0] - ix)), ) hzextremes = ( max(0, -iz + hshape[1] // 2), min(hshape[1], hshape[1] // 2 + (xdims[1] - iz)), ) if not rmatvec: y[xextremes[0] : xextremes[1], zextremes[0] : zextremes[1]] += ( x[ix, iz] * h[ hxextremes[0] : hxextremes[1], hzextremes[0] : hzextremes[1] ] ) else: y[ix, iz] = np.sum( h[hxextremes[0] : hxextremes[1], hzextremes[0] : hzextremes[1]] * x[xextremes[0] : xextremes[1], zextremes[0] : zextremes[1]] ) return y @reshaped def _matvec(self, x: NDArray) -> NDArray: ncp = get_array_module(x) y = ncp.zeros(self.dims, dtype=self.dtype) y = self._mvrmv( x, y, self.hs, self.hshape, self.dims, self.ohx, self.ohz, self.dhx, self.dhz, self.nhx, self.nhz, rmatvec=False, **self.kwargs_cuda ) return y @reshaped def _rmatvec(self, x: NDArray) -> NDArray: ncp = get_array_module(x) y = ncp.zeros(self.dims, dtype=self.dtype) y = self._mvrmv( x, y, self.hs, self.hshape, self.dims, self.ohx, self.ohz, self.dhx, self.dhz, self.nhx, self.nhz, rmatvec=True, **self.kwargs_cuda ) return y
[docs]class NonStationaryFilters2D(LinearOperator): r"""2D non-stationary filter estimation operator. Estimate a non-stationary two-dimensional filter by non-stationary convolution. Parameters ---------- inp : :obj:`numpy.ndarray` Fixed input signal of size :math:`n_x \times n_z`. hshape : :obj:`list` or :obj:`tuple` Shape of the 2d compact filters (filters must have odd number of samples and are assumed to be centered in the middle of the filter support). ihx : :obj:`tuple` Indices of the x locations of the filters ``hs`` in the model (and data). Note that the filters must be regularly sampled, i.e. :math:`dh_x=\text{diff}(ihx)=\text{const.}` ihz : :obj:`tuple` Indices of the z locations of the filters ``hs`` in the model (and data). Note that the filters must be regularly sampled, i.e. :math:`dh_z=\text{diff}(ihz)=\text{const.}` engine : :obj:`str`, optional Engine used for spread computation (``numpy``, ``numba``, or ``cuda``) num_threads_per_blocks : :obj:`tuple`, optional Number of threads in each block (only when ``engine=cuda``) dtype : :obj:`str`, optional Type of elements in input array. name : :obj:`str`, optional Name of operator (to be used by :func:`pylops.utils.describe.describe`) Attributes ---------- shape : :obj:`tuple` Operator shape explicit : :obj:`bool` Operator contains a matrix that can be solved explicitly (``True``) or not (``False``) Raises ------ ValueError If filters ``hs`` have even size ValueError If ``ihx`` or ``ihz`` is not regularly sampled NotImplementedError If ``engine`` is neither ``numpy``, ``fftw``, nor ``scipy``. Notes ----- The NonStationaryConvolve2D operator is used to estimate a non-stationary two-dimensional filter betwen two signals, an input signal (provided directly to the operator) and the desired output signal. For more details on the numerical implementation of the forward and adjoint, see :class:`pylops.signalprocessing.NonStationaryFilters1D`. """ def __init__( self, inp: NDArray, hshape: InputDimsLike, ihx: InputDimsLike, ihz: InputDimsLike, engine: str = "numpy", num_threads_per_blocks: Tuple[int, int] = (32, 32), dtype: DTypeLike = "float64", name: str = "C", ) -> None: if engine not in ["numpy", "numba", "cuda"]: raise NotImplementedError("engine must be numpy or numba or cuda") if hshape[0] % 2 == 0 or hshape[1] % 2 == 0: raise ValueError("filters hs must have odd length") if len(np.unique(np.diff(ihx))) > 1 or len(np.unique(np.diff(ihz))) > 1: raise ValueError( "the indices of filters 'ih' are must be regularly sampled" ) if ( min(ihx) < 0 or min(ihz) < 0 or max(ihx) >= inp.shape[0] or max(ihz) >= inp.shape[1] ): raise ValueError( "the indices of filters 'ih' must be larger than 0 and smaller than `dims`" ) self.inp = inp self.inpdims = inp.shape self.hshape = hshape self.ohx, self.dhx, self.nhx = ihx[0], ihx[1] - ihx[0], len(ihx) self.ohz, self.dhz, self.nhz = ihz[0], ihz[1] - ihz[0], len(ihz) self.ehx, self.ehz = ihx[-1], ihz[-1] self.engine = engine super().__init__( dtype=np.dtype(dtype), dims=(self.nhx, self.nhz, *hshape), dimsd=self.inpdims, name=name, ) # create additional input parameters for engine=cuda self.kwargs_cuda = {} if engine == "cuda": self.kwargs_cuda["num_threads_per_blocks"] = num_threads_per_blocks num_threads_per_blocks_x, num_threads_per_blocks_z = num_threads_per_blocks num_blocks_x = ( self.dims[0] + num_threads_per_blocks_x - 1 ) // num_threads_per_blocks_x num_blocks_z = ( self.dims[1] + num_threads_per_blocks_z - 1 ) // num_threads_per_blocks_z self.kwargs_cuda["num_blocks"] = (num_blocks_x, num_blocks_z) self._register_multiplications(engine) def _register_multiplications(self, engine: str) -> None: if engine == "numba": numba_opts = dict(nopython=True, fastmath=True, nogil=True, parallel=True) self._mv = jit(**numba_opts)(self.__matvec) self._rmv = jit(**numba_opts)(self.__rmatvec) elif engine == "cuda": raise NotImplementedError("engine=cuda is currently not available") else: self._mv = self.__matvec self._rmv = self.__rmatvec # use same matvec method as in NonStationaryConvolve2D __matvec = staticmethod(NonStationaryConvolve2D._matvec_rmatvec) @staticmethod def __rmatvec( x: NDArray, y: NDArray, hs: NDArray, hshape: Tuple[int, int], xdims: Tuple[int, int], ohx: float, ohz: float, dhx: float, dhz: float, nhx: int, nhz: int, ) -> NDArray: # Currently a race condition seem to occur when updating parts of hs multiple times within # the same loop (see https://numba.pydata.org/numba-doc/latest/user/parallel.html?highlight=njit). # Until atomic operations are provided we create a temporary filter array and store intermediate # results from each ix and reduce them at the end. hstmp = np.zeros((xdims[0], *hs.shape)) for ix in prange(xdims[0]): for iz in range(xdims[1]): # find extremes of model where to apply h (in case h is going out of model) xextremes = ( max(0, ix - hshape[0] // 2), min(ix + hshape[0] // 2 + 1, xdims[0]), ) zextremes = ( max(0, iz - hshape[1] // 2), min(iz + hshape[1] // 2 + 1, xdims[1]), ) # find extremes of h (in case h is going out of model) hxextremes = ( max(0, -ix + hshape[0] // 2), min(hshape[0], hshape[0] // 2 + (xdims[0] - ix)), ) hzextremes = ( max(0, -iz + hshape[1] // 2), min(hshape[1], hshape[1] // 2 + (xdims[1] - iz)), ) htmp = ( x[ix, iz] * y[xextremes[0] : xextremes[1], zextremes[0] : zextremes[1]] ) # find closest filters and interpolate h ihx_l = int(np.floor((ix - ohx) / dhx)) ihz_t = int(np.floor((iz - ohz) / dhz)) dhx_r = (ix - ohx) / dhx - ihx_l dhz_b = (iz - ohz) / dhz - ihz_t if ihx_l < 0: ihx_l = ihx_r = 0 dhx_l = dhx_r = 0.5 elif ihx_l >= nhx - 1: ihx_l = ihx_r = nhx - 1 dhx_l = dhx_r = 0.5 else: ihx_r = ihx_l + 1 dhx_l = 1.0 - dhx_r if ihz_t < 0: ihz_t = ihz_b = 0 dhz_t = dhz_b = 0.5 elif ihz_t >= nhz - 1: ihz_t = ihz_b = nhz - 1 dhz_t = dhz_b = 0.5 else: ihz_b = ihz_t + 1 dhz_t = 1.0 - dhz_b hstmp[ ix, ihx_l, ihz_t, hxextremes[0] : hxextremes[1], hzextremes[0] : hzextremes[1], ] += ( dhz_t * dhx_l * htmp ) hstmp[ ix, ihx_l, ihz_b, hxextremes[0] : hxextremes[1], hzextremes[0] : hzextremes[1], ] += ( dhz_b * dhx_l * htmp ) hstmp[ ix, ihx_r, ihz_t, hxextremes[0] : hxextremes[1], hzextremes[0] : hzextremes[1], ] += ( dhz_t * dhx_r * htmp ) hstmp[ ix, ihx_r, ihz_b, hxextremes[0] : hxextremes[1], hzextremes[0] : hzextremes[1], ] += ( dhz_b * dhx_r * htmp ) hs = hstmp.sum(axis=0) return hs @reshaped def _matvec(self, x: NDArray) -> NDArray: ncp = get_array_module(x) y = ncp.zeros(self.dimsd, dtype=self.dtype) y = self._mv( self.inp, y, x, self.hshape, self.dimsd, self.ohx, self.ohz, self.dhx, self.dhz, self.nhx, self.nhz, **self.kwargs_cuda ) return y @reshaped def _rmatvec(self, x: NDArray) -> NDArray: ncp = get_array_module(x) hs = ncp.zeros(self.dims, dtype=self.dtype) hs = self._rmv( self.inp, x, hs, self.hshape, self.dimsd, self.ohx, self.ohz, self.dhx, self.dhz, self.nhx, self.nhz, **self.kwargs_cuda ) return hs