Source code for pylops.signalprocessing.convolvend

__all__ = ["ConvolveND"]

from typing import Optional, Union

import numpy as np
from numpy.core.multiarray import normalize_axis_index

from pylops import LinearOperator
from pylops.utils._internal import _value_or_sized_to_tuple
from pylops.utils.backend import (
    get_array_module,
    get_convolve,
    get_correlate,
    to_cupy_conditional,
)
from pylops.utils.decorators import reshaped
from pylops.utils.typing import DTypeLike, InputDimsLike, NDArray


[docs]class ConvolveND(LinearOperator): r"""ND convolution operator. Apply n-dimensional convolution with a compact filter to model (and data) along the ``axes`` of a n-dimensional array. Parameters ---------- dims : :obj:`list` or :obj:`int` Number of samples for each dimension h : :obj:`numpy.ndarray` nd compact filter to be convolved to input signal offset : :obj:`tuple`, optional Indices of the center of the compact filter axes : :obj:`int`, optional .. versionadded:: 2.0.0 Axes along which convolution is applied method : :obj:`str`, optional Method used to calculate the convolution (``direct`` or ``fft``). dtype : :obj:`str`, optional Type of elements in input array. name : :obj:`str`, optional .. versionadded:: 2.0.0 Name of operator (to be used by :func:`pylops.utils.describe.describe`) Attributes ---------- shape : :obj:`tuple` Operator shape explicit : :obj:`bool` Operator contains a matrix that can be solved explicitly (``True``) or not (``False``) Notes ----- The ConvolveND operator applies n-dimensional convolution between the input signal :math:`d(x_1, x_2, ..., x_N)` and a compact filter kernel :math:`h(x_1, x_2, ..., x_N)` in forward model. This is a straighforward extension to multiple dimensions of :obj:`pylops.signalprocessing.Convolve2D` operator. """ def __init__( self, dims: Union[int, InputDimsLike], h: NDArray, offset: Optional[InputDimsLike] = None, axes: InputDimsLike = (-2, -1), method: str = "fft", dtype: DTypeLike = "float64", name: str = "C", ): dims = _value_or_sized_to_tuple(dims) super().__init__(dtype=np.dtype(dtype), dims=dims, dimsd=dims, name=name) self.axes = ( np.arange(len(self.dims)) if axes is None else np.array([normalize_axis_index(ax, len(self.dims)) for ax in axes]) ) self.h = h hshape = np.array(self.h.shape) # padding if offset is None: offsetpad = np.zeros(self.h.ndim, dtype=int) else: offsetpad = np.asarray(offset, dtype=int) self.offset = 2 * (hshape // 2 - offsetpad) pad = [(0, 0) for _ in range(self.h.ndim)] dopad = False for inh, nh in enumerate(hshape): if nh % 2 == 0: self.offset[inh] -= 1 if self.offset[inh] != 0: pad[inh] = ( self.offset[inh] if self.offset[inh] > 0 else 0, -self.offset[inh] if self.offset[inh] < 0 else 0, ) dopad = True if dopad: ncp = get_array_module(h) self.h = ncp.pad(self.h, pad, mode="constant") self.nh = self.h.shape # find out which directions are used for convolution and define offsets if len(self.dims) != len(self.nh): dimsh = np.ones(len(self.dims), dtype=int) for iax, ax in enumerate(self.axes): dimsh[ax] = self.nh[iax] self.h = self.h.reshape(dimsh) # convolve and correlate functions self.convolve = get_convolve(h) self.correlate = get_correlate(h) self.method = method @reshaped def _matvec(self, x: NDArray) -> NDArray: # correct type of h if different from x and choose methods accordingly if type(self.h) is not type(x): self.h = to_cupy_conditional(x, self.h) self.convolve = get_convolve(self.h) self.correlate = get_correlate(self.h) return self.convolve(x, self.h, mode="same", method=self.method) @reshaped def _rmatvec(self, x: NDArray) -> NDArray: # correct type of h if different from x and choose methods accordingly if type(self.h) is not type(x): self.h = to_cupy_conditional(x, self.h) self.convolve = get_convolve(self.h) self.correlate = get_correlate(self.h) return self.correlate(x, self.h, mode="same", method=self.method)