Source code for pylops.signalprocessing.nonstatconvolve3d

__all__ = ["NonStationaryConvolve3D"]

import os
from typing import Tuple, Union

import numpy as np

from pylops import LinearOperator
from pylops.utils import deps
from pylops.utils._internal import _value_or_sized_to_tuple
from pylops.utils.backend import get_array_module
from pylops.utils.decorators import reshaped
from pylops.utils.typing import DTypeLike, InputDimsLike, NDArray

jit_message = deps.numba_import("the nonstatconvolve3d module")

if jit_message is None:
    from numba import jit, prange

    from ._nonstatconvolve3d_cuda import (
        _matvec_rmatvec_call as _matvec_rmatvec_cuda_call,
    )

    # detect whether to use parallel or not
    numba_threads = int(os.getenv("NUMBA_NUM_THREADS", "1"))
    parallel = True if numba_threads != 1 else False
else:
    prange = range


[docs]class NonStationaryConvolve3D(LinearOperator): r"""3D non-stationary convolution operator. Apply non-stationary three-dimensional convolution. A varying compact filter is provided on a coarser grid and on-the-fly interpolation is applied in forward and adjoint modes. Both input and output have size :math:`n_x \times n_y \times n_z`. Parameters ---------- dims : :obj:`list` or :obj:`int` Number of samples for each dimension (which we refer to as :math:`n_x \times n_y \times n_z`). hs : :obj:`numpy.ndarray` Bank of 3d compact filters of size :math:`n_{\text{filts},x} \times n_{\text{filts},y} \times n_{\text{filts},z} \times n_{h,x} \times n_{h,y} \times n_{h,z}`. Filters must have odd number of samples and are assumed to be centered in the middle of the filter support. ihx : :obj:`tuple` Indices of the x locations of the filters ``hs`` in the model (and data). Note that the filters must be regularly sampled, i.e. :math:`dh_x=\text{diff}(ihx)=\text{const.}` ihy : :obj:`tuple` Indices of the y locations of the filters ``hs`` in the model (and data). Note that the filters must be regularly sampled, i.e. :math:`dh_y=\text{diff}(ihy)=\text{const.}` ihz : :obj:`tuple` Indices of the z locations of the filters ``hs`` in the model (and data). Note that the filters must be regularly sampled, i.e. :math:`dh_z=\text{diff}(ihz)=\text{const.}` engine : :obj:`str`, optional Engine used for spread computation (``numpy``, ``numba``, or ``cuda``) num_threads_per_blocks : :obj:`tuple`, optional Number of threads in each block (only when ``engine=cuda``) dtype : :obj:`str`, optional Type of elements in input array. name : :obj:`str`, optional Name of operator (to be used by :func:`pylops.utils.describe.describe`) Attributes ---------- shape : :obj:`tuple` Operator shape explicit : :obj:`bool` Operator contains a matrix that can be solved explicitly (``True``) or not (``False``) Raises ------ ValueError If filters ``hs`` have even size ValueError If ``ihx``, ``ihy`` or ``ihz`` is not regularly sampled NotImplementedError If ``engine`` is neither ``numpy``, ``fftw``, nor ``scipy``. Notes ----- See :class:`pylops.signalprocessing.NonStationaryConvolve2D`. """ def __init__( self, dims: Union[int, InputDimsLike], hs: NDArray, ihx: InputDimsLike, ihy: InputDimsLike, ihz: InputDimsLike, engine: str = "numpy", num_threads_per_blocks: Tuple[int, int, int] = (2, 16, 16), dtype: DTypeLike = "float64", name: str = "C", ) -> None: if engine not in ["numpy", "numba", "cuda"]: raise NotImplementedError("engine must be numpy or numba or cuda") if hs.shape[3] % 2 == 0 or hs.shape[4] % 2 == 0 or hs.shape[5] % 2 == 0: raise ValueError("filters hs must have odd length") if ( len(np.unique(np.diff(ihx))) > 1 or len(np.unique(np.diff(ihy))) > 1 or len(np.unique(np.diff(ihz))) > 1 ): raise ValueError( "the indices of filters 'ih' are must be regularly sampled" ) if ( min(ihx) < 0 or min(ihy) < 0 or min(ihz) < 0 or max(ihx) >= dims[0] or max(ihy) >= dims[1] or max(ihz) >= dims[2] ): raise ValueError( "the indices of filters 'ih' must be larger than 0 and smaller than `dims`" ) self.hs = hs self.hshape = hs.shape[3:] self.ohx, self.dhx, self.nhx = ihx[0], ihx[1] - ihx[0], len(ihx) self.ohy, self.dhy, self.nhy = ihy[0], ihy[1] - ihy[0], len(ihy) self.ohz, self.dhz, self.nhz = ihz[0], ihz[1] - ihz[0], len(ihz) self.ehx, self.ehx, self.ehz = ihx[-1], ihy[-1], ihz[-1] self.dims = _value_or_sized_to_tuple(dims) self.engine = engine super().__init__(dtype=np.dtype(dtype), dims=dims, dimsd=dims, name=name) # create additional input parameters for engine=cuda self.kwargs_cuda = {} if engine == "cuda": self.kwargs_cuda["num_threads_per_blocks"] = num_threads_per_blocks num_blocks_x = ( self.dims[0] + num_threads_per_blocks[0] - 1 ) // num_threads_per_blocks[0] num_blocks_y = ( self.dims[1] + num_threads_per_blocks[1] - 1 ) // num_threads_per_blocks[1] num_blocks_z = ( self.dims[2] + num_threads_per_blocks[2] - 1 ) // num_threads_per_blocks[2] self.kwargs_cuda["num_blocks"] = (num_blocks_x, num_blocks_y, num_blocks_z) self._register_multiplications(engine) def _register_multiplications(self, engine: str) -> None: if engine == "numba": numba_opts = dict(nopython=True, fastmath=True, nogil=True, parallel=True) self._mvrmv = jit(**numba_opts)(self._matvec_rmatvec) elif engine == "cuda": self._mvrmv = _matvec_rmatvec_cuda_call else: self._mvrmv = self._matvec_rmatvec @staticmethod def _matvec_rmatvec( x: NDArray, y: NDArray, hs: NDArray, hshape: Tuple[int, int, int], xdims: Tuple[int, int, int], ohx: float, ohy: float, ohz: float, dhx: float, dhy: float, dhz: float, nhx: int, nhy: int, nhz: int, rmatvec: bool = False, ) -> NDArray: for ix in prange(xdims[0]): for iy in range(xdims[1]): for iz in range(xdims[2]): # find closest filters and interpolate h ihx_l = int( np.floor((ix - ohx) / dhx) ) # id number of left for hs_arr ihy_b = int( np.floor((iy - ohy) / dhy) ) # id number of back for hs_arr ihz_t = int( np.floor((iz - ohz) / dhz) ) # id number of top for hs_arr dhx_r = ( ix - ohx ) / dhx - ihx_l # weight for right psfs, left 1-ihz_t dhy_f = ( iy - ohy ) / dhy - ihy_b # weight for front psfs, left 1-ihz_t dhz_d = ( iz - ohz ) / dhz - ihz_t # weight for down psfs, top 1-dhz_d if ihx_l < 0: ihx_l = ihx_r = 0 dhx_l = dhx_r = 0.5 elif ihx_l >= nhx - 1: ihx_l = ihx_r = nhx - 1 dhx_l = dhx_r = 0.5 else: ihx_r = ihx_l + 1 dhx_l = 1.0 - dhx_r if ihy_b < 0: ihy_b = ihy_f = 0 dhy_b = dhy_f = 0.5 elif ihy_b >= nhy - 1: ihy_b = ihy_f = nhy - 1 dhy_b = dhy_f = 0.5 else: ihy_f = ihy_b + 1 dhy_b = 1.0 - dhy_f if ihz_t < 0: ihz_t = ihz_d = 0 dhz_t = dhz_d = 0.5 elif ihz_t >= nhz - 1: ihz_t = ihz_d = nhz - 1 dhz_t = dhz_d = 0.5 else: ihz_d = ihz_t + 1 dhz_t = 1.0 - dhz_d h_lbt = hs[ihx_l, ihy_b, ihz_t] h_lbd = hs[ihx_l, ihy_b, ihz_d] h_lft = hs[ihx_l, ihy_f, ihz_t] h_lfd = hs[ihx_l, ihy_f, ihz_d] h_rbt = hs[ihx_r, ihy_b, ihz_t] h_rbd = hs[ihx_r, ihy_b, ihz_d] h_rft = hs[ihx_r, ihy_f, ihz_t] h_rfd = hs[ihx_r, ihy_f, ihz_d] h = ( dhx_l * dhy_b * dhz_t * h_lbt + dhx_l * dhy_b * dhz_d * h_lbd + dhx_l * dhy_f * dhz_t * h_lft + dhx_l * dhy_f * dhz_d * h_lfd + dhx_r * dhy_b * dhz_t * h_rbt + dhx_r * dhy_b * dhz_d * h_rbd + dhx_r * dhy_f * dhz_t * h_rft + dhx_r * dhy_f * dhz_d * h_rfd ) # find extremes of model where to apply h (in case h is going out of model) xextremes = ( max(0, ix - hshape[0] // 2), min(ix + hshape[0] // 2 + 1, xdims[0]), ) yextremes = ( max(0, iy - hshape[1] // 2), min(iy + hshape[1] // 2 + 1, xdims[1]), ) zextremes = ( max(0, iz - hshape[2] // 2), min(iz + hshape[2] // 2 + 1, xdims[2]), ) # find extremes of h (in case h is going out of model) hxextremes = ( max(0, -ix + hshape[0] // 2), min(hshape[0], hshape[0] // 2 + (xdims[0] - ix)), ) hyextremes = ( max(0, -iy + hshape[1] // 2), min(hshape[1], hshape[1] // 2 + (xdims[1] - iy)), ) hzextremes = ( max(0, -iz + hshape[2] // 2), min(hshape[2], hshape[2] // 2 + (xdims[2] - iz)), ) if not rmatvec: y[ xextremes[0] : xextremes[1], yextremes[0] : yextremes[1], zextremes[0] : zextremes[1], ] += ( x[ix, iy, iz] * h[ hxextremes[0] : hxextremes[1], hyextremes[0] : hyextremes[1], hzextremes[0] : hzextremes[1], ] ) else: y[ix, iy, iz] = np.sum( h[ hxextremes[0] : hxextremes[1], hyextremes[0] : hyextremes[1], hzextremes[0] : hzextremes[1], ] * x[ xextremes[0] : xextremes[1], yextremes[0] : yextremes[1], zextremes[0] : zextremes[1], ] ) return y @reshaped def _matvec(self, x: NDArray) -> NDArray: ncp = get_array_module(x) y = ncp.zeros(self.dims, dtype=self.dtype) y = self._mvrmv( x, y, self.hs, self.hshape, self.dims, self.ohx, self.ohy, self.ohz, self.dhx, self.dhy, self.dhz, self.nhx, self.nhy, self.nhz, rmatvec=False, **self.kwargs_cuda ) return y @reshaped def _rmatvec(self, x: NDArray) -> NDArray: ncp = get_array_module(x) y = ncp.zeros(self.dims, dtype=self.dtype) y = self._mvrmv( x, y, self.hs, self.hshape, self.dims, self.ohx, self.ohy, self.ohz, self.dhx, self.dhy, self.dhz, self.nhx, self.nhy, self.nhz, rmatvec=True, **self.kwargs_cuda ) return y