Source code for pylops.signalprocessing.fourierradon2d

__all__ = ["FourierRadon2D"]

from typing import Literal

import numpy as np
import scipy as sp

from pylops import LinearOperator
from pylops.utils import deps
from pylops.utils.backend import get_array_module, get_complex_dtype
from pylops.utils.decorators import reshaped
from pylops.utils.typing import DTypeLike, NDArray, Tengine_nnc

jit_message = deps.numba_import("the radon2d module")
cupy_message = deps.cupy_import("the radon2d module")

if jit_message is None:
    from ._fourierradon2d_numba import _aradon_inner_2d, _radon_inner_2d
if jit_message is None and cupy_message is None:
    from ._fourierradon2d_cuda import _aradon_inner_2d_cuda, _radon_inner_2d_cuda


[docs] class FourierRadon2D(LinearOperator): r"""2D Fourier Radon transform Apply Radon forward (and adjoint) transform using Fast Fourier Transform to a 2-dimensional array of size :math:`[n_{p_x} \times n_t]` (and :math:`[n_x \times n_t]`). Note that forward and adjoint follow the same convention of the time-space implementation in :class:`pylops.signalprocessing.Radon2D`. Parameters ---------- taxis : :obj:`numpy.ndarray` Time axis haxis : :obj:`numpy.ndarray` Spatial axis pxaxis : :obj:`numpy.ndarray` Axis of scanning variable :math:`p_x` of parametric curve nfft : :obj:`int` Number of samples in Fourier transform flims : :obj:`tuple`, optional Indices of lower and upper limits of Fourier axis to be used in the application of the Radon matrix (when ``None``, use entire axis) kind : :obj:`str` Curve to be used for stacking/spreading (``linear``, ``parabolic``) engine : :obj:`str` Engine used for computation (``numpy`` or ``numba`` or ``cuda``) num_threads_per_blocks : :obj:`tuple` Number of threads in each block (only when ``engine=cuda``) dtype : :obj:`str` Type of elements in input array. name : :obj:`str` Name of operator (to be used by :func:`pylops.utils.describe.describe`) Attributes ---------- haxis : :obj:`numpy.ndarray` Spatial axis (or squared axis when ``kind='parabolic'``) nh : :obj:`int` Number of samples in spatial axis. nt : :obj:`int` Number of samples in time axis. dh : :obj:`float` Sampling step in spatial axis. dt : :obj:`float` Sampling step in time axis. px : :obj:`numpy.ndarray` Axis of scanning variable :math:`p_x` of parametric curve npx : :obj:`int` Number of samples in :math:`p_x` axis. f : :obj:`numpy.ndarray` Fourier axis. nfft2 : :obj:`int` Number of samples in positive Fourier axis. cdtype : :obj:`str` Complex type associated with ``dtype``. flims : :obj:`tuple` Indices of lower and upper limits of Fourier axis to be used in num_blocks_matvec : :obj:`tuple` Number of blocks in each dimension for ``matvec`` (only when ``engine=cuda``) num_blocks_rmatvec : :obj:`tuple` Number of blocks in each dimension for ``rmatvec`` (only when ``engine=cuda``) dims : :obj:`tuple` Shape of the array after the adjoint, but before flattening. For example, ``x_reshaped = (Op.H * y.ravel()).reshape(Op.dims)``. dimsd : :obj:`tuple` Shape of the array after the forward, but before flattening. For example, ``y_reshaped = (Op * x.ravel()).reshape(Op.dimsd)``. shape : :obj:`tuple` Operator shape. Raises ------ ValueError If ``engine`` is neither ``numpy``, ``numba``, nor ``cuda``. Notes ----- The FourierRadon2D operator applies the Radon transform in the frequency domain. After transforming a 2-dimensional array of size :math:`[n_x \times n_t]` into the frequency domain, the following linear transformation is applied to each frequency component in adjoint mode: .. math:: \begin{bmatrix} m(p_{x,1}, \omega_i) \\ m(p_{x,2}, \omega_i) \\ \vdots \\ m(p_{x,N_p}, \omega_i) \end{bmatrix} = \begin{bmatrix} e^{-j \omega_i p_{x,1} x^l_1} & e^{-j \omega_i p_{x,1} x^l_2} & \ldots & e^{-j \omega_i p_{x,1} x^l_{N_x}} \\ e^{-j \omega_i p_{x,2} x^l_1} & e^{-j \omega_i p_{x,2} x^l_2} & \ldots & e^{-j \omega_i p_{x,2} x^l_{N_x}} \\ \vdots & \vdots & \ddots & \vdots \\ e^{-j \omega_i p_{x,N_p} x^l_1} & e^{-j \omega_i p_{x,N_p} x^l_2} & \ldots & e^{-j \omega_i p_{x,N_p} x^l_{N_x}} \\ \end{bmatrix} \begin{bmatrix} d(x_1, \omega_i) \\ d(x_2, \omega_i) \\ \vdots \\ d(x_{N_x}, \omega_i) \end{bmatrix} where :math:`l=1,2`. Similarly the forward mode is implemented by applying the transpose and complex conjugate of the above matrix to the model transformed to the Fourier domain. Refer to [1]_ for more theoretical and implementation details. .. [1] Sacchi, M. "Statistical and Transform Methods for Geophysical Signal Processing", 2007. """ def __init__( self, taxis: NDArray, haxis: NDArray, pxaxis: NDArray, nfft: int, flims: tuple[int, int] | None = None, kind: Literal["linear", "parabolic"] = "linear", engine: Tengine_nnc = "numpy", num_threads_per_blocks: tuple[int, int] = (32, 32), dtype: DTypeLike = "float64", name: str = "R", ) -> None: # engine if engine not in ["numpy", "numba", "cuda"]: msg = "`engine` must be numpy or numba or cuda" raise ValueError(msg) if engine == "numba" and jit_message is not None: engine = "numpy" # dimensions and super dims = len(pxaxis), len(taxis) dimsd = len(haxis), len(taxis) super().__init__(dtype=np.dtype(dtype), dims=dims, dimsd=dimsd, name=name) # other input params self.taxis, self.haxis = taxis, haxis self.nh, self.nt = self.dimsd self.px = pxaxis self.npx, self.nfft = self.dims[0], nfft self.dt = taxis[1] - taxis[0] self.dh = haxis[1] - haxis[0] self.f = np.fft.rfftfreq(self.nfft, d=self.dt).astype(self.dtype) self.nfft2 = len(self.f) self.cdtype = get_complex_dtype(dtype) self.flims = (0, self.nfft2) if flims is None else flims if kind == "parabolic": self.haxis = self.haxis**2 # create additional input parameters for engine=cuda if engine == "cuda": self.num_threads_per_blocks = num_threads_per_blocks ( num_threads_per_blocks_hpx, num_threads_per_blocks_f, ) = num_threads_per_blocks num_blocks_px = ( self.dims[0] + num_threads_per_blocks_hpx - 1 ) // num_threads_per_blocks_hpx num_blocks_h = ( self.dimsd[0] + num_threads_per_blocks_hpx - 1 ) // num_threads_per_blocks_hpx num_blocks_f = ( self.dims[1] + num_threads_per_blocks_f - 1 ) // num_threads_per_blocks_f self.num_blocks_matvec = (num_blocks_h, num_blocks_f) self.num_blocks_rmatvec = (num_blocks_px, num_blocks_f) self._register_multiplications(engine) def _register_multiplications(self, engine: str) -> None: if engine == "numba": self._matvec = self._matvec_numba self._rmatvec = self._rmatvec_numba elif engine == "cuda": self._matvec = self._matvec_cuda self._rmatvec = self._rmatvec_cuda else: self._matvec = self._matvec_numpy self._rmatvec = self._rmatvec_numpy @reshaped def _matvec_numpy(self, x: NDArray) -> NDArray: ncp = get_array_module(x) self.f = ncp.asarray(self.f) x = ncp.fft.rfft(x, n=self.nfft, axis=-1) H, PX, F = ncp.meshgrid( self.haxis, self.px, self.f[self.flims[0] : self.flims[1]], indexing="ij" ) y = ncp.zeros((self.nh, self.nfft2), dtype=self.cdtype) y[:, self.flims[0] : self.flims[1]] = ncp.einsum( "ijk,jk->ik", ncp.exp(-1j * 2 * ncp.pi * F * PX * H), x[:, self.flims[0] : self.flims[1]], ) y = ncp.real(ncp.fft.irfft(y, n=self.nfft, axis=-1))[:, : self.nt] return y @reshaped def _rmatvec_numpy(self, y: NDArray) -> NDArray: ncp = get_array_module(y) self.f = ncp.asarray(self.f) y = ncp.fft.rfft(y, n=self.nfft, axis=-1) PX, H, F = ncp.meshgrid( self.px, self.haxis, self.f[self.flims[0] : self.flims[1]], indexing="ij" ) x = ncp.zeros((self.npx, self.nfft2), dtype=self.cdtype) x[:, self.flims[0] : self.flims[1]] = ncp.einsum( "ijk,jk->ik", ncp.exp(1j * 2 * ncp.pi * F * PX * H), y[:, self.flims[0] : self.flims[1]], ) x = ncp.real(ncp.fft.irfft(x, n=self.nfft, axis=-1))[:, : self.nt] return x @reshaped def _matvec_cuda(self, x: NDArray) -> NDArray: ncp = get_array_module(x) y = ncp.zeros((self.nh, self.nfft2), dtype=self.cdtype) x = ncp.fft.rfft(x, n=self.nfft, axis=-1) y = _radon_inner_2d_cuda( x, y, ncp.asarray(self.f), self.px, self.haxis, self.flims[0], self.flims[1], self.npx, self.nh, num_blocks=self.num_blocks_matvec, num_threads_per_blocks=self.num_threads_per_blocks, ) y = ncp.real(ncp.fft.irfft(y, n=self.nfft, axis=-1))[:, : self.nt] return y @reshaped def _rmatvec_cuda(self, y: NDArray) -> NDArray: ncp = get_array_module(y) x = ncp.zeros((self.npx, self.nfft2), dtype=self.cdtype) y = ncp.fft.rfft(y, n=self.nfft, axis=-1) x = _aradon_inner_2d_cuda( x, y, ncp.asarray(self.f), self.px, self.haxis, self.flims[0], self.flims[1], self.npx, self.nh, num_blocks=self.num_blocks_rmatvec, num_threads_per_blocks=self.num_threads_per_blocks, ) x = ncp.real(ncp.fft.irfft(x, n=self.nfft, axis=-1))[:, : self.nt] return x @reshaped def _matvec_numba(self, x: NDArray) -> NDArray: y = np.zeros((self.nh, self.nfft2), dtype=self.cdtype) x = sp.fft.rfft(x, n=self.nfft, axis=-1) _radon_inner_2d( x, y, self.f, self.px, self.haxis, self.flims[0], self.flims[1], self.npx, self.nh, ) y = np.real(sp.fft.irfft(y, n=self.nfft, axis=-1))[:, : self.nt] return y @reshaped def _rmatvec_numba(self, y: NDArray) -> NDArray: x = np.zeros((self.npx, self.nfft2), dtype=self.cdtype) y = sp.fft.rfft(y, n=self.nfft, axis=-1) _aradon_inner_2d( x, y, self.f, self.px, self.haxis, self.flims[0], self.flims[1], self.npx, self.nh, ) x = np.real(sp.fft.irfft(x, n=self.nfft, axis=-1))[:, : self.nt] return x