Source code for capytaine.io.xarray

"""Tools to use xarray Datasets as inputs and outputs.

.. todo:: This module could be tidied up a bit and some methods merged or
          uniformized.
"""
# Copyright (C) 2017-2019 Matthieu Ancellin
# See LICENSE file at <https://github.com/mancellin/capytaine>

import logging
from datetime import datetime
from itertools import product
from collections import Counter
from typing import Sequence, List, Union

import numpy as np
import pandas as pd
import xarray as xr

from capytaine import __version__
from capytaine.bodies.bodies import FloatingBody
from capytaine.bem.problems_and_results import (
    LinearPotentialFlowProblem, DiffractionProblem, RadiationProblem,
    LinearPotentialFlowResult, _default_parameters)
from capytaine.post_pro.kochin import compute_kochin
from capytaine.io.bemio import dataframe_from_bemio


LOG = logging.getLogger(__name__)


#########################
#  Reading test matrix  #
#########################

[docs] def problems_from_dataset(dataset: xr.Dataset, bodies: Union[FloatingBody, Sequence[FloatingBody]], ) -> List[LinearPotentialFlowProblem]: """Generate a list of problems from a test matrix. Parameters ---------- dataset : xarray Dataset Test matrix containing the problems parameters. bodies : FloatingBody or list of FloatingBody The bodies on which the computations of the test matrix will be applied. They should all have different names. Returns ------- list of LinearPotentialFlowProblem Raises ------ ValueError if required fields are missing in the dataset """ if isinstance(bodies, FloatingBody): bodies = [bodies] # Should be done before looking for `frequency_keys`, otherwise # frequencies provided as a scalar dimension will be skipped. dataset = _unsqueeze_dimensions(dataset) # SANITY CHECKS assert len(list(set(body.name for body in bodies))) == len(bodies), \ "All bodies should have different names." # Warn user in case of key with unrecognized name (e.g. misspells) keys_in_dataset = set(dataset.dims) accepted_keys = {'wave_direction', 'radiating_dof', 'influenced_dof', 'body_name', 'omega', 'period', 'wavelength', 'wavenumber', 'forward_speed', 'water_depth', 'rho', 'g', 'theta'} unrecognized_keys = keys_in_dataset.difference(accepted_keys) if len(unrecognized_keys) > 0: LOG.warning(f"Unrecognized key(s) in dataset: {unrecognized_keys}") if ("radiating_dof" not in keys_in_dataset) and ("wave_direction" not in keys_in_dataset): raise ValueError("Neither 'radiating_dof' nor 'wave_direction' has been provided in the dataset. " "No linear potential flow problem can be inferred.") frequency_keys = keys_in_dataset & {'omega', 'period', 'wavelength', 'wavenumber'} if len(frequency_keys) > 1: raise ValueError("Setting problems requires at most one of the following: omega (angular frequency) OR period OR wavenumber OR wavelength.\n" "Received {}".format(frequency_keys)) # END SANITY CHECKS if len(frequency_keys) == 0: freq_type = "omega" freq_range = [_default_parameters['omega']] else: # len(frequency_keys) == 1 freq_type = list(frequency_keys)[0] # Get the only item freq_range = dataset[freq_type].data water_depth_range = dataset['water_depth'].data if 'water_depth' in dataset else [_default_parameters['water_depth']] rho_range = dataset['rho'].data if 'rho' in dataset else [_default_parameters['rho']] g_range = dataset['g'].data if 'g' in dataset else [_default_parameters['g']] forward_speed_range = dataset['forward_speed'] if 'forward_speed' in dataset else [_default_parameters['forward_speed']] wave_direction_range = dataset['wave_direction'].data if 'wave_direction' in dataset else None radiating_dofs = dataset['radiating_dof'].data.astype(object) if 'radiating_dof' in dataset else None # astype(object) is meant to convert Numpy internal string type numpy.str_ to Python general string type. if 'body_name' in dataset: assert set(dataset['body_name'].data) <= {body.name for body in bodies}, \ "Some body named in the dataset was not given as argument to `problems_from_dataset`." body_range = {body.name: body for body in bodies if body.name in dataset['body_name'].data} # Only the bodies listed in the dataset have been kept else: body_range = {body.name: body for body in bodies} problems = [] if wave_direction_range is not None: for freq, wave_direction, water_depth, body_name, forward_speed, rho, g \ in product(freq_range, wave_direction_range, water_depth_range, body_range, forward_speed_range, rho_range, g_range): if freq not in {0.0, np.inf}: problems.append( DiffractionProblem(body=body_range[body_name], **{freq_type: freq}, wave_direction=wave_direction, water_depth=water_depth, forward_speed=forward_speed, rho=rho, g=g) ) elif freq in {0.0, np.inf} and radiating_dofs is not None: # Diffraction problems are not defined for 0 and infinite frequency. # But we don't want the whole batch to fail, as these frequencies are there for the radiation problems. # The excitation force will be NaN for these frequencies in the resulting dataset. pass else: raise ValueError("Zero and infinite frequencies are not defined when solving only diffraction problems.") if radiating_dofs is not None: for freq, radiating_dof, water_depth, body_name, forward_speed, rho, g \ in product(freq_range, radiating_dofs, water_depth_range, body_range, forward_speed_range, rho_range, g_range): if forward_speed == 0.0: problems.append( RadiationProblem(body=body_range[body_name], **{freq_type: freq}, radiating_dof=radiating_dof, water_depth=water_depth, forward_speed=forward_speed, rho=rho, g=g) ) else: if wave_direction_range is None: LOG.warning("Dataset contains non-zero forward speed (forward_speed=%.2f) but no wave_direction has been provided. Wave direction of 0 rad (x-axis) has been assumed.", forward_speed) wave_direction_range = [0.0] for wave_direction in wave_direction_range: problems.append( RadiationProblem(body=body_range[body_name], **{freq_type: freq}, radiating_dof=radiating_dof, water_depth=water_depth, forward_speed=forward_speed, wave_direction=wave_direction, rho=rho, g=g) ) return sorted(problems)
def _squeeze_dimensions(data_array, dimensions=None): """Remove dimensions if they are of size 1. The coordinates become scalar coordinates.""" if dimensions is None: dimensions = data_array.dims for dim in dimensions: if len(data_array[dim]) == 1: data_array = data_array.squeeze(dim, drop=False) return data_array def _unsqueeze_dimensions(data_array, dimensions=None): """Add scalar coordinates as dimensions of size 1.""" if dimensions is None: dimensions = list(data_array.coords.keys()) for dim in dimensions: if len(data_array.coords[dim].values.shape) == 0: data_array = xr.concat([data_array], dim=dim) return data_array ###################### # Dataset creation # ###################### def _dataset_from_dataframe(df: pd.DataFrame, variables: Union[str, Sequence[str]], dimensions: Sequence[str], optional_dims: Sequence[str], ) -> Union[xr.DataArray, xr.Dataset]: """Transform a pandas.Dataframe into a xarray.Dataset. Parameters ---------- df: pandas.DataFrame the input dataframe variables: string or sequence of strings the variables that will be stored in the output dataset. If a single name is provided, a DataArray of this variable will be provided instead. dimensions: sequence of strings Names of dimensions the variables depends on. They will always appear as dimension in the output dataset. optional_dims: sequence of strings Names of dimensions the variables depends on. They will appears as dimension in the output dataset only if they have more than one different values. """ for variable_name in variables: df = df[df[variable_name].notnull()].dropna(axis='columns') # Keep only records with non null values of all the variables df = df.drop_duplicates(optional_dims + dimensions) df = df.set_index(optional_dims + dimensions) da = df.to_xarray()[variables] da = _squeeze_dimensions(da, dimensions=optional_dims) return da
[docs] def hydrostatics_dataset(bodies: Sequence[FloatingBody]) -> xr.Dataset: """Create a dataset by looking for 'inertia_matrix' and 'hydrostatic_stiffness' for each of the bodies in the list passed as argument. """ dataset = xr.Dataset() for body_property in ['inertia_matrix', 'hydrostatic_stiffness']: bodies_properties = {body.name: body.__getattribute__(body_property) for body in bodies if hasattr(body, body_property)} if len(bodies_properties) > 0: bodies_properties = xr.concat(bodies_properties.values(), pd.Index(bodies_properties.keys(), name='body_name')) bodies_properties = _squeeze_dimensions(bodies_properties, dimensions=['body_name']) dataset = xr.merge([dataset, {body_property: bodies_properties}]) return dataset
[docs] def kochin_data_array(results: Sequence[LinearPotentialFlowResult], theta_range: Sequence[float], **kwargs, ) -> xr.Dataset: """Compute the Kochin function for a list of results and fills a dataset. .. seealso:: :meth:`~capytaine.post_pro.kochin.compute_kochin` The present function is just a wrapper around :code:`compute_kochin`. """ records = pd.DataFrame([ dict(**result.problem._asdict(), theta=theta, kochin=kochin, kind=result.__class__.__name__) for result in results for theta, kochin in zip(theta_range.data, compute_kochin(result, theta_range, **kwargs)) ]) kochin_data = xr.Dataset() if "RadiationResult" in set(records['kind']): radiation = _dataset_from_dataframe( records[records['kind'] == "RadiationResult"], variables=['kochin'], dimensions=['omega', 'radiating_dof', 'theta'], optional_dims=['g', 'rho', 'body_name', 'water_depth', 'forward_speed', 'wave_direction'] ) kochin_data['kochin'] = radiation['kochin'] if "DiffractionResult" in set(records['kind']): diffraction = _dataset_from_dataframe( records[records['kind'] == "DiffractionResult"], ['kochin'], dimensions=['omega', 'wave_direction', 'theta'], optional_dims=['g', 'rho', 'body_name', 'water_depth', 'forward_speed'] ) kochin_data['kochin_diffraction'] = diffraction['kochin'] return kochin_data
[docs] def collect_records(results): records_list = [] warned_once_about_no_free_surface = False for result in results: if result.free_surface == np.inf: if not warned_once_about_no_free_surface: LOG.warning("Datasets currently only support cases with a free surface (free_surface=0.0).\n" "Cases without a free surface (free_surface=inf) are ignored.\n" "See also https://github.com/mancellin/capytaine/issues/88") warned_once_about_no_free_surface = True else: pass else: for record in result.records: records_list.append(record) return records_list
[docs] def assemble_dataset(results, omega=True, wavenumber=True, wavelength=True, period=True, mesh=False, hydrostatics=True, attrs=None) -> xr.Dataset: """Transform a list of :class:`LinearPotentialFlowResult` into a :class:`xarray.Dataset`. .. todo:: The :code:`mesh` option to store information on the mesh could be improved. It could store the full mesh in the dataset to ensure the reproducibility of the results. Parameters ---------- results: list of LinearPotentialFlowResult The results that will be read. omega: bool, optional If True, the coordinate 'omega' will be added to the output dataset. wavenumber: bool, optional If True, the coordinate 'wavenumber' will be added to the output dataset. wavelength: bool, optional If True, the coordinate 'wavelength' will be added to the output dataset. period: bool, optional If True, the coordinate 'period' will be added to the output dataset. mesh: bool, optional If True, store some infos on the mesh in the output dataset. hydrostatics: bool, optional If True, store the hydrostatic data in the output dataset if they exist. attrs: dict, optional Attributes that should be added to the output dataset. """ dataset = xr.Dataset() error_msg = 'The first argument of `assemble_dataset` must be either a list of LinearPotentialFlowResult or a bemio.io object' if hasattr(results, '__iter__'): try: if 'capytaine' in results[0].__module__: bemio_import = False else: raise TypeError(error_msg) except: raise TypeError(error_msg) else: try: if 'bemio.io' in results.__module__: bemio_import = True else: raise TypeError(error_msg) except: raise TypeError(error_msg) if bemio_import: records = dataframe_from_bemio(results, wavenumber, wavelength) # TODO add hydrostatics all_dofs_in_order = {'Surge': None, 'Sway': None, 'Heave': None, 'Roll': None, 'Pitch': None, 'Yaw': None} main_freq_type = "omega" else: records = pd.DataFrame(collect_records(results)) all_dofs_in_order = {k: None for r in results for k in r.body.dofs.keys()} main_freq_type = Counter((res.provided_freq_type for res in results)).most_common(1)[0][0] if attrs is None: attrs = {} attrs['creation_of_dataset'] = datetime.now().isoformat() if len(records) == 0: raise ValueError("No result passed to assemble_dataset.") inf_dof_cat = pd.CategoricalDtype(categories=all_dofs_in_order.keys()) records["influenced_dof"] = records["influenced_dof"].astype(inf_dof_cat) rad_dof_cat = pd.CategoricalDtype(categories=all_dofs_in_order.keys()) if 'added_mass' in records.columns: records["radiating_dof"] = records["radiating_dof"].astype(rad_dof_cat) optional_dims = ['g', 'rho', 'body_name', 'water_depth', 'forward_speed'] # RADIATION RESULTS if 'added_mass' in records.columns: radiation_cases = _dataset_from_dataframe( records, variables=['added_mass', 'radiation_damping'], dimensions=[main_freq_type, 'radiating_dof', 'influenced_dof'], optional_dims=optional_dims + ['wave_direction']) radiation_cases.added_mass.attrs['long_name'] = 'Added mass' radiation_cases.radiation_damping.attrs['long_name'] = 'Radiation damping' radiation_cases.radiating_dof.attrs['long_name'] = 'Radiating DOF' radiation_cases.influenced_dof.attrs['long_name'] = 'Influenced DOF' dataset = xr.merge([dataset, radiation_cases]) # DIFFRACTION RESULTS if 'diffraction_force' in records.columns: diffraction_cases = _dataset_from_dataframe( records, variables=['diffraction_force', 'Froude_Krylov_force'], dimensions=[main_freq_type, 'wave_direction', 'influenced_dof'], optional_dims=optional_dims) diffraction_cases.diffraction_force.attrs['long_name'] = 'Diffraction force' diffraction_cases.Froude_Krylov_force.attrs['long_name'] = 'Froude Krylov force' diffraction_cases.influenced_dof.attrs['long_name'] = 'Influenced DOF' diffraction_cases.wave_direction.attrs['long_name'] = 'Wave direction' diffraction_cases.wave_direction.attrs['units'] = 'rad' dataset = xr.merge([dataset, diffraction_cases]) dataset['excitation_force'] = dataset['Froude_Krylov_force'] + dataset['diffraction_force'] # OTHER FREQUENCIES TYPES if omega and main_freq_type != "omega": omega_ds = _dataset_from_dataframe( records, variables=['omega'], dimensions=[main_freq_type], optional_dims=['g', 'water_depth'] if main_freq_type in {'wavelength', 'wavenumber'} else [] ) dataset.coords['omega'] = omega_ds['omega'] dataset.omega.attrs['long_name'] = 'Angular frequency' dataset.omega.attrs['units'] = 'rad/s' if period and main_freq_type != "period": period_ds = _dataset_from_dataframe( records, variables=['period'], dimensions=[main_freq_type], optional_dims=['g', 'water_depth'] if main_freq_type in {'wavelength', 'wavenumber'} else [] ) dataset.coords['period'] = period_ds['period'] dataset.period.attrs['long_name'] = 'Period' dataset.period.attrs['units'] = 's' if wavenumber and main_freq_type != "wavenumber": wavenumber_ds = _dataset_from_dataframe( records, variables=['wavenumber'], dimensions=[main_freq_type], optional_dims=['g', 'water_depth'] if main_freq_type in {'period', 'omega'} else [] ) dataset.coords['wavenumber'] = wavenumber_ds['wavenumber'] dataset.wavenumber.attrs['long_name'] = 'Angular wavenumber' dataset.wavenumber.attrs['units'] = 'rad/m' if wavelength and main_freq_type != "wavelength": wavelength_ds = _dataset_from_dataframe( records, variables=['wavelength'], dimensions=[main_freq_type], optional_dims=['g', 'water_depth'] if main_freq_type in {'period', 'omega'} else [] ) dataset.coords['wavelength'] = wavelength_ds['wavelength'] dataset.wavelength.attrs['long_name'] = 'Wave length' dataset.wavelength.attrs['units'] = 'm' if not all(records["forward_speed"] == 0.0): omegae_ds = _dataset_from_dataframe( records, variables=['encounter_omega'], dimensions=['forward_speed', 'wave_direction', main_freq_type], optional_dims=['g', 'water_depth'], ) dataset.coords['encounter_omega'] = omegae_ds['encounter_omega'] dataset.encounter_omega.attrs['long_name'] = 'Encounter angular frequency' dataset.encounter_omega.attrs['units'] = 'rad/s' encounter_wave_direction_ds = _dataset_from_dataframe( records, variables=['encounter_wave_direction'], dimensions=['forward_speed', 'wave_direction', main_freq_type], optional_dims=[], ) dataset.coords['encounter_wave_direction'] = encounter_wave_direction_ds['encounter_wave_direction'] dataset.encounter_wave_direction.attrs['long_name'] = 'Encounter wave direction' dataset.encounter_wave_direction.attrs['units'] = 'rad' if mesh: if bemio_import: LOG.warning('Bemio data does not include mesh data. mesh=True is ignored.') else: # TODO: Store full mesh... bodies = list({result.body for result in results}) # Filter out duplicate bodies in the list of results nb_faces = {body.name: body.mesh.nb_faces for body in bodies} def name_or_str(c): return c.name if hasattr(c, 'name') else str(c) quad_methods = {body.name: name_or_str(body.mesh.quadrature_method) for body in bodies} if len(nb_faces) > 1: dataset.coords['nb_faces'] = ('body_name', [nb_faces[name] for name in dataset.coords['body_name'].data]) dataset.coords['quadrature_method'] = ('body_name', [quad_methods[name] for name in dataset.coords['body_name'].data]) else: def the_only(d): """Return the only element of a 1-element dictionary""" return next(iter(d.values())) dataset.coords['nb_faces'] = the_only(nb_faces) dataset.coords['quadrature_method'] = the_only(quad_methods) # HYDROSTATICS if hydrostatics: if bemio_import: LOG.warning('Bemio data import being used, hydrostatics=True is ignored.') else: bodies = list({result.body for result in results}) dataset = xr.merge([dataset, hydrostatics_dataset(bodies)]) dataset.attrs.update(attrs) dataset.attrs['capytaine_version'] = __version__ return dataset
################################ # Handling of complex values # ################################
[docs] def separate_complex_values(ds: xr.Dataset) -> xr.Dataset: """Return a new Dataset where complex-valued arrays of shape (...) have been replaced by real-valued arrays of shape (2, ...). .. seealso:: :func:`merge_complex_values` The invert operation """ ds = ds.copy() for variable in ds.data_vars: if ds[variable].dtype == complex: da = ds[variable] new_da = xr.DataArray(np.asarray((np.real(da).data, np.imag(da).data)), dims=('complex',) + da.dims) ds[variable] = new_da ds.coords['complex'] = ['re', 'im'] return ds
[docs] def merge_complex_values(ds: xr.Dataset) -> xr.Dataset: """Return a new Dataset where real-valued arrays of shape (2, ...) have been replaced by complex-valued arrays of shape (...). .. seealso:: :func:`separate_complex_values` The invert operation """ if 'complex' in ds.coords: ds = ds.copy() for variable in ds.data_vars: if 'complex' in ds[variable].coords: da = ds[variable] new_dims = [d for d in da.dims if d != 'complex'] new_da = xr.DataArray(da.sel(complex='re').data + 1j*da.sel(complex='im').data, dims=new_dims) ds[variable] = new_da ds = ds.drop_vars('complex') return ds