"""
File: pythia/sampler.py
Author: Nando Hegemann
Gitlab: https://gitlab.com/Nando-Hegemann
Description: Sampler classes for generating in random samples and PDF evaluations.
SPDX-License-Identifier: LGPL-3.0-or-later OR Hippocratic-3.0-ECO-MEDIA-MIL
"""
from typing import Callable
from abc import ABC, abstractmethod, abstractproperty
import warnings
import numpy as np
import scipy.stats
from scipy.integrate import quad
from scipy.special import gamma
import pythia as pt
[docs]class Sampler(ABC):
"""Base class for all continuous samplers."""
# set abstract attributes
domain: np.ndarray
@abstractproperty
def dimension(self) -> int:
"""Dimension of the ambient space."""
raise NotImplementedError
@abstractproperty
def mass(self):
"""Mass of the sampler distribution.
The integral of the sampler distribution over the domain of
definition. If the density is normalised this value should be one.
"""
raise NotImplementedError
@abstractproperty
def maximum(self) -> float:
"""Maximum of the pdf."""
raise NotImplementedError
@abstractproperty
def mean(self) -> float | np.ndarray:
"""Mean value of the pdf."""
raise NotImplementedError
@abstractproperty
def cov(self) -> float | np.ndarray:
"""(Co)Variance of the pdf."""
raise NotImplementedError
[docs] @abstractmethod
def pdf(self, x: np.ndarray) -> np.ndarray:
"""Density of the samplers distribution.
Computes the density of the samplers underlying distribution at the
given points `x`.
Parameters
----------
x : array_like of shape (..., D)
list of points or single point. `D` is the objects dimension.
Returns
-------
:
Density values at the points.
"""
raise NotImplementedError
[docs] @abstractmethod
def log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Log-density of the samplers distribution.
Computes the log-density of the samplers underlying distribution at the
given points `x`.
Parameters
----------
x : array_like of shape (..., D)
list of points or single point. `D` is the objects dimension.
Returns
-------
:
Log-density values at the points.
"""
raise NotImplementedError
[docs] @abstractmethod
def grad_x_log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Gradient of log-density of the samplers distribution.
Computes the gradient of the log-density of the samplers underlying
distribution at the given points `x`.
Parameters
----------
x : array_like of shape (..., D)
list of points or single point. `D` is the objects dimension.
Returns
-------
:
Gradient values of the log-density at the points with shape
(..., D).
"""
raise NotImplementedError
[docs] @abstractmethod
def hess_x_log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Hessian of log-density of the samplers distribution.
Computes the Hessian of the log-density of the samplers underlying
distribution at the given points `x`.
Parameters
----------
x : array_like of shape (..., D)
list of points or single point. `D` is the objects dimension.
Returns
-------
:
Hessian values of the log-density at the points with shape
(..., D, D).
"""
raise NotImplementedError
[docs] @abstractmethod
def sample(self, shape: list | tuple | np.ndarray) -> np.ndarray:
"""Random values in a given shape.
Create an array of the given shape and populate it with random samples
from the samplers distribution.
Parameters
----------
shape : array_like, optional
The dimensions of the returned array, should all be positive.
If no argument is given a single Python float is returned.
Returns
-------
:
Random values of specified shape.
"""
raise NotImplementedError
[docs]class NormalSampler(Sampler):
"""Sampler for univariate normally distributed samples.
Parameters
----------
mean : float
Mean of the Gaussian distribution.
var : float
Variance of the Gaussian distribution.
"""
def __init__(self, mean: float, var: float) -> None:
"""Initiate NormalSampler object."""
self.domain = np.array([-np.inf, np.inf], ndmin=2) # shape is (1,2)
self._mean = mean # work-around to comply with ABC Sampler class
assert var >= 0
self.var = var
@property
def mass(self) -> float:
"""Mass of the PDF."""
return 1
@property
def dimension(self) -> float:
"""Dimension of the parameters."""
return self.domain.shape[0]
@property
def maximum(self) -> float:
"""Maximum value of the PDF."""
return 1 / np.sqrt(2 * np.pi * self.var)
@property
def mean(self) -> float:
"""Mean value of the distribution."""
return self._mean
@property
def cov(self) -> float:
"""(Co)Variance of the distribution."""
return self.var
@property
def std(self) -> float:
"""Standard deviation."""
return np.sqrt(self.var)
[docs] def pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of PDF evaluated in `x`.
"""
return scipy.stats.norm.pdf(x, loc=self.mean, scale=np.sqrt(self.var))
[docs] def log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate log-PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of log-PDF evaluated in `x`.
"""
return scipy.stats.norm.logpdf(x, loc=self.mean, scale=np.sqrt(self.var))
[docs] def grad_x_log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate gradient of log-PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of gradient (vector valued) of log-PDF evaluated in `x`.
"""
return -(x - self.mean) / self.var
[docs] def hess_x_log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate Hessian of log-PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of Hessian (matrix valued) of log-PDF evaluated in `x`.
"""
return -1 / self.var * np.ones_like(x)
[docs] def sample(self, shape: int | list | tuple | np.ndarray) -> np.ndarray:
"""Draw samples from distribution.
Parameters
----------
shape : array_like
Shape of the samples.
Returns
-------
:
Random samples of specified shape.
"""
return np.random.normal(self.mean, np.sqrt(self.var), shape)
[docs]class GammaSampler(Sampler):
"""Sampler for univariate Gamma distributed samples on given domain.
Parameters
----------
domain : array_like
Supported domain of distribution.
alpha : float
Parameter for Gamma distribution.
beta : float
Parameter for Gamma distribution.
"""
def __init__(
self, domain: list | tuple | np.ndarray, alpha: float, beta: float
) -> None:
"""Initiate GammaSampler object."""
self.domain = np.reshape(np.array(domain), (-1, 2))
assert self.domain.shape == (1, 2)
assert self.domain[0, 1] == np.inf
self.alpha = alpha
self.beta = beta
assert self.alpha > 0 and self.beta > 0
@property
def dimension(self) -> float:
"""Dimension of the parameters."""
return 1
@property
def mass(self) -> float:
"""Mass of the PDF."""
return 1
@property
def maximum(self) -> float:
"""Maximum value of the PDF.
The maximum of the Gamma distribution is given by
.. math::
\\max_{x\\in[a,\\infty)} f(x) =
\\begin{cases}
\\infty & \\mbox{if } 0 < \\alpha < 1\\\\
\\frac{\\beta^\\alpha}{\\Gamma(\\alpha)} & \\mbox{if } \\alpha = 1\\\\
\\frac{\\beta^\\alpha}{\\Gamma(\\alpha)} \\Bigl(\\frac{\\alpha-1}{\\beta} \\Bigr)^{\\alpha-1} e^{1-\\alpha} & \\mbox{if } \\alpha > 1\\\\
\\end{cases}
"""
if self.alpha < 1:
return np.inf
if self.alpha == 1:
return self.beta**self.alpha / gamma(self.alpha)
return (
self.beta**self.alpha
/ gamma(self.alpha)
* ((self.alpha - 1) / self.beta) ** (self.alpha - 1)
* np.exp(-(self.alpha - 1))
)
@property
def mean(self) -> float:
"""Mean value of the distribution."""
return self.alpha / self.beta + self.domain[0, 0]
@property
def cov(self) -> float:
"""(Co)Variance of the distribution."""
return self.alpha / self.beta**2
@property
def var(self) -> float:
"""Variance of the distribution."""
return self.cov
@property
def std(self) -> float:
"""Standard deviation of the distribution."""
return np.sqrt(self.var)
[docs] def pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of PDF evaluated in `x`.
"""
y = x - self.domain[0, 0]
return scipy.stats.gamma.pdf(y, a=self.alpha, scale=1.0 / self.beta)
[docs] def log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate log-PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of log-PDF evaluated in `x`.
"""
y = x - self.domain[0, 0]
return scipy.stats.gamma.logpdf(y, a=self.alpha, scale=1.0 / self.beta)
[docs] def grad_x_log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate gradient of log-PDF.
.. note::
Not yet implemented.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of gradient (vector valued) of log-PDF evaluated in `x`.
"""
raise NotImplementedError
[docs] def hess_x_log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate Hessian of log-PDF.
.. note::
Not yet implemented.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of Hessian (matrix valued) of log-PDF evaluated in `x`.
"""
raise NotImplementedError
[docs] def sample(self, shape: int | list | tuple | np.ndarray) -> np.ndarray:
"""Draw samples from distribution.
Parameters
----------
shape : array_like
Shape of the samples.
Returns
-------
:
Random samples of specified shape.
"""
ret = np.random.gamma(self.alpha, 1.0 / self.beta, shape)
return ret + self.domain[0, 0]
[docs]class BetaSampler(Sampler):
"""Sampler for univariate Beta distributed samples on given domain.
Parameters
----------
domain : array_like
Supported domain of distribution.
alpha : float
Parameter for Beta distribution.
beta : float
Parameter for Beta distribution.
"""
def __init__(
self, domain: list | tuple | np.ndarray, alpha: float, beta: float
) -> None:
"""Initiate BetaSampler object."""
self.domain = np.reshape(np.array(domain), (-1, 2))
assert self.domain.shape == (1, 2)
self.length = self.domain[0, 1] - self.domain[0, 0]
self.alpha = alpha
self.beta = beta
assert self.alpha > 0 and self.beta > 0
@property
def dimension(self):
"""Dimension of the parameters."""
return 1
@property
def mass(self):
"""Mass of the PDF."""
return 1
@property
def maximum(self):
"""Maximum value of the PDF.
The maximum of the Beta distribution is given by
.. math::
\\max_{x\\in[a,b]} f(x) =
\\begin{cases}
\\infty & \\mbox{if } 0 < \\alpha < 1 \\mbox{ or } 0 < \\beta < 1,\\\\
\\frac{1}{(b-a)B(\\alpha,\\beta)} & \\mbox{if } \\alpha = 1 \\mbox{ or } \\beta = 1,\\\\
\\frac{(\\alpha-1)^{\\alpha-1}(\\beta-1)^{\\beta-1}}{(\\alpha+\\beta-2)^{\\alpha+\\beta-2}(b-a)B(\\alpha,\\beta)} & \\mbox{if } \\alpha > 1 \\mbox{ and } \\beta > 1, \\\\
\\end{cases}
where :math:`B(\\alpha,\\beta)` denotes the Beta-function.
"""
if self.alpha < 1 or self.beta < 1:
return np.inf
Beta = gamma(self.alpha) * gamma(self.beta) / gamma(self.alpha + self.beta)
if self.alpha == 1 or self.beta == 1:
return 1 / (Beta * self.length)
val = (
(self.alpha - 1) ** (self.alpha - 1)
* (self.beta - 1) ** (self.beta - 1)
/ (self.alpha + self.beta - 2) ** (self.alpha + self.beta - 2)
)
return val / (Beta * self.length)
@property
def mean(self) -> float:
"""Mean value of the distribution."""
shift = self.domain[0, 0]
scale = self.length
return scale * self.alpha / (self.alpha + self.beta) + shift
@property
def cov(self) -> float:
"""(Co)Variance of the distribution."""
a = self.alpha
b = self.beta
return a * b / ((a + b) ** 2 * (a + b + 1)) * self.length
@property
def var(self) -> float:
"""Variance of the distribution."""
return self.cov
@property
def std(self) -> float:
"""Standard deviation of the distribution."""
return np.sqrt(self.var)
[docs] def pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of PDF evaluated in `x`.
"""
y = pt.misc.shift_coord(x, self.domain.flatten(), [0, 1])
ret = scipy.stats.beta.pdf(y, a=self.alpha, b=self.beta)
return ret / self.length
[docs] def log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate log-PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of log-PDF evaluated in `x`.
"""
y = pt.misc.shift_coord(x, self.domain.flatten(), [0, 1])
ret = scipy.stats.beta.logpdf(y, a=self.alpha, b=self.beta)
return ret - np.log(self.length)
[docs] def grad_x_log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate gradient of log-PDF.
.. note::
Not yet implemented.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of gradient (vector valued) of log-PDF evaluated in `x`.
"""
raise NotImplementedError
[docs] def hess_x_log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate Hessian of log-PDF.
.. note::
Not yet implemented.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of Hessian (matrix valued) of log-PDF evaluated in `x`.
"""
raise NotImplementedError
[docs] def sample(self, shape: int | list | tuple | np.ndarray) -> np.ndarray:
"""Draw samples from distribution.
Parameters
----------
shape : array_like
Shape of the samples.
Returns
-------
:
Random samples of specified shape.
"""
SAMPLE = np.random.beta(self.alpha, self.beta, shape)
return pt.misc.shift_coord(SAMPLE, [0, 1], self.domain.flatten())
[docs]class WLSUnivariateSampler(Sampler):
"""Sampler for univariate optimally distributed samples on given domain.
Given a stochastic variable :math:`y\\in\\Gamma\\subset\\mathbb{R}`
with :math:`y\\sim\\pi` and a finite subset
:math:`\\{P_j\\}_{j=0}^{d-1}` of an orthonormal polynomial basis of
:math:`L^2(\\Gamma,\\pi)`, the optimal weighted least-squares sampling
distribution for a function
:math:`u\\in\\operatorname{span}\\{P_j\\ \\vert\\ j=0,\\dots,d-1 \\}` reads
.. math::
\\mathrm{d}\\mu = w^{-1} \\mathrm{d}\\pi
\\qquad\\mbox{with weight}\\qquad
w^{-1}(y) = \\frac{1}{d}\\sum_{j=0}^{d-1}\\vert P_j(y)\\vert^2.
Parameters
----------
domain : array_like
Interval of support of distribution.
Notes
-----
To generate samples from the weighted least-squares distribution rejection
sampling is used. For certain basis functions it is possible to choose a
well-suited trial sampler for the rejection sampling, which can be enabled
via setting ``tsa=True``.
See Also
--------
pythia.sampler.WLSTensorSampler
References
----------
The optimal weighted least-squares sampling is based on the results in
Cohen & Migliorati [1]_.
.. [1] Cohen, A. and Migliorati, G.,
“Optimal weighted least-squares methods”,
SMAI Journal of Computational Mathematics 3, 181-203 (2017).
"""
def __init__(
self, param: pt.parameter.Parameter, deg: int, tsa: bool = True
) -> None:
"""Initiate WLSUnivariateSampler object."""
self.parameter = param
self.deg = deg
self._base_sampler = assign_sampler(param)
self.domain = self._base_sampler.domain
self._basis = pt.basis.univariate_basis([self.parameter], [self.deg])[0]
self._tsa = tsa
self._trial_sampler, self._bulk = self._compute_trial_sampler()
assert self.domain.shape == (1, 2)
# self._length = self.domain[0, 1] - self.domain[0, 0]
@property
def dimension(self) -> int:
"""Parameter dimension."""
return self.domain.shape[0]
@property
def mass(self) -> float:
"""Mass of the PDF."""
return 1
@property
def maximum(self) -> float:
"""Maximum value of the PDF."""
if self.parameter.distribution == "uniform":
# alternatively return pdf in domain[0, 1]
return self.pdf(self.domain[0, 0])
elif self.parameter.distribution == "normal":
if self.deg % 2 == 0:
return self.pdf(self.parameter.mean)
else:
# Note: this can be improved
return get_maximum(self.pdf, self.domain)
elif self.parameter.distribution == "gamma":
# Note: this is probably not going to work for domain = [a, inf]
return get_maximum(self.pdf, self.domain)
elif self.parameter.distribution == "beta":
# Note: this can be improved
return get_maximum(self.pdf, self.domain)
return get_maximum(self.pdf, self.domain)
@property
def mean(self) -> float:
"""Mean value of the distribution."""
mean, err = quad(
lambda x: x * self.pdf(x), self.domain[0, 0], self.domain[0, 1]
)
if err > 1e-8:
warnings.warn(f"quadrature error large: ({err})")
return mean
@property
def cov(self) -> float:
"""(Co)Variance of the distribution."""
moment, err = quad(
lambda x: x**2 * self.pdf(x), self.domain[0, 0], self.domain[0, 1]
)
if err > 1e-8:
warnings.warn(f"quadrature error large: ({err})")
return moment - self.mean**2
@property
def var(self) -> float:
"""Variance of the distribution."""
return self.cov
@property
def std(self) -> float:
"""Standard deviation of the distribution."""
return np.sqrt(self.var)
[docs] def weight(self, x: np.ndarray | float | int) -> np.ndarray:
"""Weights for the pdf.
Parameters
----------
x : np.ndarray
Points the weight function is evaluated in.
Returns
-------
w : array_like
Weights of evaluation points `x`.
"""
if isinstance(x, (float, int)):
x = np.array(x).reshape(1)
if x.ndim == 2 and x.shape[1] == 1:
x = np.array(x).reshape(-1)
assert x.ndim < 2
b_eval = np.sum([np.abs(p(x)) ** 2 for p in self._basis], axis=0)
return (self.deg + 1) / b_eval
[docs] def pdf(self, x: np.ndarray | float | int) -> np.ndarray:
"""Evaluate uniform PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of PDF evaluated in `x`.
"""
if isinstance(x, (float, int)):
x = np.array(x).reshape(1)
if x.ndim == 2 and x.shape[1] == 1:
x = np.array(x).reshape(-1)
return self._base_sampler.pdf(x) / self.weight(x)
[docs] def log_pdf(self, x: np.ndarray | float | int) -> np.ndarray:
"""Evaluate uniform log-PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of log-PDF evaluated in `x`.
"""
if isinstance(x, (float, int)):
x = np.array(x).reshape(1)
if x.ndim == 2 and x.shape[1] == 1:
x = np.array(x).reshape(-1)
return self._base_sampler.log_pdf(x) - np.log(self.weight(x))
[docs] def grad_x_log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate gradient of uniform log-PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of gradient (vector valued) of log-PDF evaluated in `x`.
"""
raise NotImplementedError
[docs] def hess_x_log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate Hessian of uniform log-PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of Hessian (matrix valued) of log-PDF evaluated in `x`.
"""
raise NotImplementedError
def _compute_trial_sampler(self) -> tuple[Sampler, float]:
"""Trial sampler adaptation.
.. note::
TSA currently only available for uniform parameter distribution.
Parameters
----------
tsa : bool
Adapt trial sampler or simply use uniform product sampler.
Returns
-------
trial_sampler : pt.sampler.Sampler
Trial sampler.
bulk : array_like
Domain estimate of the mass of the distribution.
"""
if self._tsa is True and self.parameter.distribution == "uniform":
trial_sampler = BetaSampler(self.parameter.domain, 0.5, 0.5)
else:
trial_sampler = UniformSampler(self.parameter.domain)
bulk = get_maximum(lambda x: self.pdf(x) / trial_sampler.pdf(x), self.domain)
return trial_sampler, bulk
[docs] def sample(self, shape: int | list | tuple | np.ndarray) -> np.ndarray:
"""Draw samples from weighted least-squares parameter distribution.
Parameters
----------
shape : array_like
Shape of the samples.
Returns
-------
:
Random samples of specified shape.
"""
if isinstance(shape, int):
shape = (shape,)
samples = rejection_sampling(
self.pdf, self._trial_sampler, self._bulk, self.dimension, shape
)
return samples
[docs]class ProductSampler(Sampler):
"""Tensor sampler for independent parameters.
Sampler for cartesian product samples of a list of (independent) univariate
samplers.
Parameters
----------
sampler_list : list of `pythia.sampler.Sampler`
list of (univariate) Sampler objects.
"""
def __init__(self, sampler_list: list[Sampler]) -> None:
"""Initiate ProductSampler object."""
self.samplers = list(sampler_list)
self.domain = np.squeeze(np.array([s.domain for s in self.samplers]))
# Add dimension if len(sampler_list) == 1.
if self.domain.ndim < 2:
self.domain.shape = 1, 2
# sampler_list should contain 1D samplers only
assert self.domain.shape == (len(sampler_list), 2)
@property
def dimension(self) -> int:
"""Dimension of the parameters."""
return self.domain.shape[0]
@property
def mass(self) -> float:
"""Mass of the PDF."""
return np.prod(np.array([s.mass for s in self.samplers]))
@property
def maximum(self) -> float:
"""Maximum value of the PDF."""
return np.prod(np.array([s.maximum for s in self.samplers]))
@property
def mean(self) -> np.ndarray:
"""Mean of the PDF."""
return np.array([s.mean for s in self.samplers])
@property
def cov(self) -> np.ndarray:
"""Covariance of the PDF."""
return np.diag([s.cov for s in self.samplers])
[docs] def weight(self, x: np.ndarray) -> np.ndarray:
"""Weights of the product PDF.
Parameters
----------
x : np.ndarray
Evaluation points.
Returns
-------
:
Array of uniform weights for samples.
"""
if x.ndim == 1:
x.shape = 1, -1
assert x.ndim == 2
return np.ones(x.shape[0])
[docs] def pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate PDF.
The PDF is given by the product of the univariate PDFs.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of PDF evaluated in `x`.
"""
assert x.shape[-1] == self.dimension
densities = [s.pdf for s in self.samplers]
val = np.array([densities[jj](x[..., jj]) for jj in range(self.dimension)])
return np.prod(val, axis=0)
[docs] def log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate log-PDF.
The log-PDF is given by the sum of the univariate log-PDFs.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of log-PDF evaluated in `x`.
"""
assert x.shape[-1] == self.dimension
densities = [s.log_pdf for s in self.samplers]
val = np.array([densities[jj](x[..., jj]) for jj in range(self.dimension)])
return np.sum(val, axis=0)
[docs] def grad_x_log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate gradient of log-PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of gradient (vector valued) of log-PDF evaluated in `x`.
"""
grad_densities = [s.grad_x_log_pdf for s in self.samplers]
return np.array(
[grad_densities[jj](x[..., jj]) for jj in range(self.dimension)]
).T
[docs] def hess_x_log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate Hessian of log-PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of Hessian (matrix valued) of log-PDF evaluated in `x`.
"""
# create an (x.shape[0], self._dim, self._dim) tensor where each
# (self._dim, self._dim) matrix is the identity
eye = np.tile(np.expand_dims(np.eye(self.dimension), 0), (x.shape[0], 1, 1))
# create an (x.shape[0],self._dim,1) tensor where each (x.shape[0],1,1)
# subtensor is a diagonal entry of the hessian
hess_densities = [s.hess_x_log_pdf for s in self.samplers]
hess = np.expand_dims(
np.array(
[hess_densities[jj](x[..., jj]) for jj in range(self.dimension)]
).T,
2,
)
return eye * hess
[docs] def sample(self, shape: int | list | tuple | np.ndarray) -> np.ndarray:
"""Draw samples from distribution.
Parameters
----------
shape : array_like
Shape of the samples.
Returns
-------
:
Random samples of specified shape.
"""
if isinstance(shape, int):
shape = (shape,)
samples = [s.sample(shape) for s in self.samplers]
return np.stack(samples, -1)
[docs]class ParameterSampler(Sampler):
"""Product sampler of given parameters.
Parameters
----------
params : list of `pythia.parameter.Parameter`
list containing information of parameters.
"""
def __init__(self, params: list[pt.parameter.Parameter]) -> None:
"""Initiate ParameterSampler object."""
self.parameter = params
assert isinstance(self.parameter, list)
self.domain = np.array([param.domain for param in self.parameter])
self._product_sampler = ProductSampler(
[assign_sampler(param) for param in self.parameter]
)
@property
def dimension(self) -> int:
"""Dimension of the parameters."""
return self._product_sampler.dimension
@property
def mass(self) -> float:
"""Mass of the PDF."""
return self._product_sampler.mass
@property
def maximum(self) -> float:
"""Maximum value of the PDF."""
return self._product_sampler.maximum
@property
def mean(self) -> np.ndarray:
"""Mean of the PDF."""
return self._product_sampler.mean
@property
def cov(self) -> np.ndarray:
"""Covariance of the PDF."""
return self._product_sampler.cov
[docs] def weight(self, x: np.ndarray) -> np.ndarray:
"""Weights of the parameter product PDF.
Parameters
----------
x : np.ndarray
Evaluation points.
Returns
-------
:
Array of uniform weights for samples.
"""
return self._product_sampler.weight(x)
[docs] def pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of PDF evaluated in `x`.
"""
return self._product_sampler.pdf(x)
[docs] def log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate log-PDF.
The log-PDF is given by the sum of the univariate log-PDFs.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of log-PDF evaluated in `x`.
"""
return self._product_sampler.log_pdf(x)
[docs] def grad_x_log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate gradient of log-PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of gradient (vector valued) of log-PDF evaluated in `x`.
"""
return self._product_sampler.grad_x_log_pdf(x)
[docs] def hess_x_log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate Hessian of log-PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of Hessian (matrix valued) of log-PDF evaluated in `x`.
"""
return self._product_sampler.hess_x_log_pdf(x)
[docs] def sample(self, shape: int | list | tuple | np.ndarray) -> np.ndarray:
"""Draw samples from distribution.
Parameters
----------
shape : array_like
Shape of the samples.
Returns
-------
:
Random samples of specified shape.
"""
return self._product_sampler.sample(shape)
[docs]class WLSSampler(Sampler):
"""Weighted Least-Squares sampler.
Given a stochastic variable :math:`y\\in\\Gamma\\subset\\mathbb{R}^M`
with :math:`y\\sim\\pi`, a set of multiindices
:math:`\\Lambda\\subset\\mathbb{N}_0^M` and a finite subset
:math:`\\{P_\\alpha\\}_{\\alpha\\in\\Lambda}` of an orthonormal polynomial
basis of :math:`L^2(\\Gamma,\\pi)`, the optimal weighted least-squares
sampling distribution for a function
:math:`u\\in\\operatorname{span}\\{P_\\alpha\\ \\vert\\ \\alpha\\in\\Lambda\\}`
reads
.. math::
\\mathrm{d}\\mu = w^{-1} \\mathrm{d}\\pi
\\qquad\\mbox{with weight}\\qquad
w^{-1}(y) = \\frac{1}{\\vert\\Lambda\\vert}\\sum_{\\alpha\\in\\Lambda}\\vert P_\\alpha(y)\\vert^2,
where :math:`\\vert\\Lambda\\vert` denotes the number of elements in
:math:`\\Lambda`.
Parameters
----------
params : list of `pythia.parameter.Parameter`
list of parameters.
basis : list
list of basis functions.
tsa : bool, default=False
Trial sampler adaptation. If True, a trial sampler is chosen on the
distributions of parameters, if false a uniform trial sampler is used.
Other Parameters
----------------
trial_sampler : pythia.sampler.Sampler, default=None
Trial sampler for rejection sampling. If `tsa` is true and either
`trial_sampler` or `bulk` are `None`, the trial sampler is chosen
automatically.
bulk : float, defaul=None
Scaling for trial sampler. If `tsa` is true and either
`trial_sampler` or `bulk` are `None`, the trial sampler is chosen
automatically.
Notes
-----
To generate samples from the weighted least-squares distribution rejection
sampling is used. For certain basis functions it is possible to choose a
well-suited trial sampler for the rejection sampling, which can be enabled
via setting ``tsa=True``.
See Also
--------
pythia.sampler.WLSUnivariateSampler, pythia.sampler.WLSTensorSampler
References
----------
The optimal weighted least-squares sampling is based on the results of
Cohen & Migliorati [1]_.
"""
def __init__(
self,
params: list[pt.parameter.Parameter],
basis: list[Callable],
tsa: bool = True,
trial_sampler: Sampler | None = None,
bulk: float | None = None,
) -> None:
"""Initiate WLSSampler object."""
self.parameter = params
self._param_sampler = ParameterSampler(self.parameter)
self.domain = self._param_sampler.domain
self.basis = basis
self._tsa = tsa
if trial_sampler is None or bulk is None:
self._trial_sampler, self._bulk = self._compute_trial_sampler()
else:
self._trial_sampler = trial_sampler
self._bulk = bulk
@property
def dimension(self) -> int:
"""Dimension of the parameters."""
return self._param_sampler.dimension
@property
def mass(self):
"""Mass of the PDF."""
return 1
@property
def maximum(self) -> float:
"""Maximum value of the PDF."""
return get_maximum(self.pdf, self.domain)
@property
def mean(self) -> np.ndarray:
"""Mean of the PDF."""
raise NotImplementedError
@property
def cov(self) -> np.ndarray:
"""Covariance of the PDF."""
raise NotImplementedError
[docs] def weight(self, x: np.ndarray) -> np.ndarray:
"""Weights for the PDF.
Parameters
----------
x : array_like
Points the weight function is evaluated in.
Returns
-------
:
weights of evaluation points `x`.
"""
km = np.sum([abs(p(x)) ** 2 for p in self.basis], axis=0)
return len(self.basis) / km
[docs] def pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of PDF evaluated in `x`.
"""
assert x.shape[-1] == self.dimension
return self._param_sampler.pdf(x) / self.weight(x)
[docs] def log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate log-PDF.
The log-PDF is given by the sum of the univariate log-PDFs.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of log-PDF evaluated in `x`.
"""
assert x.shape[-1] == self.dimension
return self._param_sampler.log_pdf(x) - np.log(self.weight(x))
[docs] def grad_x_log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate gradient of log-PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of gradient (vector valued) of log-PDF evaluated in `x`.
"""
raise NotImplementedError
[docs] def hess_x_log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate Hessian of log-PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of Hessian (matrix valued) of log-PDF evaluated in `x`.
"""
raise NotImplementedError
def _compute_trial_sampler(self) -> tuple[Sampler, float]:
"""Trial sampler adaptation.
Parameters
----------
tsa : bool
Adapt trial sampler or simply use uniform product sampler.
Returns
-------
trial_sampler : pt.sampler.Sampler
Trial sampler.
bulk : array_like
Domain estimate of the mass of the distribution.
"""
sampler_list = []
for param in self.parameter:
if self._tsa is True and param.distribution == "uniform":
sampler_list.append(BetaSampler(param.domain, 0.5, 0.5))
else:
sampler_list.append(UniformSampler(param.domain))
trial_sampler = ProductSampler(sampler_list)
bulk = get_maximum(lambda x: self.pdf(x) / trial_sampler.pdf(x), self.domain)
return trial_sampler, bulk
[docs] def sample(self, shape: int | list | tuple | np.ndarray) -> np.ndarray:
"""Draw samples from distribution.
Parameters
----------
shape : array_like
Shape of the samples.
Returns
-------
:
Random samples of specified shape.
"""
if isinstance(shape, int):
shape = (shape,)
samples = rejection_sampling(
self.pdf, self._trial_sampler, self._bulk, self.dimension, shape
)
return samples
[docs]class WLSTensorSampler(Sampler):
"""Weighted least-squares sampler for tensor multivariate basis.
Given a stochastic variable :math:`y\\in\\Gamma\\subset\\mathbb{R}^M`
with :math:`y\\sim\\pi=\\prod_{m=1}^M\\pi_m` for one dimensional densities
:math:`\\pi_m`, a tensor set of multiindices
:math:`\\Lambda=[d_1]\\times\\dots\\times[d_M]\\subset\\mathbb{N}_0^M`,
where :math:`[d_m]=\\{0,\\dots,d_m-1\\}`, and a finite subset
:math:`\\{P_\\alpha\\}_{\\alpha\\in\\Lambda}` of an orthonormal product
polynomial basis of :math:`L^2(\\Gamma,\\pi)`, i.e.,
:math:`P_\\alpha(y) = \\prod_{m=1}^M P_{\\alpha_m}(y_m)`, the optimal
weighted least-squares sampling distribution for a function
:math:`u\\in\\operatorname{span}\\{P_\\alpha\\ \\vert\\ \\alpha\\in\\Lambda\\}`
reads
.. math::
\\mathrm{d}\\mu = w^{-1} \\mathrm{d}\\pi
\\qquad\\mbox{with weight}\\qquad
w^{-1}(y) = \\prod_{m=1}^M\\frac{1}{d_m}\\sum_{\\alpha_m\\in[d_m]}\\vert P_{\\alpha_m}(y_m)\\vert^2.
Parameters
----------
params : list of `pythia.parameter.Parameter`
Parameter list.
deg : list of int
Polynomial degree of each component (same for all).
tsa : bool, default=True
Trial sampler adaptation. If True, a trial sampler is chosen on the
distributions of parameters, if false a uniform trial sampler is
used.
Notes
-----
To generate samples from the weighted least-squares distribution rejection
sampling is used. For certain basis functions it is possible to choose a
well-suited trial sampler for the rejection sampling, which can be enabled
via setting ``tsa=True``.
See Also
--------
pythia.sampler.WLSUnivariateSampler
References
----------
The optimal weighted least-squares sampling is based on the results in
Cohen & Migliorati [1]_.
"""
def __init__(
self, params: list[pt.parameter.Parameter], deg: list[int], tsa: bool = True
) -> None:
"""Initiate WLSTensorSampler object."""
self.parameter = params
self.deg = deg
self.domain = np.array([param.domain for param in self.parameter])
self._tsa = tsa
# self._uBasis = pt.basis.univariate_basis(self.parameter, self.deg)
# self._univariate_samplers = [assign_sampler(param)
# for param in self.parameter]
self._univariate_samplers = [
WLSUnivariateSampler(p, d, self._tsa)
for (p, d) in zip(self.parameter, self.deg)
]
self._product_sampler = ProductSampler(self._univariate_samplers)
@property
def dimension(self) -> int:
"""Dimension of the parameters."""
return self._product_sampler.dimension
@property
def mass(self) -> float:
"""Mass of the PDF."""
return self._product_sampler.mass
@property
def maximum(self) -> float:
"""Maximum value of the PDF."""
return self._product_sampler.maximum
@property
def mean(self) -> np.ndarray:
"""Mean of the PDF."""
return self._product_sampler.mean
@property
def cov(self) -> np.ndarray:
"""Covariance of the PDF."""
return self._product_sampler.cov
[docs] def weight(self, x: np.ndarray) -> np.ndarray:
"""Weights for the PDF.
Parameters
----------
x : array_like
Points the weight function is evaluated in.
Returns
-------
:
Weights of evaluation points `x`.
"""
assert x.shape[-1] == self.dimension
weights = [s.weight for s in self._univariate_samplers]
val = np.array([weights[jj](x[..., jj]) for jj in range(self.dimension)])
return np.prod(val, axis=0)
[docs] def pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of PDF evaluated in `x`.
"""
return self._product_sampler.pdf(x)
[docs] def log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate log-PDF.
The log-PDF is given by the sum of the univariate log-PDFs.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of log-PDF evaluated in `x`.
"""
return self._product_sampler.log_pdf(x)
[docs] def grad_x_log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate gradient of log-PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of gradient (vector valued) of log-PDF evaluated in `x`.
"""
return self._product_sampler.grad_x_log_pdf(x)
[docs] def hess_x_log_pdf(self, x: np.ndarray) -> np.ndarray:
"""Evaluate Hessian of log-PDF.
Parameters
----------
x : array_like
Evaluation points.
Returns
-------
:
Values of Hessian (matrix valued) of log-PDF evaluated in `x`.
"""
return self._product_sampler.hess_x_log_pdf(x)
[docs] def sample(self, shape: int | list | tuple | np.ndarray) -> np.ndarray:
"""Draw samples from distribution.
Parameters
----------
shape : array_like
Shape of the samples.
Returns
-------
:
Random samples of specified shape.
"""
return self._product_sampler.sample(shape)
[docs]def rejection_sampling(
pdf: Callable,
trial_sampler: Sampler,
scale: float,
dimension: int,
shape: int | list | tuple | np.ndarray,
) -> np.ndarray:
"""Draw samples from pdf by rejection sampling.
Parameters
----------
pdf : Callable
Probability density the samples are generated from.
trial_sampler : Sampler
Trial sampler proposal samples are drawn from.
scale : float
Threshold parameter with ``pdf <= scale * trialSampler.pdf``
dimension : int
Dimension of the (input of the) pdf.
shape : array_like
Shape of the samples.
Returns
-------
:
Random samples of specified shape.
"""
if isinstance(shape, int):
shape = (shape,)
size = np.prod(np.array(shape))
trial_samples = trial_sampler.sample(size)
samples = np.empty_like(trial_samples)
pointer = 0
while pointer < size:
trial_samples = trial_sampler.sample(size)
checker = np.random.rand(trial_samples.shape[0])
is_valid = checker * max(scale, 1) * trial_sampler.pdf(trial_samples) <= pdf(
trial_samples
)
valid_samples = trial_samples[is_valid]
start = pointer
end = min(pointer + valid_samples.shape[0], size)
samples[start:end] = valid_samples[
0 : min(valid_samples.shape[0], size - pointer)
]
pointer = end
if dimension > 1:
return np.moveaxis(samples.T.reshape(dimension, *shape), 0, -1)
return samples.reshape(*shape)
[docs]def constraint_sampling(
sampler: Sampler,
constraints: list[Callable],
shape: int | list | tuple | np.ndarray,
) -> np.ndarray:
"""Draw samples according to algebraic constraints.
Draw samples from target distribution and discard samples that do not
satisfy the constraints.
Parameters
----------
sampler : Sampler
Sampler to sample from.
constraints : list of callable
list of functions that return True if sample point satisfies the
constraint.
Returns
-------
:
Samples drawn from sampler satisfying the constraints.
Notes
-----
The constaints may lead to a non-normalized density function.
"""
if isinstance(shape, int):
shape = (shape,)
samples = np.empty(sampler.sample(np.prod(shape)).shape)
for j_sample in range(samples.shape[0]):
proposal = sampler.sample((1,))
met_constraints = np.all([c(proposal) for c in constraints])
while not met_constraints:
proposal = sampler.sample((1,))
met_constraints = np.all([c(proposal) for c in constraints])
samples[j_sample] = proposal
return np.moveaxis(samples.T.reshape(sampler.dimension, *shape), 0, -1)
[docs]def assign_sampler(param: pt.parameter.Parameter) -> Sampler:
"""Assign a univariate sampler to the given parameter.
Parameters
----------
param : pythia.parameter.Parameter
Returns
-------
:
Univariate sampler.
"""
if param.distribution == "uniform":
return UniformSampler(param.domain)
elif param.distribution == "normal":
return NormalSampler(param.mean, param.var)
elif param.distribution == "gamma":
return GammaSampler(param.domain, param.alpha, param.beta)
elif param.distribution == "beta":
return BetaSampler(param.domain, param.alpha, param.beta)
raise ValueError(f"unknown distribution '{param.distribution}'")
[docs]def get_maximum(f: Callable, domain: list | tuple | np.ndarray, n: int = 1000) -> float:
"""Compute essential maximum of function by point evaluations.
Parameters
----------
f : callable
Function to evaluate. Needs to map from n-dim space to 1-dim space.
domain : array_like
Domain to evaluate function on.
n : int, default=1000
Number of function evaluations. Evaluations are done on a uniform grid
in domain. Actual number of points may thus be a little greater.
Returns
-------
:
Approximation of maximum of function `f`.
"""
if not isinstance(domain, np.ndarray):
domain = np.array(domain)
assert domain.shape[-1] == 2 and domain.ndim == 2
n_points = int(np.ceil(np.power(n, 1 / domain.shape[0])))
if domain.shape[0] > 1:
eps = np.finfo(float).eps # circumvent inf on bdry
x = pt.misc.cart_prod(
[np.linspace(dom[0] + eps, dom[1] - eps, n_points) for dom in domain]
)
else:
x = np.linspace(domain[0, 0], domain[0, 1], n_points).reshape(-1, 1)
return np.max(np.abs(f(x)))
if __name__ == "__main__":
pass