Source code for pythia.chaos

"""
File: pythia/chaos.py
Author: Nando Hegemann
Gitlab: https://gitlab.com/Nando-Hegemann
Description: Sample-based computation of polynomial chaos expansion.
SPDX-License-Identifier: LGPL-3.0-or-later OR Hippocratic-3.0-ECO-MEDIA-MIL
"""
import warnings
import math
import psutil
import numpy as np
import pythia as pt


class PolynomialChaos:
    """Computation of sparse polynomial chaos expansion.

    Parameters
    ----------
    params : list of `pt.parameter.Parameter`
        List of stochastic parameters.
    index_set : pt.index.IndexSet
        Index set for sparse polynomial chaos expansion.
    x_train : array_like
        Parameter realizations for training.
    w_train : array_like
        Regression weights for training.
    y_train : array_like
        Function evaluations for training.
    coefficients : array_like, default=None
        Polynomial expansion coefficients. If given, the coefficients are not
        computed during initiation. This can be used to load a chaos expansion.
    """

    def __init__(
        self,
        params: list[pt.parameter.Parameter],
        index_set: pt.index.IndexSet,
        x_train: np.ndarray,
        w_train: np.ndarray,
        y_train: np.ndarray,
        coefficients: np.ndarray | None = None,
    ) -> None:
        """Initiate the computation of the PC expansion of a function."""
        # Shape contract: x_train is (n_samples, n_params), w_train is
        # (n_samples,), y_train is (n_samples, ydim).
        assert x_train.ndim == 2 and x_train.shape[1] == len(params)
        assert w_train.ndim == 1 and w_train.size == x_train.shape[0]
        assert y_train.ndim == 2 and y_train.shape[0] == x_train.shape[0]
        self.parameters = params
        self.index_set = index_set
        self.x_train = x_train
        self.w_train = w_train
        self.y_train = y_train
        self.n_samples, self.sdim = x_train.shape
        self.ydim = y_train.shape[1]
        self.univariate_pdfs = [
            pt.sampler.assign_sampler(param) for param in self.parameters
        ]
        self.pdf = pt.sampler.ParameterSampler(self.parameters).pdf
        self.univariate_bases = pt.basis.univariate_basis(
            self.parameters, self.index_set.max
        )
        self.basis = pt.basis.multivariate_basis(
            self.univariate_bases, self.index_set.indices
        )
        self.gramian, self.basis_eval_mat = self._assemble_matrices()
        if coefficients is None:
            # Fit coefficients by (weighted) linear regression.
            self.coefficients = self._fit()
        else:
            # Coefficients are given, e.g. from a stored expansion.
            assert coefficients.shape == (index_set.shape[0], self.ydim)
            self.coefficients = coefficients
        self.sobol = self._compute_sobol_indices()

    @property
    def mean(self) -> np.ndarray:
        """Mean of the PC expansion."""
        # The mean is the coefficient of the constant basis function,
        # i.e. the multiindex (0, ..., 0).
        idx = self.index_set.get_index_number(np.zeros((1, self.sdim), dtype=int))
        return self.coefficients[idx].flatten()

    @property
    def var(self) -> np.ndarray:
        """Variance of the PC expansion."""
        # An expansion consisting only of the constant term has zero variance.
        if self.index_set.shape[0] == 1 and np.all(self.index_set.indices == 0):
            return np.zeros(self.ydim)
        return np.sum(self.coefficients**2, axis=0).flatten() - self.mean**2

    @property
    def std(self) -> np.ndarray:
        """Standard deviation of the PC expansion."""
        return np.sqrt(self.var)

    def _assemble_matrices(self) -> tuple[np.ndarray, np.ndarray]:
        """Assemble Gramian and basis evaluation matrix.

        Assemble the information matrix :math:`A` and the basis evaluation
        matrix :math:`\\Psi` with the regression points of the PC expansion.
        The basis evaluation matrix :math:`\\Psi` is given by

        .. math::
            \\Psi_{kj} = \\operatorname{basis}[j](\\operatorname{regPoints}[k]).

        Returns
        -------
        gramian : np.ndarray
            Empirical Gramian matrix.
        psi_mat : np.ndarray
            Basis evaluation matrix :math:`\\Psi`.
        """
        # Batch over samples to bound peak memory usage of the outer product.
        batch_size = get_gram_batchsize(len(self.basis))
        psi_mat = np.array([p(self.x_train) for p in self.basis])
        gramian = np.zeros((len(self.basis), len(self.basis)))
        for batch in pt.misc.batch(range(self.x_train.shape[0]), batch_size):
            mat_1 = psi_mat[:, batch].reshape(1, len(self.basis), len(batch))
            mat_2 = psi_mat[:, batch].reshape(len(self.basis), 1, len(batch))
            # Weighted empirical Gramian: sum_k w_k * psi_i(x_k) * psi_j(x_k).
            gramian += np.sum(self.w_train[batch] * mat_1 * mat_2, axis=-1)
        return gramian, psi_mat.T

    def _fit(self) -> np.ndarray:
        """Compute polynomial chaos expansion coefficients.

        Compute the PC coefficients with linear regression. The coefficients
        are given by

        .. math::
            S = A^(-1) * \\Psi^T * W * F_\\mathrm{ex}

        where the Gram matrix :math:`A` is of full rank but may be ill
        conditioned.  :math:`F_\\mathrm{ex}` is an array containing the values
        of f evaluated at the required regression points.  For more detail on
        the Gram matrix or :math:`\\Psi`, see `assemble_matrices()`.

        Returns
        -------
        :
            Polynomial chaos expansion coefficients.
        """
        # Invert the Gramian via SVD; assumes full rank (see docstring), so
        # no singular values are zero.
        u, s, vh = np.linalg.svd(self.gramian)
        gramian_inv = np.dot(vh.T, np.dot(np.diag(1 / s), u.T))
        W = self.w_train.reshape(-1, 1)
        coefficients = np.linalg.multi_dot(
            [gramian_inv, self.basis_eval_mat.T, W * self.y_train]
        )
        return coefficients

    def _compute_sobol_indices(self) -> np.ndarray:
        """Compute Sobol indices.

        The Sobol coefficients are given as

        .. math::
            S_{i_1,...,i_k} = \\sum_{\\alpha\\in\\mathcal{M}} f_\\alpha(x)^2

        where :math:`\\mathcal{M} = { \\alpha | \\alpha_{i_j} != 0 for j = 1,...,k }`.
        """
        sobol = np.zeros([len(self.index_set.sobol_tuples), self.ydim])
        # Mask components of f with zero variance to avoid division by zero;
        # their Sobol indices stay zero.
        nz_idx = np.nonzero(self.var)
        for idx, sdx in enumerate(self.index_set.sobol_tuples):
            _mdx = self.index_set.sobol_tuple_to_indices([sdx])[0]
            rows = self.index_set.get_index_number(_mdx)
            coeffs = self.coefficients[rows]
            sobol[idx, nz_idx] = (
                np.sum(coeffs[:, nz_idx] ** 2, axis=0) / self.var[nz_idx]
            )
        return sobol

    def eval(
        self, x: np.ndarray, partial: list[int] | dict | None = None
    ) -> np.ndarray:
        """Evaluate the (partial derivative of the) PC approximation.

        Parameters
        ----------
        x : np.ndarray
            Parameter realizations in which the approximation is evaluated.
        partial : list[int] | dict | None, optional
            Number of derivatives for each parameter component.
            If a list is given, length has to be the number of parameters.
            Ordering of list is according to ``self.parameters``.
            If a dict is given, keys have to be subset of parameter names.

        Returns
        -------
        :
            Evaluation of polynomial expansion in x values.

        Examples
        --------
        Given two parameters :math:`x_1` and :math:`x_2`

        >>> param1 = pt.parameter.Parameter("x1", [-1, 1], "uniform")
        >>> param2 = pt.parameter.Parameter("x2", [-1, 1], "uniform")

        a corresponding polynomial chaos approximation for a function
        :math:`f\\colon (x_1,x_2) \\mapsto y`

        >>> surrogate = pt.chaos.PolynomialChaos([param1, param2], ...)

        and an array the surrogate should be evaluated in

        >>> x_test = np.random.uniform(-1, 1, (1000, 2))

        we can evaluate the surrogate with

        >>> y_approx = surrogate.eval(x_test)

        To obtain a partial derivative of the approximation, e.g.,
        :math:`\\frac{\\partial^2f}{\\partial x_2^2}`, specify a list

        >>> y_approx = surrogate.eval(x_test, partial=[0, 2])

        or a dictionary with parameter names and number of partial derivatives

        >>> y_approx = surrogate.eval(x_test, partial={'x2':2})
        """
        if x.ndim < 1 or x.ndim > 2:
            raise ValueError(f"Wrong ndim: '{x.ndim}'")
        if x.ndim == 1:
            # Promote a single realization to a batch of size one.  Use
            # reshape instead of assigning to x.shape, which would mutate
            # the caller's array in place.
            x = x.reshape(1, -1)
        c = self.coefficients.reshape(self.coefficients.shape[0], 1, -1)
        if partial is None:
            basis = self.basis
        else:
            if isinstance(partial, dict):
                _param_names = [p.name for p in self.parameters]
                # check that all keys in partial are parameter names
                assert set(partial.keys()).issubset(set(_param_names))
                _partial = [0] * len(_param_names)
                for k, v in partial.items():
                    _partial[_param_names.index(k)] = v
            else:  # partial already is a list of int
                _partial = partial
            # Build the derivative basis for the requested derivative orders.
            basis = pt.basis.multivariate_basis(
                self.univariate_bases, self.index_set.indices, _partial
            )
        eval_mat = np.array([b(x) for b in basis])
        eval_mat.shape = eval_mat.shape[0], eval_mat.shape[1], 1
        return np.sum(c * eval_mat, axis=0)
def find_optimal_indices(
    params: list[pt.parameter.Parameter],
    x_train: np.ndarray,
    w_train: np.ndarray,
    y_train: np.ndarray,
    max_terms: int = 0,
    threshold: float = 1e-03,
) -> tuple[np.ndarray, np.ndarray]:
    """Compute optimal multiindices of polynomial chaos expansion.

    Heuristical approach to compute almost optimal multiindices for a
    polynomial chaos expansion based on an estimate of the Sobol index values.

    Parameters
    ----------
    params : list of `pt.parameter.Parameter`
        Random parameters of the problem.
    x_train : array_like
        Sample points for training.
    w_train : array_like
        Weights for training.
    y_train : array_like
        Function evaluations for training.
    max_terms : int, default=0
        Maximum number of expansion terms. Number of expansion terms is chosen
        automatically for `max_terms=0`.
    threshold : float, default=1e-03
        Truncation threshold for Sobol indices. Smallest Sobol values with sum
        less then ``threshold`` are ignored.

    Returns
    -------
    indices :
        Array with multiindices.
    sobol :
        Crude intermediate approximation of Sobol indices.

    Notes
    -----
    To find reasonable candidates for the sparse polynomial chaos expansion,
    first an expansion with a large simplex index set is computed.
    The simplex index set uses the same maximum dimension in each component
    and is designed to have at least ``max_terms`` many elements.
    With this index set a polynomial chaos expansion is computed.
    The computed Sobol indices are then ordered and the largest contributions
    are collected by a Dörfler marking strategy.
    Then a new index set is assembled by including a downward closed subset of
    polynomial chaos coefficient indices for each selected Sobol index tuple.
    The number of chosen indices for each selected Sobol index tuple is
    weighted by the respective Sobol index value.
    """
    assert 0 <= threshold < 1
    # set maximal number of expansion terms
    n_samples, dim = x_train.shape
    if max_terms > 0:
        _max_terms = max_terms
    else:
        # Heuristic bound keeping the regression problem well posed.
        _max_terms = int(n_samples / np.log(n_samples) / 2)
    if _max_terms > int(n_samples / np.log(n_samples) / 2):
        warnings.warn("Gramian may become ill conditioned.")

    # Compute crude approximation of Sobol coefficients: choose the largest
    # total degree whose simplex set still has at most _max_terms elements.
    deg = 1
    for _deg in range(2, 1000):
        # Size of the simplex index set of total degree _deg in `dim`
        # dimensions is the binomial coefficient C(_deg + dim, dim).
        n_terms = math.comb(_deg + dim, dim)
        if n_terms > _max_terms:
            break
        deg = _deg
    _indices = pt.index.simplex_set(dim, deg)
    index_set = pt.index.IndexSet(_indices)
    surrogate = PolynomialChaos(params, index_set, x_train, w_train, y_train)

    # Sort Sobol coefficients by largest and mark top threshold percent.
    idx, _, marker = pt.misc.doerfler_marking(
        np.sum(surrogate.sobol, axis=1), threshold=1 - threshold
    )

    # assemble adaptive choice of multiindices
    indices = assemble_indices(idx[:marker], index_set.sobol_tuples, _max_terms)
    return indices, surrogate.sobol
def assemble_indices(
    enum_idx: list[int] | tuple[int] | np.ndarray,
    sobol_tuples: list[tuple],
    max_terms: int,
) -> np.ndarray:
    """Compute automatic choice of multiindices.

    Parameters
    ----------
    enum_idx : np.ndarray
        Sorted enumeration indices according to magnitude of Sobol indices.
    sobol_tuples : list of tuple
        List of Sobol subscript tuples.
    max_terms : int
        Maximum number of expansion terms.

    Returns
    -------
    indices : np.ndarray
        Array of (sparse) optimal multiindices.
    """
    # Parameter dimension is the length of the longest Sobol tuple.
    dim = max(len(sobol_tuple) for sobol_tuple in sobol_tuples)
    # Budget of expansion terms per selected Sobol tuple (one term is
    # reserved for the constant multiindex added below).
    budget = int((max_terms - 1) / len(enum_idx))
    chunks = []
    for pos in enum_idx:
        # Zero-based parameter components addressed by this Sobol tuple.
        active = [component - 1 for component in sobol_tuples[pos]]
        # Per-component degree so the tensor set roughly matches the budget.
        deg = int(budget ** (1 / len(active)) + 1)
        shape = [deg + 2 if j in active else 1 for j in range(dim)]
        lower = [1 if j in active else 0 for j in range(dim)]
        candidates = pt.index.tensor_set(shape, lower)
        chunks.append(candidates[:budget])
    # Prepend the constant multiindex and sort the combined set.
    stacked = np.concatenate([np.zeros((1, dim))] + chunks, axis=0)
    return pt.index.sort_index_array(stacked)
def get_gram_batchsize(dim: int, save_memory: float = 1024**3 / 2) -> int:
    """Compute memory allocation batch sizes for information matrix.

    Compute the maximal number of samples in each batch when assembling the
    information matrix to be maximally memory efficient and avoid OutOfMemory
    errors.

    Parameters
    ----------
    dim : int
        Number of rows/columns of information matrix.
    save_memory : float, default=1024**3/2
        Memory (in bytes), that should be kept free. The default is
        equivalent to 512 MB.

    Returns
    -------
    :
        Batchsize for assembling of information matrix.
    """
    available_memory = psutil.virtual_memory().available
    mem = available_memory - save_memory
    # Each sample contributes a dim x dim slice of 8-byte floats.
    n = int(mem / 8 / dim**2)
    if n < 1:
        # There is less memory available than required for at least one sample.
        raise MemoryError("Not enough free memory.")
    return n
# Library module only: no command-line entry point.
if __name__ == "__main__": pass