#!/usr/bin/env python
# coding: utf-8
"""Definition of the methods to build influence matrices, using possibly some sparse structures."""
# Copyright (C) 2017-2019 Matthieu Ancellin
# See LICENSE file at <https://github.com/mancellin/capytaine>
import logging
from abc import ABC, abstractmethod
import numpy as np
from capytaine.meshes.collections import CollectionOfMeshes
from capytaine.meshes.symmetric import ReflectionSymmetricMesh, TranslationalSymmetricMesh, AxialSymmetricMesh
from capytaine.matrices import linear_solvers
from capytaine.matrices.block import BlockMatrix
from capytaine.matrices.low_rank import LowRankMatrix, NoConvergenceOfACA
from capytaine.matrices.block_toeplitz import BlockSymmetricToeplitzMatrix, BlockToeplitzMatrix, BlockCirculantMatrix
from capytaine.tools.lru_cache import delete_first_lru_cache
LOG = logging.getLogger(__name__)
####################
# ABSTRACT CLASS #
####################
[docs]class MatrixEngine(ABC):
"""Abstract method to build a matrix."""
[docs] @abstractmethod
def build_matrices(self, mesh1, mesh2, free_surface, water_depth, wavenumber, green_function):
pass
[docs] def build_S_matrix(self, *args, **kwargs):
"""Similar to :code:`build_matrices`, but returning only :math:`S`"""
S, _ = self.build_matrices(*args, **kwargs) # Could be optimized...
return S
##################
# BASIC ENGINE #
##################
[docs]class BasicMatrixEngine(MatrixEngine):
"""
Simple engine that assemble a full matrix (except for one reflection symmetry).
Basically only calls :code:`green_function.evaluate`.
Parameters
----------
linear_solver: str or function, optional
Setting of the numerical solver for linear problems Ax = b.
It can be set with the name of a preexisting solver
(available: "direct" and "gmres", the former is the default choice)
or by passing directly a solver function.
matrix_cache_size: int, optional
number of matrices to keep in cache
"""
available_linear_solvers = {'direct': linear_solvers.solve_directly,
'lu_decomposition': linear_solvers.LUSolverWithCache().solve,
'gmres': linear_solvers.solve_gmres,
}
def __init__(self, *, linear_solver='lu_decomposition', matrix_cache_size=1):
if linear_solver in self.available_linear_solvers:
self.linear_solver = self.available_linear_solvers[linear_solver]
else:
self.linear_solver = linear_solver
if matrix_cache_size > 0:
self.build_matrices = delete_first_lru_cache(maxsize=matrix_cache_size)(self.build_matrices)
self.exportable_settings = {
'engine': 'BasicMatrixEngine',
'matrix_cache_size': matrix_cache_size,
'linear_solver': str(linear_solver),
}
def __str__(self):
params = f"linear_solver=\'{self.exportable_settings['linear_solver']}\'"
params += f", matrix_cache_size={self.exportable_settings['matrix_cache_size']}" if self.exportable_settings['matrix_cache_size'] != 1 else ""
return f"BasicMatrixEngine({params})"
def __repr__(self):
return self.__str__()
def _repr_pretty_(self, p, cycle):
p.text(self.__str__())
[docs] def build_matrices(self, mesh1, mesh2, free_surface, water_depth, wavenumber, green_function):
r"""Build the influence matrices between mesh1 and mesh2.
Parameters
----------
mesh1: Mesh or CollectionOfMeshes
mesh of the receiving body (where the potential is measured)
mesh2: Mesh or CollectionOfMeshes
mesh of the source body (over which the source distribution is integrated)
free_surface: float
position of the free surface (default: :math:`z = 0`)
water_depth: float
position of the sea bottom (default: :math:`z = -\infty`)
wavenumber: float
wavenumber (default: 1.0)
green_function: AbstractGreenFunction
object with an "evaluate" method that computes the Green function.
Returns
-------
tuple of matrix-like
the matrices :math:`S` and :math:`K`
"""
if (isinstance(mesh1, ReflectionSymmetricMesh)
and isinstance(mesh2, ReflectionSymmetricMesh)
and mesh1.plane == mesh2.plane):
S_a, V_a = self.build_matrices(
mesh1[0], mesh2[0], free_surface, water_depth, wavenumber,
green_function)
S_b, V_b = self.build_matrices(
mesh1[0], mesh2[1], free_surface, water_depth, wavenumber,
green_function)
return BlockSymmetricToeplitzMatrix([[S_a, S_b]]), BlockSymmetricToeplitzMatrix([[V_a, V_b]])
else:
return green_function.evaluate(
mesh1, mesh2, free_surface, water_depth, wavenumber,
)
###################################
# HIERARCHIAL TOEPLITZ MATRICES #
###################################
[docs]class HierarchicalToeplitzMatrixEngine(MatrixEngine):
"""An experimental matrix engine that build a hierarchical matrix with
some block-Toeplitz structure.
Parameters
----------
ACA_distance: float, optional
Above this distance, the ACA is used to approximate the matrix with a low-rank block.
ACA_tol: float, optional
The tolerance of the ACA when building a low-rank matrix.
matrix_cache_size: int, optional
number of matrices to keep in cache
"""
def __init__(self, *, ACA_distance=8.0, ACA_tol=1e-2, matrix_cache_size=1):
if matrix_cache_size > 0:
self.build_matrices = delete_first_lru_cache(maxsize=matrix_cache_size)(self.build_matrices)
self.ACA_distance = ACA_distance
self.ACA_tol = ACA_tol
self.linear_solver = linear_solvers.solve_gmres
self.exportable_settings = {
'engine': 'HierarchicalToeplitzMatrixEngine',
'ACA_distance': ACA_distance,
'ACA_tol': ACA_tol,
'matrix_cache_size': matrix_cache_size,
}
def __str__(self):
params = f"ACA_distance={self.ACA_distance}"
params += f", ACA_tol={self.ACA_tol}"
params += f", matrix_cache_size={self.exportable_settings['matrix_cache_size']}" if self.exportable_settings['matrix_cache_size'] != 1 else ""
return f"HierarchicalToeplitzMatrixEngine({params})"
def _repr_pretty_(self, p, cycle):
p.text(self.__str__())
[docs] def build_matrices(self,
mesh1, mesh2, free_surface, water_depth, wavenumber, green_function,
_rec_depth=1):
"""Recursively builds a hierarchical matrix between mesh1 and mesh2.
Same arguments as :func:`BasicMatrixEngine.build_matrices`.
:code:`_rec_depth` keeps track of the recursion depth only for pretty log printing.
"""
if logging.getLogger().isEnabledFor(logging.DEBUG):
log_entry = (
"\t" * (_rec_depth+1) +
"Build the S and K influence matrices between {mesh1} and {mesh2}"
.format(mesh1=mesh1.name, mesh2=(mesh2.name if mesh2 is not mesh1 else 'itself'))
)
else:
log_entry = "" # will not be used
# Distance between the meshes (for ACA).
distance = np.linalg.norm(mesh1.center_of_mass_of_nodes - mesh2.center_of_mass_of_nodes)
# I) SPARSE COMPUTATION
# I-i) BLOCK TOEPLITZ MATRIX
if (isinstance(mesh1, ReflectionSymmetricMesh)
and isinstance(mesh2, ReflectionSymmetricMesh)
and mesh1.plane == mesh2.plane):
LOG.debug(log_entry + " using mirror symmetry.")
S_a, V_a = self.build_matrices(
mesh1[0], mesh2[0], free_surface, water_depth, wavenumber, green_function,
_rec_depth=_rec_depth+1)
S_b, V_b = self.build_matrices(
mesh1[0], mesh2[1], free_surface, water_depth, wavenumber, green_function,
_rec_depth=_rec_depth+1)
return BlockSymmetricToeplitzMatrix([[S_a, S_b]]), BlockSymmetricToeplitzMatrix([[V_a, V_b]])
elif (isinstance(mesh1, TranslationalSymmetricMesh)
and isinstance(mesh2, TranslationalSymmetricMesh)
and np.allclose(mesh1.translation, mesh2.translation)
and mesh1.nb_submeshes == mesh2.nb_submeshes):
LOG.debug(log_entry + " using translational symmetry.")
S_list, V_list = [], []
for submesh in mesh2:
S, V = self.build_matrices(
mesh1[0], submesh, free_surface, water_depth, wavenumber, green_function,
_rec_depth=_rec_depth+1)
S_list.append(S)
V_list.append(V)
for submesh in mesh1[1:][::-1]:
S, V = self.build_matrices(
submesh, mesh2[0], free_surface, water_depth, wavenumber, green_function,
_rec_depth=_rec_depth+1)
S_list.append(S)
V_list.append(V)
return BlockToeplitzMatrix([S_list]), BlockToeplitzMatrix([V_list])
elif (isinstance(mesh1, AxialSymmetricMesh)
and isinstance(mesh2, AxialSymmetricMesh)
and mesh1.axis == mesh2.axis
and mesh1.nb_submeshes == mesh2.nb_submeshes):
LOG.debug(log_entry + " using rotation symmetry.")
S_line, V_line = [], []
for submesh in mesh2[:mesh2.nb_submeshes]:
S, V = self.build_matrices(
mesh1[0], submesh, free_surface, water_depth, wavenumber, green_function,
_rec_depth=_rec_depth+1)
S_line.append(S)
V_line.append(V)
return BlockCirculantMatrix([S_line]), BlockCirculantMatrix([V_line])
# I-ii) LOW-RANK MATRIX WITH ACA
elif distance > self.ACA_distance*mesh1.diameter_of_nodes or distance > self.ACA_distance*mesh2.diameter_of_nodes:
LOG.debug(log_entry + " using ACA.")
def get_row_func(i):
s, v = green_function.evaluate(
mesh1.extract_one_face(i), mesh2,
free_surface, water_depth, wavenumber
)
return s.flatten(), v.flatten()
def get_col_func(j):
s, v = green_function.evaluate(
mesh1, mesh2.extract_one_face(j),
free_surface, water_depth, wavenumber
)
return s.flatten(), v.flatten()
try:
return LowRankMatrix.from_rows_and_cols_functions_with_multi_ACA(
get_row_func, get_col_func, mesh1.nb_faces, mesh2.nb_faces,
nb_matrices=2, id_main=1, # Approximate V and get an approximation of S at the same time
tol=self.ACA_tol, dtype=np.complex128)
except NoConvergenceOfACA:
pass # Continue with non sparse computation
# II) NON-SPARSE COMPUTATIONS
# II-i) BLOCK MATRIX
if (isinstance(mesh1, CollectionOfMeshes)
and isinstance(mesh2, CollectionOfMeshes)):
LOG.debug(log_entry + " using block matrix structure.")
S_matrix, V_matrix = [], []
for submesh1 in mesh1:
S_line, V_line = [], []
for submesh2 in mesh2:
S, V = self.build_matrices(
submesh1, submesh2, free_surface, water_depth, wavenumber, green_function,
_rec_depth=_rec_depth+1)
S_line.append(S)
V_line.append(V)
S_matrix.append(S_line)
V_matrix.append(V_line)
return BlockMatrix(S_matrix), BlockMatrix(V_matrix)
# II-ii) PLAIN NUMPY ARRAY
else:
LOG.debug(log_entry)
S, V = green_function.evaluate(
mesh1, mesh2, free_surface, water_depth, wavenumber,
)
return S, V