Source code for pylops.basicoperators.restriction

__all__ = ["Restriction"]

import logging

from typing import Sequence, Union

import numpy as np
import numpy.ma as np_ma
from numpy.core.multiarray import normalize_axis_index

from pylops import LinearOperator
from pylops.utils._internal import _value_or_sized_to_tuple
from pylops.utils.backend import get_array_module, to_cupy_conditional
from pylops.utils.typing import DTypeLike, InputDimsLike, IntNDArray, NDArray

logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.WARNING)


def _compute_iavamask(dims, axis, iava, ncp):
    """Compute restriction mask when using cupy arrays"""
    otherdims = np.array(dims)
    otherdims = np.delete(otherdims, axis)
    iavamask = ncp.zeros(int(dims[axis]), dtype=int)
    iavamask[iava] = 1
    iavamask = ncp.moveaxis(
        ncp.broadcast_to(iavamask, list(otherdims) + [dims[axis]]), -1, axis
    )
    iavamask = ncp.where(iavamask.ravel() == 1)[0]
    return iavamask


[docs]class Restriction(LinearOperator): r"""Restriction (or sampling) operator. Extract subset of values from input vector at locations ``iava`` in forward mode and place those values at locations ``iava`` in an otherwise zero vector in adjoint mode. Parameters ---------- dims : :obj:`list` or :obj:`int` Number of samples for each dimension iava : :obj:`list` or :obj:`numpy.ndarray` Integer indices of available samples for data selection. axis : :obj:`int`, optional .. versionadded:: 2.0.0 Axis along which restriction is applied to model. inplace : :obj:`bool`, optional Work inplace (``True``) or make a new copy (``False``). By default, data is a reference to the model (in forward) and model is a reference to the data (in adjoint). forceflat : :obj:`bool`, optional .. versionadded:: 2.2.0 Force an array to be flattened after rmatvec. Note that this is only required when `len(dims)=2`, otherwise pylops will detect whether to return a 1d or nd array. dtype : :obj:`str`, optional Type of elements in input array. name : :obj:`str`, optional .. versionadded:: 2.0.0 Name of operator (to be used by :func:`pylops.utils.describe.describe`) Attributes ---------- shape : :obj:`tuple` Operator shape explicit : :obj:`bool` Operator contains a matrix that can be solved explicitly (``True``) or not (``False``) See Also -------- pylops.signalprocessing.Interp : Interpolation operator Notes ----- Extraction (or *sampling*) of a subset of :math:`N` values at locations ``iava`` from an input (or model) vector :math:`\mathbf{x}` of size :math:`M` can be expressed as: .. math:: y_i = x_{l_i} \quad \forall i=0,1,\ldots,N-1 where :math:`\mathbf{l}=[l_0, l_1,\ldots, l_{N-1}]` is a vector containing the indices of the original array at which samples are taken. Conversely, in adjoint mode the available values in the data vector :math:`\mathbf{y}` are placed at locations :math:`\mathbf{l}=[l_0, l_1,\ldots, l_{M-1}]` in the model vector: .. math:: x_{l_i} = y_i \quad \forall i=0,1,\ldots,N-1 and :math:`x_{j}=0` for :math:`j \neq l_i` (i.e., at all other locations in input vector). """ def __init__( self, dims: Union[int, InputDimsLike], iava: Union[IntNDArray, Sequence[int]], axis: int = -1, inplace: bool = True, forceflat: bool = None, dtype: DTypeLike = "float64", name: str = "R", ) -> None: ncp = get_array_module(iava) dims = _value_or_sized_to_tuple(dims) axis = normalize_axis_index(axis, len(dims)) dimsd = list(dims) # data dimensions dimsd[axis] = len(iava) # check if forceflat is needed and set it back to None otherwise if len(dims) > 2: if forceflat is not None: logging.warning( f"setting forceflat=None since len(dims)={len(dims)}>2. " f"PyLops will automatically detect whether to return " f"a 1d or nd array based on the shape of the input" f"array." ) forceflat = None super().__init__(dtype=np.dtype(dtype), dims=dims, dimsd=dimsd, forceflat=forceflat, name=name) iavareshape = np.ones(len(self.dims), dtype=int) iavareshape[axis] = len(iava) # currently cupy does not support put_along_axis, so we need to # explicitly create a list of indices in the n-dimensional # model space which will be used in _rmatvec to place the input if ncp != np: self.iavamask = _compute_iavamask(self.dims, axis, iava, ncp) self.inplace = inplace self.axis = axis self.iavareshape = iavareshape self.iava = ncp.asarray(iava) def _matvec(self, x: NDArray) -> NDArray: ncp = get_array_module(x) if not self.inplace: x = x.copy() x = ncp.reshape(x, self.dims) y = ncp.take(x, self.iava, axis=self.axis) y = y.ravel() return y def _rmatvec(self, x: NDArray) -> NDArray: ncp = get_array_module(x) if not self.inplace: x = x.copy() x = ncp.reshape(x, self.dimsd) if ncp == np: y = ncp.zeros(self.dims, dtype=self.dtype) ncp.put_along_axis( y, ncp.reshape(self.iava, self.iavareshape), x, axis=self.axis ) else: if not hasattr(self, "iavamask"): self.iava = to_cupy_conditional(x, self.iava) self.iavamask = _compute_iavamask(self.dims, self.axis, self.iava, ncp) y = ncp.zeros(int(self.shape[-1]), dtype=self.dtype) y[self.iavamask] = x.ravel() y = y.ravel() return y def mask(self, x: NDArray) -> NDArray: """Apply mask to input signal returning a signal of same size with values at ``iava`` locations and ``0`` at other locations Parameters ---------- x : :obj:`numpy.ndarray` or :obj:`cupy.ndarray` Input array (can be either flattened or not) Returns ---------- y : :obj:`numpy.ma.core.MaskedArray` Masked array. """ ncp = get_array_module(x) if ncp != np: iava = ncp.asnumpy(self.iava) else: iava = self.iava.copy() y = np_ma.array(np.zeros(self.dims), mask=np.ones(self.dims), dtype=self.dtype) x = np.reshape(x, self.dims) x = np.swapaxes(x, self.axis, -1) y = np.swapaxes(y, self.axis, -1) y.mask[..., iava] = False if ncp == np: y[..., iava] = x[..., self.iava] else: y[..., iava] = ncp.asnumpy(x)[..., iava] y = np.swapaxes(y, -1, self.axis) return y