Source code for capytaine.matrices.linear_solvers

"""The linear solvers used in Capytaine.

They are based on the NumPy and SciPy solvers, with a thin layer that handles hierarchical Toeplitz matrices.
"""
# Copyright (C) 2017-2019 Matthieu Ancellin
# See LICENSE file at <https://github.com/capytaine/capytaine>

import logging

import numpy as np
from scipy import linalg as sl
from scipy.sparse import linalg as ssl

from capytaine.matrices.block import BlockMatrix
from capytaine.matrices.block_toeplitz import BlockSymmetricToeplitzMatrix, BlockCirculantMatrix

LOG = logging.getLogger(__name__)


# DIRECT SOLVER

def solve_directly(A, b):
    assert isinstance(b, np.ndarray) and A.ndim == b.ndim+1 and A.shape[-2] == b.shape[-1]

    if isinstance(A, BlockCirculantMatrix):
        LOG.debug("\tSolve linear system %s", A)
        blocks_of_diagonalization = A.block_diagonalize()
        fft_of_rhs = np.fft.fft(np.reshape(b, (A.nb_blocks[0], A.block_shape[0])), axis=0)
        try:  # Try to run it as vectorized numpy arrays.
            fft_of_result = np.linalg.solve(blocks_of_diagonalization, fft_of_rhs[..., np.newaxis])[..., 0]
        except np.linalg.LinAlgError:  # Or do the same thing with list comprehension.
            fft_of_result = np.array([solve_directly(block, vec)
                                      for block, vec in zip(blocks_of_diagonalization, fft_of_rhs)])
        result = np.fft.ifft(fft_of_result, axis=0).reshape((A.shape[1],))
        return result

    elif isinstance(A, BlockSymmetricToeplitzMatrix):
        if A.nb_blocks == (2, 2):
            LOG.debug("\tSolve linear system %s", A)
            A1, A2 = A._stored_blocks[0, :]
            b1, b2 = b[:len(b)//2], b[len(b)//2:]
            x_plus = solve_directly(A1 + A2, b1 + b2)
            x_minus = solve_directly(A1 - A2, b1 - b2)
            return np.concatenate([x_plus + x_minus, x_plus - x_minus])/2
        else:
            # Not implemented: fall back to the full matrix.
            LOG.debug("\tSolve linear system %s", A)
            return solve_directly(A.full_matrix(), b)

    elif isinstance(A, BlockMatrix):
        LOG.debug("\tSolve linear system %s", A)
        return solve_directly(A.full_matrix(), b)

    elif isinstance(A, np.ndarray):
        LOG.debug(f"\tSolve linear system (size: {A.shape}) with numpy direct solver.")
        return np.linalg.solve(A, b)

    else:
        raise ValueError(f"Unrecognized type of matrix to solve: {A}")
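

# The following sketch is not part of the original module. It illustrates the
# even/odd trick used above for 2x2 block symmetric Toeplitz systems, checked
# against a dense solve; all names are local to the example and only numpy is used.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    A1, A2 = rng.random((3, 3)), rng.random((3, 3))
    full = np.block([[A1, A2], [A2, A1]])  # the 2x2 block symmetric Toeplitz pattern
    rhs = rng.random(6)
    b1, b2 = rhs[:3], rhs[3:]
    # Two half-size solves instead of one full-size solve.
    x_plus = np.linalg.solve(A1 + A2, b1 + b2)
    x_minus = np.linalg.solve(A1 - A2, b1 - b2)
    x = np.concatenate([x_plus + x_minus, x_plus - x_minus])/2
    assert np.allclose(x, np.linalg.solve(full, rhs))
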
# CACHED LU DECOMPOSITION

class LUSolverWithCache:
    """Solve a linear system with the LU decomposition.

    The latest LU decomposition is kept in memory; if a system with the same
    matrix needs to be solved again, the decomposition is reused.

    Most of the complexity of this class comes from the following:

    1. @lru_cache does not work because numpy arrays are not hashable,
       so a basic cache system has been recoded from scratch.
    2. To be the default solver for the BasicMatrixEngine, the solver needs to
       support matrices for problems with one or two reflection symmetries.
       Hence a custom way to cache the LU decompositions of the matrices
       involved in the direct linear resolution of the symmetric problem.
    """

    def __init__(self):
        self.cached_matrix = None
        self.cached_decomp = None

    def solve(self, A, b):
        return self.solve_with_decomp(self.cached_lu_decomp(A), b)

    def lu_decomp(self, A):
        """Return the LU decomposition of A.

        If A is a BlockSymmetricToeplitzMatrix, then return a list of the LU
        decompositions of the blocks of the block diagonalisation of the matrix.
        """
        if isinstance(A, BlockSymmetricToeplitzMatrix) and A.nb_blocks == (2, 2):
            A1, A2 = A._stored_blocks[0, :]
            return [self.lu_decomp(A1 + A2), self.lu_decomp(A1 - A2)]
        elif isinstance(A, np.ndarray):
            return sl.lu_factor(A)
        else:
            raise NotImplementedError(
                "Cached LU solver is only implemented for dense matrices "
                "and 2×2 BlockSymmetricToeplitzMatrix."
            )

    def cached_lu_decomp(self, A):
        if A is not self.cached_matrix:
            self.cached_matrix = A
            LOG.debug("Computing and caching LU decomposition")
            self.cached_decomp = self.lu_decomp(A)
        else:
            LOG.debug("Using cached LU decomposition")
        return self.cached_decomp

    def solve_with_decomp(self, decomp, b):
        """Solve the system using the precomputed LU decomposition.

        TODO: Find a better way to differentiate a single LU decomposition
        (returned as a tuple by sl.lu_factor) from a set of LU decompositions
        (stored as a list by self.lu_decomp).
        """
        if isinstance(decomp, list):  # The matrix was a BlockSymmetricToeplitzMatrix.
            b1, b2 = b[:len(b)//2], b[len(b)//2:]
            x_plus = self.solve_with_decomp(decomp[0], b1 + b2)
            x_minus = self.solve_with_decomp(decomp[1], b1 - b2)
            return np.concatenate([x_plus + x_minus, x_plus - x_minus])/2
        elif isinstance(decomp, tuple):  # The matrix was a np.ndarray.
            return sl.lu_solve(decomp, b)
        else:
            raise NotImplementedError(
                "Cached LU solver is only implemented for dense matrices "
                "and 2×2 BlockSymmetricToeplitzMatrix."
            )
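

# Usage sketch, not part of the original module: solving twice with the same
# matrix object computes the LU factors once and reuses them; only numpy/scipy
# are assumed, and all names below are local to the example.
if __name__ == "__main__":
    solver = LUSolverWithCache()
    M = np.array([[4.0, 1.0], [1.0, 3.0]])
    x1 = solver.solve(M, np.array([1.0, 2.0]))  # computes and caches the factors
    x2 = solver.solve(M, np.array([5.0, 6.0]))  # same object M: cached factors are reused
    assert np.allclose(M @ x1, [1.0, 2.0]) and np.allclose(M @ x2, [5.0, 6.0])
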
# ITERATIVE SOLVER

class Counter:
    def __init__(self):
        self.nb_iter = 0

    def __call__(self, *args, **kwargs):
        self.nb_iter += 1


def solve_gmres(A, b):
    LOG.debug(f"Solve with GMRES for {A}.")

    if LOG.isEnabledFor(logging.INFO):
        counter = Counter()
        x, info = ssl.gmres(A, b, atol=1e-6, callback=counter)
        LOG.info(f"End of GMRES after {counter.nb_iter} iterations.")
    else:
        x, info = ssl.gmres(A, b, atol=1e-6)

    if info > 0:
        raise RuntimeError(
            f"No convergence of the GMRES after {info} iterations.\n"
            "This can be due to overlapping panels or irregular frequencies.\n"
            "In the latter case, using a direct solver can help (https://github.com/mancellin/capytaine/issues/30)."
        )

    return x
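

# Usage sketch, not part of the original module: solve_gmres also accepts a plain
# dense array, since scipy.sparse.linalg.gmres works on any array or LinearOperator.
if __name__ == "__main__":
    M = np.eye(4) + 0.1 * np.ones((4, 4))  # well-conditioned test matrix
    rhs = np.ones(4)
    x = solve_gmres(M, rhs)
    assert np.allclose(M @ x, rhs, atol=1e-4)
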

def gmres_no_fft(A, b):
    LOG.debug(f"Solve with GMRES for {A} without using FFT.")

    x, info = ssl.gmres(A.no_toeplitz() if isinstance(A, BlockMatrix) else A, b, atol=1e-6)

    if info != 0:
        LOG.warning(f"No convergence of the GMRES. Error code: {info}")

    return x


# PRECONDITIONED SOLVER

def _block_Jacobi_coarse_corr(A, b, x0, R, RA, AcLU, DLU, diag_shapes, n):
    """Perform a single step of the block-Jacobi method with coarse correction.
    Can be used as a preconditioner for the matrix A.

    Parameters
    ----------
    A: BlockMatrix
        System matrix
    b: array
        System right-hand-side vector
    x0: array
        Initial guess of the solution
    R: array
        Coarse-space restriction matrix
    RA: array
        Precomputed product of R and A
    AcLU: tuple
        LU decomposition data of the coarse system matrix Ac
        (output of scipy.linalg.lu_factor)
    DLU: list
        List of LU decomposition data for the diagonal blocks of A
    diag_shapes: list
        List of shapes of the diagonal blocks of A
    n: int
        Size of the coarse problem (e.g. number of bodies simulated)

    Returns
    -------
    array
        Action of a step of the method on the vector x0
    """
    # x_ps = x after the pre-smoothing step (block-Jacobi).
    x_ps = np.zeros(A.shape[0], dtype=complex)

    # The diagonal blocks of A have already been put to zero in build_matrices;
    # they are not needed anymore.
    q = b - A@x0

    # Loop over the diagonal blocks.
    for kk in range(n):
        local_slice = slice(sum(diag_shapes[:kk]), sum(diag_shapes[:kk+1]))
        local_rhs = q[local_slice]
        local_sol = sl.lu_solve(DLU[kk], local_rhs, check_finite=False)
        x_ps[local_slice] = local_sol

    r_c = R@b - RA@x_ps  # Restricted residual.
    e_c = sl.lu_solve(AcLU, r_c, check_finite=False)

    # Coarse-space correction.
    return x_ps + R.T@e_c
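

# Illustrative sketch, not part of the original module: assembling the inputs of
# _block_Jacobi_coarse_corr for a small dense system with two "bodies". It assumes
# (not confirmed by this file) that RA is the product of R with the *full* matrix,
# while the A argument has its diagonal blocks zeroed out, as the docstring states.
if __name__ == "__main__":
    rng = np.random.default_rng(1)
    diag_shapes = [3, 3]
    n = len(diag_shapes)
    N = sum(diag_shapes)
    A_full = np.eye(N) + 0.1 * rng.random((N, N))  # well-conditioned test matrix
    A_offdiag = A_full.copy()
    DLU = []
    for k in range(n):
        s = slice(sum(diag_shapes[:k]), sum(diag_shapes[:k+1]))
        DLU.append(sl.lu_factor(A_full[s, s]))  # factor the diagonal blocks...
        A_offdiag[s, s] = 0.0                   # ...then zero them, as the function expects
    R = np.zeros((n, N))
    for k in range(n):
        R[k, sum(diag_shapes[:k]):sum(diag_shapes[:k+1])] = 1.0  # piecewise-constant coarse space
    RA = R @ A_full
    AcLU = sl.lu_factor(R @ A_full @ R.T)  # coarse system matrix Ac = R A R^T
    b = rng.random(N)
    x0 = np.zeros(N, dtype=complex)
    x1 = _block_Jacobi_coarse_corr(A_offdiag, b, x0, R, RA, AcLU, DLU, diag_shapes, n)
    # One step of the method should reduce the residual of the full system.
    assert np.linalg.norm(b - A_full @ x1) < np.linalg.norm(b - A_full @ x0)
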

def solve_precond_gmres(A_and_precond_data, b):
    """Implementation of the preconditioner presented in
    `<https://doi.org/10.1007/978-3-031-50769-4_14>`_.
    """
    A, R, RA, AcLU, DLU, diag_shapes, n, PinvA = A_and_precond_data
    N = A.shape[0]

    # Apply the preconditioner once to the right-hand side.
    Pinvb = _block_Jacobi_coarse_corr(A, b, np.zeros(N, dtype=complex),
                                      R, RA, AcLU, DLU, diag_shapes, n)

    LOG.debug(f"Solve with GMRES for {A}.")

    if LOG.isEnabledFor(logging.INFO):
        counter = Counter()
        x, info = ssl.gmres(PinvA, Pinvb, atol=1e-6, callback=counter)
        LOG.info(f"End of GMRES after {counter.nb_iter} iterations.")
    else:
        x, info = ssl.gmres(PinvA, Pinvb, atol=1e-6)

    if info > 0:
        raise RuntimeError(
            f"No convergence of the GMRES after {info} iterations.\n"
            "This can be due to overlapping panels or irregular frequencies.\n"
            "In the latter case, using a direct solver can help (https://github.com/mancellin/capytaine/issues/30)."
        )

    return x
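

# Sketch of an assumption, not confirmed by this file: PinvA in the tuple above is
# taken to behave like the left-preconditioned operator v -> P^(-1) A v. One way to
# build it from the pieces assembled in the previous sketch, using a scipy
# LinearOperator (A_full, A_offdiag, R, RA, AcLU, DLU, diag_shapes, n, b as above):
if __name__ == "__main__":
    PinvA = ssl.LinearOperator(
        (N, N), dtype=complex,
        matvec=lambda v: _block_Jacobi_coarse_corr(
            A_offdiag, A_full @ v, np.zeros(N, dtype=complex),
            R, RA, AcLU, DLU, diag_shapes, n,
        ),
    )
    x = solve_precond_gmres((A_offdiag, R, RA, AcLU, DLU, diag_shapes, n, PinvA), b)
    assert np.allclose(A_full @ x, b, atol=1e-3)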