"""This module implements block matrices to be used in Hierarchical Toeplitz matrices.

It takes inspiration from the following works:

* `openHmx module from Gypsilab by Matthieu Aussal (GPL licensed) <>`_
* `HierarchicalMatrices by Markus Neumann (GPL licensed) <>`_
import logging

from numbers import Number
from typing import Tuple, List, Callable, Union, Iterable
from itertools import cycle, accumulate, chain, product
from import Iterator

import numpy as np

from capytaine.matrices.low_rank import LowRankMatrix
from import import_optional_dependency

LOG = logging.getLogger(__name__)

[docs]class BlockMatrix: """A (2D) matrix, stored as a set of submatrices (or blocks). Parameters ---------- blocks: list of list of matrices The blocks of the block matrix. check: bool, optional Should the code perform sanity checks on the inputs? (default: True) Attributes ---------- shape: pair of ints shape of the full matrix nb_blocks: pair of ints number of blocks in each directions. Example:: AAAABB AAAABB -> nb_blocks = (1, 2) AAAABB """ ndim = 2 # Other dimensions have not been implemented. def __init__(self, blocks, _stored_block_shapes=None, check=True): self._stored_nb_blocks = (len(blocks), len(blocks[0])) self._stored_blocks = np.empty(self._stored_nb_blocks, dtype=object) for i in range(len(blocks)): for j in range(len(blocks[i])): self._stored_blocks[i, j] = blocks[i][j] if _stored_block_shapes is None: self._stored_block_shapes = ([block.shape[0] for block in self._stored_blocks[:, 0]], [block.shape[1] for block in self._stored_blocks[0, :]]) else: # To avoid going down the tree if it is already known. self._stored_block_shapes = _stored_block_shapes # Block shape of the full matrix self.nb_blocks = self._compute_nb_blocks() # Total shape of the full matrix self.shape = self._compute_shape() LOG.debug(f"New block matrix: %s", self) if check: assert self._check_dimensions_of_blocks() assert self._check_dtype() def _compute_shape(self): # In a dedicated routine because it will be overloaded by subclasses. return sum(self._stored_block_shapes[0]), sum(self._stored_block_shapes[1]) def _compute_nb_blocks(self): return self._stored_nb_blocks def _check_dimensions_of_blocks(self) -> bool: """Check that the dimensions of the blocks are consistent.""" if not all(block.ndim == self.ndim for line in self._stored_blocks for block in line): return False for line in self.all_blocks: block_height = line[0].shape[0] for block in line[1:]: if not block.shape[0] == block_height: # Same height on a given line return False for col in np.moveaxis(self.all_blocks, 1, 0): block_width = col[0].shape[1] for block in col[1:]: if not block.shape[1] == block_width: # Same width on a given column return False return True def _check_dtype(self) -> bool: """Check that the type of the blocks are consistent.""" for line in self._stored_blocks: for block in line: if block.dtype != self.dtype: return False return True # ACCESSING DATA @property def dtype(self): """The type of data of all of the subblocks.""" try: return self._stored_blocks[0][0].dtype except AttributeError: return None @property def all_blocks(self) -> np.ndarray: """The matrix of matrices. For a full block matrix, all the blocks are stored in memory.""" return self._stored_blocks @property def block_shapes(self) -> Tuple[List[int], List[int]]: """The shapes of the blocks composing the BlockMatrix. Example:: AAAABB AAAABB -> block_shapes = ([3], [4, 2]) AAAABB """ return self._stored_block_shapes def _stored_block_positions(self, global_frame=(0, 0)) -> Iterable[List[Tuple[int, int]]]: """The position of each blocks in the matrix as a generator. The list is used by subclasses where the same block may appear several times in different positions. Parameters ---------- global_frame: Tuple[int], optional the coordinate of the top right corner. Default: 0, 0. Example:: AAAABB AAAABB -> list(matrix._stored_block_positions) = [[(0,0)], [(0, 4)], [(2, 0)], [(2, 4)]] CCCCDD """ x_acc = accumulate([0] + self.block_shapes[0][:-1]) y_acc = accumulate([0] + self.block_shapes[1][:-1]) return ([(global_frame[0] + x, global_frame[1] + y)] for x, y in product(x_acc, y_acc)) def _put_in_full_matrix(self, full_matrix: np.ndarray, where=(0, 0)) -> None: """Copy the content of the block matrix in a matrix, which is modified in place.""" all_blocks_in_flat_iterator = (block for line in self._stored_blocks for block in line) positions_of_all_blocks = self._stored_block_positions(global_frame=where) for block, positions_of_the_block in zip(all_blocks_in_flat_iterator, positions_of_all_blocks): if isinstance(block, BlockMatrix): position_of_first_appearance = positions_of_the_block[0] frame_of_first_appearance = (slice(position_of_first_appearance[0], position_of_first_appearance[0]+block.shape[0]), slice(position_of_first_appearance[1], position_of_first_appearance[1]+block.shape[1])) block._put_in_full_matrix(full_matrix, where=position_of_first_appearance) for position in positions_of_the_block[1:]: # For the other appearances, only copy the first appearance block_frame = (slice(position[0], position[0]+block.shape[0]), slice(position[1], position[1]+block.shape[1])) full_matrix[block_frame] = full_matrix[frame_of_first_appearance] else: full_block = block if isinstance(block, np.ndarray) else block.full_matrix() for position in positions_of_the_block: block_frame = (slice(position[0], position[0]+block.shape[0]), slice(position[1], position[1]+block.shape[1])) full_matrix[block_frame] = full_block
[docs] def full_matrix(self, dtype=None) -> np.ndarray: """Flatten the block structure and return a full matrix.""" if dtype is None: dtype = self.dtype full_matrix = np.empty(self.shape, dtype=dtype) self._put_in_full_matrix(full_matrix) return full_matrix
def __array__(self, dtype=None): return self.full_matrix(dtype=dtype)
[docs] def no_toeplitz(self): """Recursively replace the block toeplitz matrices by usual block matrices. WARNING: the block matrices may still contain several references to the same block.""" blocks = [[block.no_toeplitz() if isinstance(block, BlockMatrix) else block for block in line] for line in self.all_blocks] return BlockMatrix(blocks)
def __deepcopy__(self, memo): from copy import deepcopy blocks = [[deepcopy(block) for block in line] for line in self._stored_blocks] # The call to deepcopy does not use the memo on purpose: # the goal is to replace references to the same block by references to different copies of the block. return self.__class__(blocks) @property def stored_data_size(self): """Return the number of entries actually stored in memory.""" size = 0 for line in self._stored_blocks: for block in line: if isinstance(block, np.ndarray): size += np.product(block.shape) else: size += block.stored_data_size return size @property def density(self): return self.stored_data_size/np.product(self.shape) @property def sparcity(self): return 1 - self.density def __hash__(self): # Temporary return id(self) # TRANSFORMING DATA def _apply_unary_op(self, op: Callable) -> 'BlockMatrix': """Helper function applying a function recursively on all submatrices.""" LOG.debug(f"Apply op {op.__name__} to {self}") result = [[op(block) for block in line] for line in self._stored_blocks] return self.__class__(result, _stored_block_shapes=self._stored_block_shapes, check=False) def _apply_binary_op(self, op: Callable, other: 'BlockMatrix') -> 'BlockMatrix': """Helper function applying a binary operator recursively on all submatrices.""" if isinstance(other, self.__class__) and self.nb_blocks == other.nb_blocks: LOG.debug(f"Apply op {op.__name__} to {self} and {other}") result = [ [op(block, other_block) for block, other_block in zip(line, other_line)] for line, other_line in zip(self._stored_blocks, other._stored_blocks) ] return self.__class__(result, _stored_block_shapes=self._stored_block_shapes, check=False) else: return NotImplemented def __add__(self, other: 'BlockMatrix') -> 'BlockMatrix': from operator import add return self._apply_binary_op(add, other) def __radd__(self, other: 'BlockMatrix') -> 'BlockMatrix': return self + other def __neg__(self) -> 'BlockMatrix': from operator import neg return self._apply_unary_op(neg) def __sub__(self, other: 'BlockMatrix') -> 'BlockMatrix': from operator import sub return self._apply_binary_op(sub, other) def __rsub__(self, other: 'BlockMatrix') -> 'BlockMatrix': from operator import sub return other._apply_binary_op(sub, self) def __mul__(self, other: Union['BlockMatrix', Number]) -> 'BlockMatrix': if isinstance(other, Number): return self._apply_unary_op(lambda x: other*x) else: from operator import mul return self._apply_binary_op(mul, other) def __rmul__(self, other: Union['BlockMatrix', Number]) -> 'BlockMatrix': return self * other def __truediv__(self, other: Union['BlockMatrix', Number]) -> 'BlockMatrix': from numbers import Number if isinstance(other, Number): return self._apply_unary_op(lambda x: x/other) else: from operator import truediv return self._apply_binary_op(truediv, other) def __rtruediv__(self, other: Union['BlockMatrix', Number]) -> 'BlockMatrix': from numbers import Number if isinstance(other, Number): return self._apply_unary_op(lambda x: other/x) else: return self._apply_binary_op(lambda x, y: y/x, other)
[docs] def matvec(self, other): """Matrix vector product. Named as such to be used as scipy LinearOperator.""" LOG.debug(f"Multiplication of {self} with a full vector of size {other.shape}.") result = np.zeros(self.shape[0], dtype=other.dtype) line_heights = self.block_shapes[0] line_positions = list(accumulate(chain([0], line_heights))) col_widths = self.block_shapes[1] col_positions = list(accumulate(chain([0], col_widths))) for line, line_position, line_height in zip(self.all_blocks, line_positions, line_heights): line_slice = slice(line_position, line_position+line_height) for block, col_position, col_width in zip(line, col_positions, col_widths): col_slice = slice(col_position, col_position+col_width) result[line_slice] += block @ other[col_slice] return result
[docs] def rmatvec(self, other): """Vector matrix product. Named as such to be used as scipy LinearOperator.""" LOG.debug(f"Multiplication of a full vector of size {other.shape} with {self}.") result = np.zeros(self.shape[1], dtype=other.dtype) line_heights = self.block_shapes[0] line_positions = list(accumulate(chain([0], line_heights))) col_widths = self.block_shapes[1] col_positions = list(accumulate(chain([0], col_widths))) for col, col_position, col_width in zip(self.all_blocks.T, col_positions, col_widths): col_slice = slice(col_position, col_position+col_width) for block, line_position, line_height in zip(col, line_positions, line_heights): line_slice = slice(line_position, line_position+line_height) if isinstance(block, BlockMatrix): result[col_slice] += block.rmatvec(other[line_slice]) else: result[col_slice] += other[line_slice] @ block return result
[docs] def matmat(self, other): """Matrix-matrix product.""" if isinstance(other, BlockMatrix) and self.block_shapes[1] == other.block_shapes[0]: LOG.debug(f"Multiplication of %s with %s", self, other) own_blocks = self.all_blocks other_blocks = np.moveaxis(other.all_blocks, 1, 0) new_matrix = [] for own_line in own_blocks: new_line = [] for other_col in other_blocks: new_line.append(sum(own_block @ other_block for own_block, other_block in zip(own_line, other_col))) new_matrix.append(new_line) return BlockMatrix(new_matrix, check=False) elif isinstance(other, np.ndarray) and self.shape[1] == other.shape[0]: LOG.debug(f"Multiplication of {self} with a full matrix of shape {other.shape}.") # Cut the matrix and recursively call itself to use the code above. from import cut_matrix cut_other = cut_matrix(other, self.block_shapes[1], [other.shape[1]], check=False) return (self @ cut_other).full_matrix()
def __matmul__(self, other: Union['BlockMatrix', np.ndarray]) -> Union['BlockMatrix', np.ndarray]: if not (isinstance(other, BlockMatrix) or isinstance(other, np.ndarray)): return NotImplemented elif other.ndim == 2: # Other is a matrix if other.shape[1] == 1: # Actually a column vector return self.matvec(other.flatten()) else: return self.matmat(other) elif other.ndim == 1: # Other is a vector return self.matvec(other) else: return NotImplemented
[docs] def astype(self, dtype: np.dtype) -> 'BlockMatrix': return self._apply_unary_op(lambda x: x.astype(dtype))
[docs] def fft_of_list(*block_matrices, check=True): """Compute the fft of a list of block matrices of the same type and shape. The output is a list of block matrices of the same shape as the input ones. The fft is computed element-wise, so the block structure does not cause any mathematical difficulty. Returns an array of BlockMatrices. """ class_of_matrices = type(block_matrices[0]) nb_blocks = block_matrices[0]._stored_nb_blocks LOG.debug(f"FFT of {len(block_matrices)} {class_of_matrices.__name__} (stored blocks = {nb_blocks})") if check: # Check the validity of the shapes of the matrices given as input shape = block_matrices[0].shape assert [nb_blocks == matrix._stored_nb_blocks for matrix in block_matrices[1:]] assert [shape == matrix.shape for matrix in block_matrices[1:]] assert [class_of_matrices == type(matrix) for matrix in block_matrices[1:]] # Initialize a vector of block matrices without values in the blocks. result = np.empty(len(block_matrices), dtype=object) for i in range(len(block_matrices)): result[i] = class_of_matrices(np.empty(nb_blocks, dtype=object), _stored_block_shapes=block_matrices[0]._stored_block_shapes, check=False) for i_block, j_block in product(range(nb_blocks[0]), range(nb_blocks[1])): list_of_i_j_blocks = [block_matrices[i_matrix]._stored_blocks[i_block, j_block] for i_matrix in range(len(block_matrices))] if any(isinstance(block, np.ndarray) or isinstance(block, LowRankMatrix) for block in list_of_i_j_blocks): list_of_i_j_blocks = [block if isinstance(block, np.ndarray) else block.full_matrix() for block in list_of_i_j_blocks] fft_of_blocks = np.fft.fft(list_of_i_j_blocks, axis=0) else: fft_of_blocks = BlockMatrix.fft_of_list(*list_of_i_j_blocks, check=False) for matrix, computed_block in zip(result, fft_of_blocks): matrix._stored_blocks[i_block, j_block] = computed_block return result
# COMPARISON AND REDUCTION def __eq__(self, other: 'BlockMatrix') -> 'BlockMatrix[bool]': from operator import eq return self._apply_binary_op(eq, other) def __invert__(self) -> 'BlockMatrix': """Boolean not (~)""" from operator import invert return self._apply_unary_op(invert) def __ne__(self, other: 'BlockMatrix') -> 'BlockMatrix[bool]': return ~(self == other)
[docs] def all(self) -> bool: for line in self._stored_blocks: for block in line: if not block.all(): return False return True
[docs] def any(self) -> bool: for line in self._stored_blocks: for block in line: if block.any(): return True return False
[docs] def min(self) -> Number: return min(block.min() for line in self._stored_blocks for block in line)
[docs] def max(self) -> Number: return max(block.max() for line in self._stored_blocks for block in line)
# DISPLAYING DATA @property def str_shape(self): blocks_str = [] for line in self.all_blocks: for block in line: if isinstance(block, BlockMatrix): blocks_str.append(block.str_shape) elif isinstance(block, np.ndarray) or isinstance(block, LowRankMatrix): blocks_str.append("{}×{}".format(*block.shape)) else: blocks_str.append("?×?") if len(set(blocks_str)) == 1: return "{}×{}×[".format(*self.nb_blocks) + blocks_str[0] + "]" else: blocks_str = np.array(blocks_str).reshape(self.nb_blocks).tolist() return str(blocks_str).replace("'", "") def __str__(self): if not hasattr(self, '_str'): args = [self.str_shape] if self.dtype not in [np.float64, float]: args.append(f"dtype={self.dtype}") self._str = f"{self.__class__.__name__}(" + ", ".join(args) + ")" return self._str display_color = cycle([f'C{i}' for i in range(10)]) def _patches(self, global_frame: Union[Tuple[int, int], np.ndarray] ): """Helper function for displaying the shape of the matrix. Recursively returns a list of rectangles representing the sub-blocks of the matrix. Uses BlockMatrix.display_color to assign color to the blocks. By default, it cycles through matplotlib default colors. But if display_color is redefined as a callable, it is called with the block as argument. Parameters ---------- global_frame: tuple of ints coordinates of the origin in the top left corner. Returns ------- list of matplotlib.patches.Rectangle """ matplotlib_patches = import_optional_dependency("matplotlib.patches", "matplotlib") Rectangle = matplotlib_patches.Rectangle all_blocks_in_flat_iterator = (block for line in self._stored_blocks for block in line) positions_of_all_blocks = self._stored_block_positions(global_frame=global_frame) patches = [] for block, positions_of_the_block in zip(all_blocks_in_flat_iterator, positions_of_all_blocks): position_of_first_appearance = positions_of_the_block[0] # Exchange coordinates: row index i -> y, column index j -> x position_of_first_appearance = np.array((position_of_first_appearance[1], position_of_first_appearance[0])) if isinstance(block, BlockMatrix): patches_of_this_block = block._patches(np.array((position_of_first_appearance[1], position_of_first_appearance[0]))) elif isinstance(block, np.ndarray): if isinstance(self.display_color, Iterator): color = next(self.display_color) elif callable(self.display_color): color = self.display_color(block) else: color = np.random.rand(3) patches_of_this_block = [Rectangle(position_of_first_appearance, block.shape[1], block.shape[0], edgecolor='k', facecolor=color)] elif isinstance(block, LowRankMatrix): if isinstance(self.display_color, Iterator): color = next(self.display_color) elif callable(self.display_color): color = self.display_color(block) else: color = np.random.rand(3) patches_of_this_block = [ # Left block Rectangle(position_of_first_appearance, block.left_matrix.shape[1], block.left_matrix.shape[0], edgecolor='k', facecolor=color), # Top block Rectangle(position_of_first_appearance, block.right_matrix.shape[1], block.right_matrix.shape[0], edgecolor='k', facecolor=color), # Rest of the matrix Rectangle(position_of_first_appearance, block.right_matrix.shape[1], block.left_matrix.shape[0], facecolor=color, alpha=0.2), ] else: raise NotImplementedError() patches.extend(patches_of_this_block) # For the other appearances, copy the patches of the first appearance for block_position in positions_of_the_block[1:]: block_position = np.array((block_position[1], block_position[0])) for patch in patches_of_this_block: # A block can be made of several patches. shift = block_position - position_of_first_appearance patch_position = np.array(patch.get_xy()) + shift patches.append(Rectangle(patch_position, patch.get_width(), patch.get_height(), facecolor=patch.get_facecolor(), alpha=0.2)) return patches
[docs] def plot_shape(self): """Plot the structure of the matrix using matplotlib.""" matplotlib = import_optional_dependency("matplotlib") plt = matplotlib.pyplot plt.figure() for patch in self._patches((0, 0)): plt.gca().add_patch(patch) plt.axis('equal') plt.xlim(0, self.shape[1]) plt.ylim(0, self.shape[0]) plt.gca().invert_yaxis()