import numpy as np
import os
import datetime
import shutil
[docs]def distributionDict():
""" Set aliases for distribution descriptions.
.. deprecated:: 2.0.0
`distributionDict` will be removed in PyThia 3.0.0.
Returns
-------
distDict : dict
Dictionary with aliases for distribution descriptions.
"""
distDict = {}
distDict['uniform'] = ['uniform', 'Uniform', 'U']
distDict['normal'] = ['normal', 'gauss', 'Gauss', 'gaussian', 'N']
distDict['gamma'] = ['gamma', 'Gamma', 'G']
distDict['beta'] = ['beta', 'Beta', 'B']
return distDict
[docs]def shiftCoord(x, T, I):
""" Map `x` in interval `T` to interval `I`.
Use an affine transformation to shift points :math:`x` from the interval
:math:`T = [t_0, t_1]` to the interval :math:`I = [a,b]`.
Parameters
----------
x : array_like
Points in interval :math:`T`.
T : array_like
Original interval.
I : array_like
Target interval.
Returns
-------
array_like
Shifted values for `x`.
"""
return ((I[1]-I[0])*x + I[0]*T[1] - I[1]*T[0]) / (T[1]-T[0])
[docs]def cartProd(arrayList):
""" Compute the outer product of two or more arrays.
Assemble an array containing all possible combinations of the elements
of the input vectors :math:`v_1,\\dots,v_n`.
Parameters
----------
arrayList : list of array_like
List of vectors :math:`v_1,\\dots,v_n`.
Returns
-------
x : array_like
Cartesian product array.
"""
dim = np.shape(arrayList)[0]
x = np.hstack((np.meshgrid(*arrayList))).swapaxes(0, 1).reshape(dim, -1).T
return x
[docs]def is_contained(val, domain):
""" Check if a given value (vector) is contained in a domain.
Checks if each component of the vector lies in the one dimensional
interval of the corresponding component of the domain.
Parameters
----------
val : array_like
Vector to check containment in domain
domain : array_like
Product domain of one dimensional intervals.
Returns
-------
bool
Bool stating if value is contained in domain.
"""
if val.ndim < 2:
val.shape = 1, -1
if domain.ndim < 2:
domain.shape = 1, -1
assert val.ndim == 2
assert domain.ndim == 2 and domain.shape[1] == 2
if np.all(domain[:, 0] <= val) and np.all(val <= domain[:, 1]):
return True
else:
return False
[docs]def now(sep=False):
""" Get string of current machine date and time.
Parameters
----------
sep : bool, default=False
If true, return date and time separately.
Returns
-------
tuple of str or str
Tuple containing date and time or single string of both concatenated.
"""
dt = datetime.datetime.now()
today = '{:04}-{:02}-{:02} '.format(dt.year, dt.month, dt.day)
now = '{:02}:{:02}:{:02}'.format(dt.hour, dt.minute, dt.second)
if sep:
return today, now
else:
return today + now
[docs]def line(indicator, message=None):
""" Print a line of 80 characters by repeating indicator.
An additional message can be given.
Parameters
----------
indicator : string
Indicator the line consists of, e.g. '-', '+' or '+-'.
message : string, optional
Message integrated in the line.
Returns
-------
text : str
String of 80 characters length.
"""
text = ''
if message is not None:
mLength = len(message)
lengthFromLeft = 2+1+mLength+1
text = 2*indicator
text = text[:2] + ' ' + message + ' '
while len(text) < 80:
text += indicator
return text[:80]
[docs]def save(filename, data, path='./'):
"""
Wrapper for `numpy.save` that assures path directory is created if
necessary and that backs old data up if existent.
Parameters
----------
name : str
Filename to save data to.
data : array_like
Data to save as .npy file.
path : str, default='./'
Path under which the file should be created.
"""
if not os.path.isdir(path):
os.makedirs(path)
if os.path.isfile(path+filename):
shutil.copyfile(path+filename, path+filename+'.backup')
np.save(path+filename, data)
[docs]def load(filename):
""" Alias for numpy.load(). """
return np.load(filename)
[docs]def str2iter(string, iterType=list, dataType=int):
""" Cast `str(iterable)` to `iterType` of `dataType`.
Cast a string of lists, tuples, etc to the specified iterable and data
type, i.e., for `iterType=tuple` and `dataType=float` cast
``str([1,2,3]) -> (1.0, 2.0, 3.0)``.
Parameters
----------
string : str
String representation of iterable.
iterType : iterable, default=list
Iterable type the string is converted to.
dataType : type, default=int
Data type of entries of iterable, e.g. `int` or `float`.
"""
return iterType([dataType(s.strip()) for s in string[1:-1].split(",")])
[docs]def batch(iterable, n=1):
""" Split iterable into different batches of batchsize n.
Parameters
----------
iterable : array_like
Iterable to split.
n : int, default=1
Batch size.
Yields
------
iterable
Iterable for different batches.
"""
l = len(iterable)
for ndx in range(0, l, n):
yield iterable[ndx:min(ndx + n, l)]
[docs]def wlsSamplingBound(m, c=4):
""" Compute the weighted Least-Squares sampling bound.
The number of samples :math:`n` is chosen such that
.. math::
\\frac{n}{\\log(n)} \geq cm,
where :math:`m` is the dimension of the Gramian matrix (number of PC
expansion terms) and :math:`c` is an arbitrary constant. In
Cohen & Migliorati 2017 the authors observed that the coice :math:`c=4`
yields a well conditioned Gramian with high probability.
Parameters
----------
m : int
Dimension of Gramian matrix.
c : float, default=4
Scaling constant.
Returns
-------
n : int
Number of required wLS samples.
"""
jj = max(int(np.ceil(c*m*np.log(c*m))), 2)
while True:
if jj / np.log(jj) >= c*m:
n = jj
break
jj += 1
return n
[docs]def paramDictToList(paramDict):
""" Cast dictionary of pythia parameters to a list sorted by index.
.. deprecated:: 2.0.0
`paramDictToList` will be removed in PyThia 3.0.0, parameters should
always be given as a list not a dictionary.
Parameters
----------
paramDict : dict
Dictionary of parameter names and corresponding
`pythia.parameter.RandomParameter`.
Returns
-------
paramList : list of `pythia.parameter.RandomParameter`
Parameter list sorted by index of parameter.
"""
paramList = sorted(
list(paramDict.values()), key=lambda param: param.index
)
return paramList
[docs]def gelman_rubin_condition(chains):
""" Compute Gelman-Rubin criterion.
Implementation of the Gelman-Rubin convergence criterion for multiple
parameters. A Markov chain is said to be in its convergence, if the final
ration is close to one.
Parameters
----------
chains : array_like, ndim=3
Array containing the Markov chains of each parameter. All chains are
equal in length, the assumed shape is
``(#chains, chain length, #params)``.
Returns
-------
float
Value computed by Gelman-Rubin criterion.
"""
assert chains.ndim == 3
M, N, DIM = chains.shape # chains shape is (#chains, len(chains), #params)
# Mean and var of chains.
chain_means = np.mean(chains, axis=1) # shape is (#chains, #params)
chain_vars = np.var(chains, axis=1) # shape is (#chains, #params)
# Mean and var across all chains.
mean = np.mean(chain_means, axis=0).reshape(1, -1) # shape = (1, #params)
var = np.var(chain_means, axis=0).reshape(1, -1) # shape = (1, #params)
# Between chain variance.
B = N / (M-1) * np.sum((chain_means-mean)**2, axis=0)
# Within chain variance.
W = 1 / M * np.sum(chain_vars, axis=0)
# pooled variance
V = (N-1)/N*W + (M+1)/(M*N)*B
return np.sqrt(V / W)
[docs]def get_confidence_interval(samples, rate=0.95, resolution=500):
""" Compute confidence intervals of samples.
Compute the confidence intervals of the 1D marginals of the samples
(slices). The confidence interval of a given rate is the interval around
the median (not mean) of the samples containing roughly `rate` percent of
the total mass. This is computed for the left and right side of the median
independently.
Parameters
----------
samples : array_like, ndim < 3
Array containing the (multidimensional) samples.
rate : float, default=0.95
Fraction of the total mass the interval should contain.
resolution : int, default=500
Number of bins used in histogramming the samples.
Returns
-------
conf_intervals : array_like
Confidence intervals for each component.
"""
if samples.ndim < 2:
samples.shape = -1, 1
assert samples.ndim == 2
assert 0 < rate < 1
conf_intervals = np.empty((samples.shape[1], 2))
for j, s in enumerate(samples.T):
median = np.median(s)
hist, bdry = np.histogram(s, bins=resolution, density=True)
median_bin = np.argmax(bdry > median)-1
cumsum_left = (
np.cumsum(np.flip(hist[:median_bin+1]))
- 0.5*hist[median_bin] # do not count the median bin twice
)
cumsum_right = (
np.cumsum(hist[median_bin:])
- 0.5*hist[median_bin] # do not count the median bin twice
)
lc = median_bin - np.argmax(cumsum_left > rate*cumsum_left[-1])
rc = np.argmax(cumsum_right > rate*cumsum_right[-1]) + median_bin
conf_intervals[j] = np.array([bdry[lc], bdry[rc+1]])
return conf_intervals
[docs]def doerfler_marking(values, idx=None, threshold=0.9):
""" Dörfler marking for arbitrary values.
Parameters
----------
values : array_like
Values for the Dörfler marking.
idx : list of int, optional
List of indices associated with the entries of `values`.
If `None`, this is set to ``range(len(values))``.
threshold : float, default=0.9
Threshold paramter for Dörfler marking.
Returns
-------
idx_reordered : list
Reordered indices given by `idx`.
Ordered from largest to smallest value.
ordered_values : array_like
Reordered values. Ordered from largest to smallest.
marker : int
Threshold marker such that
``sum(values[:marker]) > threshold * sum(values)``.
"""
if values.ndim < 2:
values.shape = -1, 1
assert values.ndim == 2
if idx is None:
idx = np.arange(values.shape[0], dtype=int)
# index list of largest to smalles (absolute) coeff values
i = np.flip(np.argsort(np.abs(values), axis=None), axis=0)
idx_reordered = idx[i]
ordered_values = values[i]
marker = np.argmax(
np.cumsum(ordered_values, axis=0) > threshold*np.sum(values))
# marker = np.argmax(ordered_values < 0.001*ordered_values[0])
return idx_reordered, ordered_values, marker+1
if __name__ == '__main__':
pass