# Copyright (C) 2017-2024 Matthieu Ancellin
# See LICENSE file at <https://github.com/capytaine/capytaine>
"""Solver for the BEM problem.
.. code-block:: python
problem = RadiationProblem(...)
result = BEMSolver(green_functions=..., engine=...).solve(problem)
"""
import logging
import numpy as np
from datetime import datetime
from rich.progress import track
from capytaine.bem.problems_and_results import LinearPotentialFlowProblem
from capytaine.green_functions.delhommeau import Delhommeau
from capytaine.bem.engines import BasicMatrixEngine
from capytaine.io.xarray import problems_from_dataset, assemble_dataset, kochin_data_array
from capytaine.tools.optional_imports import silently_import_optional_dependency
from capytaine.tools.lists_of_points import _normalize_points, _normalize_free_surface_points
from capytaine.tools.symbolic_multiplication import supporting_symbolic_multiplication
LOG = logging.getLogger(__name__)
[docs]
class BEMSolver:
"""
Solver for linear potential flow problems.
Parameters
----------
green_function: AbstractGreenFunction, optional
Object handling the computation of the Green function.
(default: :class:`~capytaine.green_function.delhommeau.Delhommeau`)
engine: MatrixEngine, optional
Object handling the building of matrices and the resolution of linear systems with these matrices.
(default: :class:`~capytaine.bem.engines.BasicMatrixEngine`)
Attributes
----------
exportable_settings : dict
Settings of the solver that can be saved to reinit the same solver later.
"""
def __init__(self, *, green_function=None, engine=None):
self.green_function = Delhommeau() if green_function is None else green_function
self.engine = BasicMatrixEngine() if engine is None else engine
try:
self.exportable_settings = {
**self.green_function.exportable_settings,
**self.engine.exportable_settings
}
except AttributeError:
pass
def __str__(self):
return f"BEMSolver(engine={self.engine}, green_function={self.green_function})"
def __repr__(self):
return self.__str__()
def _repr_pretty_(self, p, cycle):
p.text(self.__str__())
[docs]
@classmethod
def from_exported_settings(settings):
raise NotImplementedError
[docs]
def solve(self, problem, method='indirect', keep_details=True, _check_wavelength=True):
"""Solve the linear potential flow problem.
Parameters
----------
problem: LinearPotentialFlowProblem
the problem to be solved
method: string, optional
select boundary integral approach indirect (i.e.Nemoh)/direct (i.e.WAMIT) (default: indirect)
keep_details: bool, optional
if True, store the sources and the potential on the floating body in the output object
(default: True)
_check_wavelength: bool, optional
if True, check the mesh resolution with respect to the wavelength
Returns
-------
LinearPotentialFlowResult
an object storing the problem data and its results
"""
LOG.info("Solve %s.", problem)
if _check_wavelength:
self._check_wavelength_and_mesh_resolution([problem])
self._check_wavelength_and_irregular_frequencies([problem])
if problem.forward_speed != 0.0:
omega, wavenumber = problem.encounter_omega, problem.encounter_wavenumber
else:
omega, wavenumber = problem.omega, problem.wavenumber
linear_solver = supporting_symbolic_multiplication(self.engine.linear_solver)
if (method == 'direct'):
if problem.forward_speed != 0.0:
raise NotImplementedError("Direct solver is not able to solve problems with forward speed.")
S, D = self.engine.build_matrices(
problem.body.mesh_including_lid, problem.body.mesh_including_lid,
problem.free_surface, problem.water_depth, wavenumber,
self.green_function, adjoint_double_layer=False
)
potential = linear_solver(D, S @ problem.boundary_condition)
pressure = 1j * omega * problem.rho * potential
sources = None
else:
S, K = self.engine.build_matrices(
problem.body.mesh_including_lid, problem.body.mesh_including_lid,
problem.free_surface, problem.water_depth, wavenumber,
self.green_function, adjoint_double_layer=True
)
sources = linear_solver(K, problem.boundary_condition)
potential = S @ sources
pressure = 1j * omega * problem.rho * potential
if problem.forward_speed != 0.0:
result = problem.make_results_container(sources=sources)
# Temporary result object to compute the ∇Φ term
nabla_phi = self._compute_potential_gradient(problem.body.mesh_including_lid, result)
pressure += problem.rho * problem.forward_speed * nabla_phi[:, 0]
pressure_on_hull = pressure[:problem.body.mesh.nb_faces] # Discards pressure on lid if any
forces = problem.body.integrate_pressure(pressure_on_hull)
if not keep_details:
result = problem.make_results_container(forces)
else:
result = problem.make_results_container(forces, sources, potential, pressure)
LOG.debug("Done!")
return result
[docs]
def solve_all(self, problems, *, method='indirect', n_jobs=1, progress_bar=True, _check_wavelength=True, **kwargs):
"""Solve several problems.
Optional keyword arguments are passed to `BEMSolver.solve`.
Parameters
----------
problems: list of LinearPotentialFlowProblem
several problems to be solved
method: string, optional
select boundary integral approach indirect (i.e.Nemoh)/direct (i.e.WAMIT) (default: indirect)
n_jobs: int, optional (default: 1)
the number of jobs to run in parallel using the optional dependency `joblib`
By defaults: do not use joblib and solve sequentially.
progress_bar: bool, optional (default: True)
Display a progress bar while solving
Returns
-------
list of LinearPotentialFlowResult
the solved problems
"""
if _check_wavelength:
self._check_wavelength_and_mesh_resolution(problems)
self._check_wavelength_and_irregular_frequencies(problems)
if n_jobs == 1: # force sequential resolution
problems = sorted(problems)
if progress_bar:
problems = track(problems, total=len(problems), description="Solving BEM problems")
return [self.solve(pb, method=method, _check_wavelength=False, **kwargs) for pb in problems]
else:
joblib = silently_import_optional_dependency("joblib")
if joblib is None:
raise ImportError(f"Setting the `n_jobs` argument to {n_jobs} requires the missing optional dependency 'joblib'.")
groups_of_problems = LinearPotentialFlowProblem._group_for_parallel_resolution(problems)
parallel = joblib.Parallel(return_as="generator", n_jobs=n_jobs)
groups_of_results = parallel(joblib.delayed(self.solve_all)(grp, method=method, n_jobs=1, progress_bar=False, _check_wavelength=False, **kwargs) for grp in groups_of_problems)
if progress_bar:
groups_of_results = track(groups_of_results,
total=len(groups_of_problems),
description=f"Solving BEM problems with {n_jobs} threads:")
results = [res for grp in groups_of_results for res in grp] # flatten the nested list
return results
@staticmethod
def _check_wavelength_and_mesh_resolution(problems):
"""Display a warning if some of the problems have a mesh resolution
that might not be sufficient for the given wavelength."""
risky_problems = [pb for pb in problems
if 0.0 < pb.wavelength < pb.body.minimal_computable_wavelength]
nb_risky_problems = len(risky_problems)
if nb_risky_problems == 1:
pb = risky_problems[0]
freq_type = risky_problems[0].provided_freq_type
freq = pb.__getattribute__(freq_type)
LOG.warning(f"Mesh resolution for {pb}:\n"
f"The resolution of the mesh of the body {pb.body.__short_str__()} might "
f"be insufficient for {freq_type}={freq}.\n"
"This warning appears because the largest panel of this mesh "
f"has radius {pb.body.mesh.faces_radiuses.max():.3f} > wavelength/8."
)
elif nb_risky_problems > 1:
freq_type = risky_problems[0].provided_freq_type
freqs = np.array([float(pb.__getattribute__(freq_type)) for pb in risky_problems])
LOG.warning(f"Mesh resolution for {nb_risky_problems} problems:\n"
"The resolution of the mesh might be insufficient "
f"for {freq_type} ranging from {freqs.min():.3f} to {freqs.max():.3f}.\n"
"This warning appears when the largest panel of this mesh "
"has radius > wavelength/8."
)
@staticmethod
def _check_wavelength_and_irregular_frequencies(problems):
"""Display a warning if some of the problems might encounter irregular frequencies."""
risky_problems = [pb for pb in problems
if pb.body.first_irregular_frequency_estimate() < pb.omega < np.inf]
nb_risky_problems = len(risky_problems)
if nb_risky_problems >= 1:
if any(pb.body.lid_mesh is None for pb in problems):
recommendation = "Setting a lid for the floating body is recommended."
else:
recommendation = "The lid might need to be closer to the free surface."
if nb_risky_problems == 1:
pb = risky_problems[0]
freq_type = risky_problems[0].provided_freq_type
freq = pb.__getattribute__(freq_type)
LOG.warning(f"Irregular frequencies for {pb}:\n"
f"The body {pb.body.__short_str__()} might display irregular frequencies "
f"for {freq_type}={freq}.\n"
+ recommendation
)
elif nb_risky_problems > 1:
freq_type = risky_problems[0].provided_freq_type
freqs = np.array([float(pb.__getattribute__(freq_type)) for pb in risky_problems])
LOG.warning(f"Irregular frequencies for {nb_risky_problems} problems:\n"
"Irregular frequencies might be encountered "
f"for {freq_type} ranging from {freqs.min():.3f} to {freqs.max():.3f}.\n"
+ recommendation
)
[docs]
def fill_dataset(self, dataset, bodies, *, method='indirect', n_jobs=1, **kwargs):
"""Solve a set of problems defined by the coordinates of an xarray dataset.
Parameters
----------
dataset : xarray Dataset
dataset containing the problems parameters: frequency, radiating_dof, water_depth, ...
bodies : FloatingBody or list of FloatingBody
The body or bodies involved in the problems
They should all have different names.
method: string, optional
select boundary integral approach indirect (i.e.Nemoh)/direct (i.e.WAMIT) (default: indirect)
n_jobs: int, optional (default: 1)
the number of jobs to run in parallel using the optional dependency `joblib`
By defaults: do not use joblib and solve sequentially.
progress_bar: bool, optional (default: True)
Display a progress bar while solving
Returns
-------
xarray Dataset
"""
attrs = {'start_of_computation': datetime.now().isoformat(),
**self.exportable_settings}
problems = problems_from_dataset(dataset, bodies)
if 'theta' in dataset.coords:
results = self.solve_all(problems, keep_details=True, method=method, n_jobs=n_jobs)
kochin = kochin_data_array(results, dataset.coords['theta'])
dataset = assemble_dataset(results, attrs=attrs, **kwargs)
dataset.update(kochin)
else:
results = self.solve_all(problems, keep_details=False, method=method, n_jobs=n_jobs)
dataset = assemble_dataset(results, attrs=attrs, **kwargs)
return dataset
[docs]
def compute_potential(self, points, result):
"""Compute the value of the potential at given points for a previously solved potential flow problem.
Parameters
----------
points: array of shape (3,) or (N, 3), or 3-ple of arrays returned by meshgrid, or cpt.Mesh or cpt.CollectionOfMeshes object
Coordinates of the point(s) at which the potential should be computed
result: LinearPotentialFlowResult
The return of the BEM solver
Returns
-------
complex-valued array of shape (1,) or (N,) or (nx, ny, nz) or (mesh.nb_faces,) depending of the kind of input
The value of the potential at the points
Raises
------
Exception: if the :code:`LinearPotentialFlowResult` object given as input does not contain the source distribution.
"""
points, output_shape = _normalize_points(points, keep_mesh=True)
if result.sources is None:
raise Exception(f"""The values of the sources of {result} cannot been found.
They probably have not been stored by the solver because the option keep_details=True have not been set or the direct method has been used.
Please re-run the resolution with the indirect method and keep_details=True.""")
S, _ = self.green_function.evaluate(points, result.body.mesh_including_lid, result.free_surface, result.water_depth, result.encounter_wavenumber)
potential = S @ result.sources # Sum the contributions of all panels in the mesh
return potential.reshape(output_shape)
def _compute_potential_gradient(self, points, result):
points, output_shape = _normalize_points(points, keep_mesh=True)
if result.sources is None:
raise Exception(f"""The values of the sources of {result} cannot been found.
They probably have not been stored by the solver because the option keep_details=True have not been set.
Please re-run the resolution with this option.""")
_, gradG = self.green_function.evaluate(points, result.body.mesh_including_lid, result.free_surface, result.water_depth, result.encounter_wavenumber,
early_dot_product=False)
velocities = np.einsum('ijk,j->ik', gradG, result.sources) # Sum the contributions of all panels in the mesh
return velocities.reshape((*output_shape, 3))
[docs]
def compute_velocity(self, points, result):
"""Compute the value of the velocity vector at given points for a previously solved potential flow problem.
Parameters
----------
points: array of shape (3,) or (N, 3), or 3-ple of arrays returned by meshgrid, or cpt.Mesh or cpt.CollectionOfMeshes object
Coordinates of the point(s) at which the velocity should be computed
result: LinearPotentialFlowResult
The return of the BEM solver
Returns
-------
complex-valued array of shape (3,) or (N,, 3) or (nx, ny, nz, 3) or (mesh.nb_faces, 3) depending of the kind of input
The value of the velocity at the points
Raises
------
Exception: if the :code:`LinearPotentialFlowResult` object given as input does not contain the source distribution.
"""
nabla_phi = self._compute_potential_gradient(points, result)
if result.forward_speed != 0.0:
nabla_phi[..., 0] -= result.forward_speed
return nabla_phi
[docs]
def compute_pressure(self, points, result):
"""Compute the value of the pressure at given points for a previously solved potential flow problem.
Parameters
----------
points: array of shape (3,) or (N, 3), or 3-ple of arrays returned by meshgrid, or cpt.Mesh or cpt.CollectionOfMeshes object
Coordinates of the point(s) at which the pressure should be computed
result: LinearPotentialFlowResult
The return of the BEM solver
Returns
-------
complex-valued array of shape (1,) or (N,) or (nx, ny, nz) or (mesh.nb_faces,) depending of the kind of input
The value of the pressure at the points
Raises
------
Exception: if the :code:`LinearPotentialFlowResult` object given as input does not contain the source distribution.
"""
if result.forward_speed != 0:
pressure = 1j * result.encounter_omega * result.rho * self.compute_potential(points, result)
nabla_phi = self._compute_potential_gradient(points, result)
pressure += result.rho * result.forward_speed * nabla_phi[..., 0]
else:
pressure = 1j * result.omega * result.rho * self.compute_potential(points, result)
return pressure
[docs]
def compute_free_surface_elevation(self, points, result):
"""Compute the value of the free surface elevation at given points for a previously solved potential flow problem.
Parameters
----------
points: array of shape (2,) or (N, 2), or 2-ple of arrays returned by meshgrid, or cpt.Mesh or cpt.CollectionOfMeshes object
Coordinates of the point(s) at which the free surface elevation should be computed
result: LinearPotentialFlowResult
The return of the BEM solver
Returns
-------
complex-valued array of shape (1,) or (N,) or (nx, ny, nz) or (mesh.nb_faces,) depending of the kind of input
The value of the free surface elevation at the points
Raises
------
Exception: if the :code:`LinearPotentialFlowResult` object given as input does not contain the source distribution.
"""
points, output_shape = _normalize_free_surface_points(points, keep_mesh=True)
if result.forward_speed != 0:
fs_elevation = -1/result.g * (-1j*result.encounter_omega) * self.compute_potential(points, result)
nabla_phi = self._compute_potential_gradient(points, result)
fs_elevation += -1/result.g * result.forward_speed * nabla_phi[..., 0]
else:
fs_elevation = -1/result.g * (-1j*result.omega) * self.compute_potential(points, result)
return fs_elevation.reshape(output_shape)
## Legacy
[docs]
def get_potential_on_mesh(self, result, mesh, chunk_size=50):
"""Compute the potential on a mesh for the potential field of a previously solved problem.
Since the interaction matrix does not need to be computed in full to compute the matrix-vector product,
only a few lines are evaluated at a time to reduce the memory cost of the operation.
The newer method :code:`compute_potential` should be preferred in the future.
Parameters
----------
result : LinearPotentialFlowResult
the return of the BEM solver
mesh : Mesh or CollectionOfMeshes
a mesh
chunk_size: int, optional
Number of lines to compute in the matrix.
(legacy, should be passed as an engine setting instead).
Returns
-------
array of shape (mesh.nb_faces,)
potential on the faces of the mesh
Raises
------
Exception: if the :code:`Result` object given as input does not contain the source distribution.
"""
LOG.info(f"Compute potential on {mesh.name} for {result}.")
if result.sources is None:
raise Exception(f"""The values of the sources of {result} cannot been found.
They probably have not been stored by the solver because the option keep_details=True have not been set or the direct method has been used.
Please re-run the resolution with the indirect method and keep_details=True.""")
if chunk_size > mesh.nb_faces:
S = self.engine.build_S_matrix(
mesh,
result.body.mesh_including_lid,
result.free_surface, result.water_depth, result.wavenumber,
self.green_function
)
phi = S @ result.sources
else:
phi = np.empty((mesh.nb_faces,), dtype=np.complex128)
for i in range(0, mesh.nb_faces, chunk_size):
faces_to_extract = list(range(i, min(i+chunk_size, mesh.nb_faces)))
S = self.engine.build_S_matrix(
mesh.extract_faces(faces_to_extract),
result.body.mesh_including_lid,
result.free_surface, result.water_depth, result.wavenumber,
self.green_function
)
phi[i:i+chunk_size] = S @ result.sources
LOG.debug(f"Done computing potential on {mesh.name} for {result}.")
return phi
[docs]
def get_free_surface_elevation(self, result, free_surface, keep_details=False):
"""Compute the elevation of the free surface on a mesh for a previously solved problem.
The newer method :code:`compute_free_surface_elevation` should be preferred in the future.
Parameters
----------
result : LinearPotentialFlowResult
the return of the solver
free_surface : FreeSurface
a meshed free surface
keep_details : bool, optional
if True, keep the free surface elevation in the LinearPotentialFlowResult (default:False)
Returns
-------
array of shape (free_surface.nb_faces,)
the free surface elevation on each faces of the meshed free surface
Raises
------
Exception: if the :code:`Result` object given as input does not contain the source distribution.
"""
if result.forward_speed != 0.0:
raise NotImplementedError("For free surface elevation with forward speed, please use the `compute_free_surface_elevation` method.")
fs_elevation = 1j*result.omega/result.g * self.get_potential_on_mesh(result, free_surface.mesh)
if keep_details:
result.fs_elevation[free_surface] = fs_elevation
return fs_elevation