Source code for pythia.misc

import numpy as np
import os
import datetime
import shutil


[docs]def distributionDict(): """ Set aliases for distribution descriptions. .. deprecated:: 2.0.0 `distributionDict` will be removed in PyThia 3.0.0. Returns ------- distDict : dict Dictionary with aliases for distribution descriptions. """ distDict = {} distDict['uniform'] = ['uniform', 'Uniform', 'U'] distDict['normal'] = ['normal', 'gauss', 'Gauss', 'gaussian', 'N'] distDict['gamma'] = ['gamma', 'Gamma', 'G'] distDict['beta'] = ['beta', 'Beta', 'B'] return distDict
[docs]def shiftCoord(x, T, I): """ Map `x` in interval `T` to interval `I`. Use an affine transformation to shift points :math:`x` from the interval :math:`T = [t_0, t_1]` to the interval :math:`I = [a,b]`. Parameters ---------- x : array_like Points in interval :math:`T`. T : array_like Original interval. I : array_like Target interval. Returns ------- array_like Shifted values for `x`. """ return ((I[1]-I[0])*x + I[0]*T[1] - I[1]*T[0]) / (T[1]-T[0])
[docs]def cartProd(arrayList): """ Compute the outer product of two or more arrays. Assemble an array containing all possible combinations of the elements of the input vectors :math:`v_1,\\dots,v_n`. Parameters ---------- arrayList : list of array_like List of vectors :math:`v_1,\\dots,v_n`. Returns ------- x : array_like Cartesian product array. """ dim = np.shape(arrayList)[0] x = np.hstack((np.meshgrid(*arrayList))).swapaxes(0, 1).reshape(dim, -1).T return x
[docs]def is_contained(val, domain): """ Check if a given value (vector) is contained in a domain. Checks if each component of the vector lies in the one dimensional interval of the corresponding component of the domain. Parameters ---------- val : array_like Vector to check containment in domain domain : array_like Product domain of one dimensional intervals. Returns ------- bool Bool stating if value is contained in domain. """ if val.ndim < 2: val.shape = 1, -1 if domain.ndim < 2: domain.shape = 1, -1 assert val.ndim == 2 assert domain.ndim == 2 and domain.shape[1] == 2 if np.all(domain[:, 0] <= val) and np.all(val <= domain[:, 1]): return True else: return False
[docs]def formatTime(dt): """ Converts time (seconds) to time format string. Parameters ---------- dt : float Time in seconds. Returns ------- str Formatted time string. """ dct = {} dct['d'], rem = divmod(int(dt), 86400) dct['h'], rem = divmod(int(rem), 3600) dct['min'], seconds = divmod(int(rem), 60) dct['sec'] = seconds+1 # rounding seconds up fmt = '' if dct['d'] != 0: fmt += '{d} days ' if dct['h'] != 0: fmt += '{h}h ' if dct['min'] != 0: fmt += '{min}min ' if dct['sec'] > 1: fmt += '{sec}s ' if dt < 1.0: fmt += '{:2.2g}s'.format(dt) fmt = fmt.strip() return fmt.format(**dct)
[docs]def now(sep=False): """ Get string of current machine date and time. Parameters ---------- sep : bool, default=False If true, return date and time separately. Returns ------- tuple of str or str Tuple containing date and time or single string of both concatenated. """ dt = datetime.datetime.now() today = '{:04}-{:02}-{:02} '.format(dt.year, dt.month, dt.day) now = '{:02}:{:02}:{:02}'.format(dt.hour, dt.minute, dt.second) if sep: return today, now else: return today + now
[docs]def line(indicator, message=None): """ Print a line of 80 characters by repeating indicator. An additional message can be given. Parameters ---------- indicator : string Indicator the line consists of, e.g. '-', '+' or '+-'. message : string, optional Message integrated in the line. Returns ------- text : str String of 80 characters length. """ text = '' if message is not None: mLength = len(message) lengthFromLeft = 2+1+mLength+1 text = 2*indicator text = text[:2] + ' ' + message + ' ' while len(text) < 80: text += indicator return text[:80]
[docs]def save(filename, data, path='./'): """ Wrapper for `numpy.save` that assures path directory is created if necessary and that backs old data up if existent. Parameters ---------- name : str Filename to save data to. data : array_like Data to save as .npy file. path : str, default='./' Path under which the file should be created. """ if not os.path.isdir(path): os.makedirs(path) if os.path.isfile(path+filename): shutil.copyfile(path+filename, path+filename+'.backup') np.save(path+filename, data)
[docs]def load(filename): """ Alias for numpy.load(). """ return np.load(filename)
[docs]def str2iter(string, iterType=list, dataType=int): """ Cast `str(iterable)` to `iterType` of `dataType`. Cast a string of lists, tuples, etc to the specified iterable and data type, i.e., for `iterType=tuple` and `dataType=float` cast ``str([1,2,3]) -> (1.0, 2.0, 3.0)``. Parameters ---------- string : str String representation of iterable. iterType : iterable, default=list Iterable type the string is converted to. dataType : type, default=int Data type of entries of iterable, e.g. `int` or `float`. """ return iterType([dataType(s.strip()) for s in string[1:-1].split(",")])
[docs]def batch(iterable, n=1): """ Split iterable into different batches of batchsize n. Parameters ---------- iterable : array_like Iterable to split. n : int, default=1 Batch size. Yields ------ iterable Iterable for different batches. """ l = len(iterable) for ndx in range(0, l, n): yield iterable[ndx:min(ndx + n, l)]
[docs]def wlsSamplingBound(m, c=4): """ Compute the weighted Least-Squares sampling bound. The number of samples :math:`n` is chosen such that .. math:: \\frac{n}{\\log(n)} \geq cm, where :math:`m` is the dimension of the Gramian matrix (number of PC expansion terms) and :math:`c` is an arbitrary constant. In Cohen & Migliorati 2017 the authors observed that the coice :math:`c=4` yields a well conditioned Gramian with high probability. Parameters ---------- m : int Dimension of Gramian matrix. c : float, default=4 Scaling constant. Returns ------- n : int Number of required wLS samples. """ jj = max(int(np.ceil(c*m*np.log(c*m))), 2) while True: if jj / np.log(jj) >= c*m: n = jj break jj += 1 return n
[docs]def paramDictToList(paramDict): """ Cast dictionary of pythia parameters to a list sorted by index. .. deprecated:: 2.0.0 `paramDictToList` will be removed in PyThia 3.0.0, parameters should always be given as a list not a dictionary. Parameters ---------- paramDict : dict Dictionary of parameter names and corresponding `pythia.parameter.RandomParameter`. Returns ------- paramList : list of `pythia.parameter.RandomParameter` Parameter list sorted by index of parameter. """ paramList = sorted( list(paramDict.values()), key=lambda param: param.index ) return paramList
[docs]def gelman_rubin_condition(chains): """ Compute Gelman-Rubin criterion. Implementation of the Gelman-Rubin convergence criterion for multiple parameters. A Markov chain is said to be in its convergence, if the final ration is close to one. Parameters ---------- chains : array_like, ndim=3 Array containing the Markov chains of each parameter. All chains are equal in length, the assumed shape is ``(#chains, chain length, #params)``. Returns ------- float Value computed by Gelman-Rubin criterion. """ assert chains.ndim == 3 M, N, DIM = chains.shape # chains shape is (#chains, len(chains), #params) # Mean and var of chains. chain_means = np.mean(chains, axis=1) # shape is (#chains, #params) chain_vars = np.var(chains, axis=1) # shape is (#chains, #params) # Mean and var across all chains. mean = np.mean(chain_means, axis=0).reshape(1, -1) # shape = (1, #params) var = np.var(chain_means, axis=0).reshape(1, -1) # shape = (1, #params) # Between chain variance. B = N / (M-1) * np.sum((chain_means-mean)**2, axis=0) # Within chain variance. W = 1 / M * np.sum(chain_vars, axis=0) # pooled variance V = (N-1)/N*W + (M+1)/(M*N)*B return np.sqrt(V / W)
[docs]def get_confidence_interval(samples, rate=0.95, resolution=500): """ Compute confidence intervals of samples. Compute the confidence intervals of the 1D marginals of the samples (slices). The confidence interval of a given rate is the interval around the median (not mean) of the samples containing roughly `rate` percent of the total mass. This is computed for the left and right side of the median independently. Parameters ---------- samples : array_like, ndim < 3 Array containing the (multidimensional) samples. rate : float, default=0.95 Fraction of the total mass the interval should contain. resolution : int, default=500 Number of bins used in histogramming the samples. Returns ------- conf_intervals : array_like Confidence intervals for each component. """ if samples.ndim < 2: samples.shape = -1, 1 assert samples.ndim == 2 assert 0 < rate < 1 conf_intervals = np.empty((samples.shape[1], 2)) for j, s in enumerate(samples.T): median = np.median(s) hist, bdry = np.histogram(s, bins=resolution, density=True) median_bin = np.argmax(bdry > median)-1 cumsum_left = ( np.cumsum(np.flip(hist[:median_bin+1])) - 0.5*hist[median_bin] # do not count the median bin twice ) cumsum_right = ( np.cumsum(hist[median_bin:]) - 0.5*hist[median_bin] # do not count the median bin twice ) lc = median_bin - np.argmax(cumsum_left > rate*cumsum_left[-1]) rc = np.argmax(cumsum_right > rate*cumsum_right[-1]) + median_bin conf_intervals[j] = np.array([bdry[lc], bdry[rc+1]]) return conf_intervals
[docs]def doerfler_marking(values, idx=None, threshold=0.9): """ Dörfler marking for arbitrary values. Parameters ---------- values : array_like Values for the Dörfler marking. idx : list of int, optional List of indices associated with the entries of `values`. If `None`, this is set to ``range(len(values))``. threshold : float, default=0.9 Threshold paramter for Dörfler marking. Returns ------- idx_reordered : list Reordered indices given by `idx`. Ordered from largest to smallest value. ordered_values : array_like Reordered values. Ordered from largest to smallest. marker : int Threshold marker such that ``sum(values[:marker]) > threshold * sum(values)``. """ if values.ndim < 2: values.shape = -1, 1 assert values.ndim == 2 if idx is None: idx = np.arange(values.shape[0], dtype=int) # index list of largest to smalles (absolute) coeff values i = np.flip(np.argsort(np.abs(values), axis=None), axis=0) idx_reordered = idx[i] ordered_values = values[i] marker = np.argmax( np.cumsum(ordered_values, axis=0) > threshold*np.sum(values)) # marker = np.argmax(ordered_values < 0.001*ordered_values[0]) return idx_reordered, ordered_values, marker+1
if __name__ == '__main__': pass