Source code for pylops.utils.multiproc
__all__ = ["scalability_test"]
import time
from typing import List, Optional, Tuple
import numpy.typing as npt
[docs]def scalability_test(
Op,
x: npt.ArrayLike,
workers: Optional[List[int]] = None,
forward: bool = True,
) -> Tuple[List[float], List[float]]:
r"""Scalability test.
Small auxiliary routine to test the performance of operators using
``multiprocessing``. This helps identifying the maximum number of workers
beyond which no performance gain is observed.
Parameters
----------
Op : :obj:`pylops.LinearOperator`
Operator to test. It must allow for multiprocessing.
x : :obj:`numpy.ndarray`, optional
Input vector.
workers : :obj:`list`, optional
Number of workers to test out. Defaults to `[1, 2, 4]`.
forward : :obj:`bool`, optional
Apply forward (``True``) or adjoint (``False``)
Returns
-------
compute_times : :obj:`list`
Compute times as function of workers
speedup : :obj:`list`
Speedup as function of workers
"""
if workers is None:
workers = [1, 2, 4]
compute_times = []
speedup = []
for nworkers in workers:
print(f"Working with {nworkers} workers...")
# update number of workers in operator
Op.nproc = nworkers
# run forward/adjoint computation
starttime = time.time()
if forward:
_ = Op.matvec(x)
else:
_ = Op.rmatvec(x)
elapsedtime = time.time() - starttime
compute_times.append(elapsedtime)
speedup.append(compute_times[0] / elapsedtime)
Op.pool.close()
return compute_times, speedup