Source code for capytaine.bem.solver

# Copyright (C) 2017-2019 Matthieu Ancellin
# See LICENSE file at <https://github.com/mancellin/capytaine>
"""Solver for the BEM problem.

.. code-block:: python

    problem = RadiationProblem(...)
    result = BEMSolver(green_functions=..., engine=...).solve(problem)

"""

import logging

import numpy as np

from datetime import datetime

from capytaine.bem.problems_and_results import LinearPotentialFlowProblem
from capytaine.green_functions.delhommeau import Delhommeau
from capytaine.bem.engines import BasicMatrixEngine, HierarchicalToeplitzMatrixEngine
from capytaine.io.xarray import problems_from_dataset, assemble_dataset, kochin_data_array
from capytaine.tools.optional_imports import silently_import_optional_dependency
from capytaine.tools.lists_of_points import _normalize_points, _normalize_free_surface_points

LOG = logging.getLogger(__name__)

[docs]class BEMSolver: """ Solver for linear potential flow problems. Parameters ---------- green_function: AbstractGreenFunction, optional Object handling the computation of the Green function. (default: :class:`~capytaine.green_function.delhommeau.Delhommeau`) engine: MatrixEngine, optional Object handling the building of matrices and the resolution of linear systems with these matrices. (default: :class:`~capytaine.bem.engines.BasicMatrixEngine`) Attributes ---------- exportable_settings : dict Settings of the solver that can be saved to reinit the same solver later. """ def __init__(self, *, green_function=None, engine=None): self.green_function = Delhommeau() if green_function is None else green_function self.engine = BasicMatrixEngine() if engine is None else engine try: self.exportable_settings = { **self.green_function.exportable_settings, **self.engine.exportable_settings } except AttributeError: pass def __str__(self): return f"BEMSolver(engine={self.engine}, green_function={self.green_function})" def __repr__(self): return self.__str__() def _repr_pretty_(self, p, cycle): p.text(self.__str__())
[docs] @classmethod def from_exported_settings(settings): raise NotImplementedError
[docs] def solve(self, problem, keep_details=True): """Solve the linear potential flow problem. Parameters ---------- problem: LinearPotentialFlowProblem the problem to be solved keep_details: bool, optional if True, store the sources and the potential on the floating body in the output object (default: True) Returns ------- LinearPotentialFlowResult an object storing the problem data and its results """ LOG.info("Solve %s.", problem) S, K = self.engine.build_matrices( problem.body.mesh, problem.body.mesh, problem.free_surface, problem.water_depth, problem.wavenumber, self.green_function ) sources = self.engine.linear_solver(K, problem.boundary_condition) potential = S @ sources pressure = 1j * problem.omega * problem.rho * potential forces = problem.body.integrate_pressure(pressure) if not keep_details: result = problem.make_results_container(forces) else: result = problem.make_results_container(forces, sources, potential, pressure) LOG.debug("Done!") return result
[docs] def solve_all(self, problems, *, n_jobs=1, **kwargs): """Solve several problems. Optional keyword arguments are passed to `BEMSolver.solve`. Parameters ---------- problems: list of LinearPotentialFlowProblem several problems to be solved n_jobs: int, optional (default: 1) the number of jobs to run in parallel using the optional dependency `joblib` By defaults: do not use joblib and solve sequentially. Returns ------- list of LinearPotentialFlowResult the solved problems """ if n_jobs == 1: # force sequential resolution return [self.solve(pb, **kwargs) for pb in sorted(problems)] else: joblib = silently_import_optional_dependency("joblib") if joblib is None: raise ImportError(f"Setting the `n_jobs` argument to {n_jobs} requires the missing optional dependency 'joblib'.") groups_of_problems = LinearPotentialFlowProblem._group_for_parallel_resolution(problems) groups_of_results = joblib.Parallel(n_jobs=n_jobs)(joblib.delayed(self.solve_all)(grp, n_jobs=1, **kwargs) for grp in groups_of_problems) results = [res for grp in groups_of_results for res in grp] # flatten the nested list return results
[docs] def fill_dataset(self, dataset, bodies, *, n_jobs=1, **kwargs): """Solve a set of problems defined by the coordinates of an xarray dataset. Parameters ---------- dataset : xarray Dataset dataset containing the problems parameters: frequency, radiating_dof, water_depth, ... bodies : FloatingBody or list of FloatingBody The body or bodies involved in the problems They should all have different names. n_jobs: int, optional (default: 1) the number of jobs to run in parallel using the optional dependency `joblib` By defaults: do not use joblib and solve sequentially. Returns ------- xarray Dataset """ attrs = {'start_of_computation': datetime.now().isoformat(), **self.exportable_settings} problems = problems_from_dataset(dataset, bodies) if 'theta' in dataset.coords: results = self.solve_all(problems, keep_details=True, n_jobs=n_jobs) kochin = kochin_data_array(results, dataset.coords['theta']) dataset = assemble_dataset(results, attrs=attrs, **kwargs) dataset.update(kochin) else: results = self.solve_all(problems, keep_details=False, n_jobs=n_jobs) dataset = assemble_dataset(results, attrs=attrs, **kwargs) return dataset
[docs] def compute_potential(self, points, result): """Compute the value of the potential at given points for a previously solved potential flow problem. Parameters ---------- points: array of shape (3,) or (N, 3), or 3-ple of arrays returned by meshgrid, or cpt.Mesh or cpt.CollectionOfMeshes object Coordinates of the point(s) at which the potential should be computed results: LinearPotentialFlowResult The return of the BEM solver Returns ------- complex-valued array of shape (1,) or (N,) or (nx, ny, nz) or (mesh.nb_faces,) depending of the kind of input The value of the potential at the points Raises ------ Exception: if the :code:`LinearPotentialFlowResult` object given as input does not contain the source distribution. """ points, output_shape = _normalize_points(points, keep_mesh=True) if result.sources is None: raise Exception(f"""The values of the sources of {result} cannot been found. They probably have not been stored by the solver because the option keep_details=True have not been set. Please re-run the resolution with this option.""") S, _ = self.green_function.evaluate(points, result.body.mesh, result.free_surface, result.water_depth, result.wavenumber) potential = S @ result.sources # Sum the contributions of all panels in the mesh return potential.reshape(output_shape)
[docs] def compute_velocity(self, points, result): """Compute the value of the velocity vector at given points for a previously solved potential flow problem. Parameters ---------- points: array of shape (3,) or (N, 3), or 3-ple of arrays returned by meshgrid, or cpt.Mesh or cpt.CollectionOfMeshes object Coordinates of the point(s) at which the velocity should be computed results: LinearPotentialFlowResult The return of the BEM solver Returns ------- complex-valued array of shape (3,) or (N,, 3) or (nx, ny, nz, 3) or (mesh.nb_faces, 3) depending of the kind of input The value of the velocity at the points Raises ------ Exception: if the :code:`LinearPotentialFlowResult` object given as input does not contain the source distribution. """ points, output_shape = _normalize_points(points, keep_mesh=True) if result.sources is None: raise Exception(f"""The values of the sources of {result} cannot been found. They probably have not been stored by the solver because the option keep_details=True have not been set. Please re-run the resolution with this option.""") _, gradG = self.green_function.evaluate(points, result.body.mesh, result.free_surface, result.water_depth, result.wavenumber, early_dot_product=False) velocities = np.einsum('ijk,j->ik', gradG, result.sources) # Sum the contributions of all panels in the mesh return velocities.reshape((*output_shape, 3))
[docs] def compute_pressure(self, points, result): """Compute the value of the pressure at given points for a previously solved potential flow problem. Parameters ---------- points: array of shape (3,) or (N, 3), or 3-ple of arrays returned by meshgrid, or cpt.Mesh or cpt.CollectionOfMeshes object Coordinates of the point(s) at which the pressure should be computed results: LinearPotentialFlowResult The return of the BEM solver Returns ------- complex-valued array of shape (1,) or (N,) or (nx, ny, nz) or (mesh.nb_faces,) depending of the kind of input The value of the pressure at the points Raises ------ Exception: if the :code:`LinearPotentialFlowResult` object given as input does not contain the source distribution. """ return 1j * result.omega * result.rho * self.compute_potential(points, results)
[docs] def compute_free_surface_elevation(self, points, result): """Compute the value of the free surface elevation at given points for a previously solved potential flow problem. Parameters ---------- points: array of shape (2,) or (N, 2), or 2-ple of arrays returned by meshgrid, or cpt.Mesh or cpt.CollectionOfMeshes object Coordinates of the point(s) at which the free surface elevation should be computed results: LinearPotentialFlowResult The return of the BEM solver Returns ------- complex-valued array of shape (1,) or (N,) or (nx, ny, nz) or (mesh.nb_faces,) depending of the kind of input The value of the free surface elevation at the points Raises ------ Exception: if the :code:`LinearPotentialFlowResult` object given as input does not contain the source distribution. """ points, output_shape = _normalize_free_surface_points(points, keep_mesh=True) fs_elevation = 1j*result.omega/result.g * self.compute_potential(points, result) return fs_elevation.reshape(output_shape)
## Legacy
[docs] def get_potential_on_mesh(self, result, mesh, chunk_size=50): """Compute the potential on a mesh for the potential field of a previously solved problem. Since the interaction matrix does not need to be computed in full to compute the matrix-vector product, only a few lines are evaluated at a time to reduce the memory cost of the operation. The newer method :code:`compute_potential` should be prefered in the future. Parameters ---------- result : LinearPotentialFlowResult the return of the BEM solver mesh : Mesh or CollectionOfMeshes a mesh chunk_size: int, optional Number of lines to compute in the matrix. (legacy, should be passed as an engine setting instead). Returns ------- array of shape (mesh.nb_faces,) potential on the faces of the mesh Raises ------ Exception: if the :code:`Result` object given as input does not contain the source distribution. """ LOG.info(f"Compute potential on {mesh.name} for {result}.") if result.sources is None: raise Exception(f"""The values of the sources of {result} cannot been found. They probably have not been stored by the solver because the option keep_details=True have not been set. Please re-run the resolution with this option.""") if chunk_size > mesh.nb_faces: S = self.engine.build_S_matrix( mesh, result.body.mesh, result.free_surface, result.water_depth, result.wavenumber, self.green_function ) phi = S @ result.sources else: phi = np.empty((mesh.nb_faces,), dtype=np.complex128) for i in range(0, mesh.nb_faces, chunk_size): faces_to_extract = list(range(i, min(i+chunk_size, mesh.nb_faces))) S = self.engine.build_S_matrix( mesh.extract_faces(faces_to_extract), result.body.mesh, result.free_surface, result.water_depth, result.wavenumber, self.green_function ) phi[i:i+chunk_size] = S @ result.sources LOG.debug(f"Done computing potential on {mesh.name} for {result}.") return phi
[docs] def get_free_surface_elevation(self, result, free_surface, keep_details=False): """Compute the elevation of the free surface on a mesh for a previously solved problem. The newer method :code:`compute_free_surface_elevation` should be prefered in the future. Parameters ---------- result : LinearPotentialFlowResult the return of the solver free_surface : FreeSurface a meshed free surface keep_details : bool, optional if True, keep the free surface elevation in the LinearPotentialFlowResult (default:False) Returns ------- array of shape (free_surface.nb_faces,) the free surface elevation on each faces of the meshed free surface Raises ------ Exception: if the :code:`Result` object given as input does not contain the source distribution. """ fs_elevation = 1j*result.omega/result.g * self.get_potential_on_mesh(result, free_surface.mesh) if keep_details: result.fs_elevation[free_surface] = fs_elevation return fs_elevation