Source code for capytaine.bem.solver

#!/usr/bin/env python
# coding: utf-8
"""Solver for the BEM problem.

Example
-------

::

    problem = RadiationProblem(...)
    result = BEMSolver(green_functions=..., engine=...).solve(problem)

"""
# Copyright (C) 2017-2019 Matthieu Ancellin
# See LICENSE file at <https://github.com/mancellin/capytaine>

import logging

import numpy as np

from datetime import datetime

from capytaine.bem.problems_and_results import LinearPotentialFlowProblem
from capytaine.green_functions.delhommeau import Delhommeau
from capytaine.bem.engines import BasicMatrixEngine, HierarchicalToeplitzMatrixEngine
from capytaine.io.xarray import problems_from_dataset, assemble_dataset, kochin_data_array
from capytaine.tools.optional_imports import silently_import_optional_dependency

LOG = logging.getLogger(__name__)

[docs]class BEMSolver: """ Solver for linear potential flow problems. Parameters ---------- green_function: AbstractGreenFunction Object handling the computation of the Green function. engine: MatrixEngine Object handling the building of matrices and the resolution of linear systems with these matrices. Attributes ---------- exportable_settings : dict Settings of the solver that can be saved to reinit the same solver later. """ def __init__(self, *, green_function=Delhommeau(), engine=BasicMatrixEngine()): self.green_function = green_function self.engine = engine try: self.exportable_settings = { **self.green_function.exportable_settings, **self.engine.exportable_settings } except AttributeError: pass
[docs] @classmethod def from_exported_settings(settings): raise NotImplementedError
[docs] def solve(self, problem, keep_details=True): """Solve the linear potential flow problem. Parameters ---------- problem: LinearPotentialFlowProblem the problem to be solved keep_details: bool, optional if True, store the sources and the potential on the floating body in the output object (default: True) Returns ------- LinearPotentialFlowResult an object storing the problem data and its results """ LOG.info("Solve %s.", problem) S, K = self.engine.build_matrices( problem.body.mesh, problem.body.mesh, problem.free_surface, problem.sea_bottom, problem.wavenumber, self.green_function ) sources = self.engine.linear_solver(K, problem.boundary_condition) potential = S @ sources pressure = problem.rho * potential # Actually, for diffraction problems: pressure over jω # for radiation problems: pressure over -ω² # The correction is done in `store_force` in the `result` object. forces = problem.body.integrate_pressure(pressure) if not keep_details: result = problem.make_results_container(forces) else: result = problem.make_results_container(forces, sources, potential, pressure) LOG.debug("Done!") return result
[docs] def solve_all(self, problems, *, n_jobs=1, **kwargs): """Solve several problems. Optional keyword arguments are passed to `BEMSolver.solve`. Parameters ---------- problems: list of LinearPotentialFlowProblem several problems to be solved n_jobs: int, optional (default: 1) the number of jobs to run in parallel using the optional dependency `joblib` By defaults: do not use joblib and solve sequentially. Returns ------- list of LinearPotentialFlowResult the solved problems """ if n_jobs == 1: # force sequential resolution return [self.solve(pb, **kwargs) for pb in sorted(problems)] else: joblib = silently_import_optional_dependency("joblib") if joblib is None: raise ImportError(f"Setting the `n_jobs` argument to {n_jobs} requires the missing optional dependency 'joblib'.") groups_of_problems = LinearPotentialFlowProblem._group_for_parallel_resolution(problems) groups_of_results = joblib.Parallel(n_jobs=n_jobs)(joblib.delayed(self.solve_all)(grp, n_jobs=1, **kwargs) for grp in groups_of_problems) results = [res for grp in groups_of_results for res in grp] # flatten the nested list return results
[docs] def fill_dataset(self, dataset, bodies, *, n_jobs=1, **kwargs): """Solve a set of problems defined by the coordinates of an xarray dataset. Parameters ---------- dataset : xarray Dataset dataset containing the problems parameters: frequency, radiating_dof, water_depth, ... bodies : FloatingBody or list of FloatingBody The body or bodies involved in the problems They should all have different names. n_jobs: int, optional (default: 1) the number of jobs to run in parallel using the optional dependency `joblib` By defaults: do not use joblib and solve sequentially. Returns ------- xarray Dataset """ attrs = {'start_of_computation': datetime.now().isoformat(), **self.exportable_settings} problems = problems_from_dataset(dataset, bodies) if 'theta' in dataset.coords: results = self.solve_all(problems, keep_details=True, n_jobs=n_jobs) kochin = kochin_data_array(results, dataset.coords['theta']) dataset = assemble_dataset(results, attrs=attrs, **kwargs) dataset.update(kochin) else: results = self.solve_all(problems, keep_details=False, n_jobs=n_jobs) dataset = assemble_dataset(results, attrs=attrs, **kwargs) return dataset
[docs] def get_potential_on_mesh(self, result, mesh, chunk_size=50): """Compute the potential on a mesh for the potential field of a previously solved problem. Since the interaction matrix does not need to be computed in full to compute the matrix-vector product, only a few lines are evaluated at a time to reduce the memory cost of the operation. Parameters ---------- result : LinearPotentialFlowResult the return of the BEM solver mesh : Mesh or CollectionOfMeshes a mesh chunk_size: int, optional Number of lines to compute in the matrix. (legacy, should be passed as an engine setting instead). Returns ------- array of shape (mesh.nb_faces,) potential on the faces of the mesh Raises ------ Exception: if the :code:`Result` object given as input does not contain the source distribution. """ LOG.info(f"Compute potential on {mesh.name} for {result}.") if result.sources is None: raise Exception(f"""The values of the sources of {result} cannot been found. They probably have not been stored by the solver because the option keep_details=True have not been set. Please re-run the resolution with this option.""") if chunk_size > mesh.nb_faces: S = self.engine.build_S_matrix( mesh, result.body.mesh, result.free_surface, result.sea_bottom, result.wavenumber, self.green_function ) phi = S @ result.sources else: phi = np.empty((mesh.nb_faces,), dtype=np.complex128) for i in range(0, mesh.nb_faces, chunk_size): faces_to_extract = list(range(i, min(i+chunk_size, mesh.nb_faces))) S = self.engine.build_S_matrix( mesh.extract_faces(faces_to_extract), result.body.mesh, result.free_surface, result.sea_bottom, result.wavenumber, self.green_function ) phi[i:i+chunk_size] = S @ result.sources LOG.debug(f"Done computing potential on {mesh.name} for {result}.") return phi
[docs] def get_free_surface_elevation(self, result, free_surface, keep_details=False): """Compute the elevation of the free surface on a mesh for a previously solved problem. Parameters ---------- result : LinearPotentialFlowResult the return of the solver free_surface : FreeSurface a meshed free surface keep_details : bool, optional if True, keep the free surface elevation in the LinearPotentialFlowResult (default:False) Returns ------- array of shape (free_surface.nb_faces,) the free surface elevation on each faces of the meshed free surface Raises ------ Exception: if the :code:`Result` object given as input does not contain the source distribution. """ fs_elevation = 1j*result.omega/result.g * self.get_potential_on_mesh(result, free_surface.mesh) if keep_details: result.fs_elevation[free_surface] = fs_elevation return fs_elevation
# LEGACY INTERFACE def _arguments(f): """Returns the name of the arguments of the function f""" return f.__code__.co_varnames[:f.__code__.co_argcount]
[docs]class Nemoh(BEMSolver): """Solver for the BEM problem based on Nemoh's Green function. Legacy API. Parameters are dispatched to the Delhommeau class and to the engine (BasicMatrixEngine or HierarchicalToeplitzMatrixEngine). """ def __init__(self, **params): green_function = Delhommeau( **{key: params[key] for key in params if key in _arguments(Delhommeau.__init__)} ) if 'hierarchical_matrices' in params and params['hierarchical_matrices']: engine = HierarchicalToeplitzMatrixEngine( **{key: params[key] for key in params if key in _arguments(HierarchicalToeplitzMatrixEngine.__init__)} ) else: engine = BasicMatrixEngine( **{key: params[key] for key in params if key in _arguments(BasicMatrixEngine.__init__)} ) super().__init__(green_function=green_function, engine=engine)
[docs] def build_matrices(self, *args, **kwargs): """Legacy API.""" args = args + (self.green_function,) return self.engine.build_matrices(*args, **kwargs)