123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588 |
- '''
- Base classes for implementing an orthonormal basis. These classes are intended
- to enable a convenient and uniform implementation of any number of orthonormal
- bases in terms of their recursion relations. At present, the only basis
- actually implemented is the orthonormal exponentials. However, this structure
- will make it convenient to implement, e.g. Legendre polynomials for an angular
- analysis, or any other othonormal function set that can be defined in terms
- of its recursion relations.
- This module consists of four classes:
- ParametricObject: A base class that implements named parameters that can be
- marked as fittable or fixed. Used as a base class for many
- FD classes.
- DecompFactory: Store all parameters for the basis and provide methods to
- create various basis objects with the correct parameters.
- Also provides several convenience methods for performing
- decompositions and construction covariance matrices.
- Basis: An abstract iterator object used by child classes to
- implement three-term recursion relations.
- Transform: An abstract base class to implement operations to transform
- between different sets of basis hyperparameters.
- '''
- import numpy as np
- import numexpr as ne
- import Tools.CacheMgr as Cache
- from Tools.PrintMgr import *
- from scipy.sparse.linalg import expm_multiply
- from scipy.linalg import solve
- from multiprocessing import Pool
- from abc import abstractmethod
- from os import environ
- class ParametricObject(object):
- '''
- A base class that implements a collection of named parameters.
- ParametricObject is _NOT_ intended for direct usage, rather it provides
- shared methods for its subclasses. It uses the __getitem__ / __setitem__
- interface, so access properties as if it were a dict.
-
- The valid parameters are defined in two class variables:
- _param : a list of all named parameters
- _fitparam : a list of which parameters are fittable
-
- These must be overridden by the subclass. When instantiating the subclass,
- initial values must be specified by name for ALL parameters in _param, e.g.
- class MyClass(ParametricObject):
- _param = ( "Prop1", "Prop2", "ThirdProp" )
- ...
- must be initialized like
- Instance = MyClass(Prop1="val1", Prop2="val2", ThirdProp="val3")
- , or a KeyError will be thrown.
- '''
- _param = ( )
- _fitparam = ( )
-
- # Update the parameters with the values from the dict 'ind'.
- def update(self, ind): return self.ParamVal.update(ind)
- # Return all parameter / value pairs.
- def items (self): return self.ParamVal.items()
- # Get parameter 'k' by name.
- def __getitem__ (self, k): return self.ParamVal[k]
- # Set parameter 'k' by name
- def __setitem__ (self, k, v): self.ParamVal[k] = v
- # Check if paramameter 'k' exists.
- def __contains__(self, k): return k in self.ParamVal
- # Get the argument dict, updated by the contents of 'kw'
- def _arg(self, kw):
- d = self.ParamVal.copy()
- d.update(kw)
- return d
- # kwargs must include ALL parameters listed in self._param.
- def __init__(self, **kwargs):
- self.ParamVal = { p : kwargs[p] for p in self._param }
- # Pretty-formatted string representation.
- def __str__(self):
- s = [ str_nl % "className" + type(self).__name__ ]
- s += [ str_nl % k + str(self[k]) for k in self._param if k not in self._fitparam ]
- s += [ str_el % k + "%.3f" % self[k] for k in self._fitparam ]
- return '\n'.join(s)
- # Format as a list of floats joined by '_'.
- def __format__(self, fmt):
- # The str(float()) construction ensures that numpy
- # singletons are printed nicely.
- s = []
- s += [ str(float(self[k])) for k in self._fitparam ]
- s += [ str(self[k]) for k in self._param if k not in self._fitparam ]
- return "_".join(s)
- class DecompFactory(ParametricObject):
- '''
- DecompFactory is a class to store the hyperparameters for an orthonormal
- function set and conveniently create various decomposition instances
- with those parameters. It is an abstract class - it should be extended
- by a concrete subclass that defines the correct hyperparameters and methods
- for the orthonormal function set.
- Three parameters are defined in _param:
- Nbasis : The total number of basis functions to consider
- Nxfrm : The number of basis functions to use for fast transformations
- during the hyperparamer scan.
- Ncheck : The maximum number of basis functions to consider for
- inclusion in the background-only set.
- Subclasses should extend _param with any additional parameters that they
- require, and set _fitparam as appropriate.
- Furthermore, four abstract factory methods must be implemented by the subclass:
- Fn (self, x, w, **kw)
- MxForm (self, **kw)
- Pri (self, **kw)
- Xfrm (self, **kw)
- See the respective docstrings of each for more detail.
- For an example implementation, see Tools.OrthExp.ExpDecompFactory.
- DecompFactory additionally provides some convenience methods for decomposing datasets
- and calculating covariance matrices.
- '''
- _param = ("Nbasis", "Nxfrm", "Ncheck")
- @abstractmethod
- def Fn (self, x, w, **kw):
- '''
- Return a subclass of Basis that implements an orthonormal basis function.
- 'x' is an ndarray with the x-values at which to evaluate the function.
- 'w' is an ndarray with the weights corresponding to each 'x'.
- These must have the same shape; if the weights are not used, just pass
- np.ones_like(x) as a weight.
-
- The kwargs can be used to override the hyperparameter values that are
- set in this DecompFactory.
- '''
- return
- @abstractmethod
- def MxForm (self, *mom, **kw):
- '''
- Return a subclass of Basis that takes in a list of moment vectors and
- returns the corresponding moment matrices row-by-row. Each positional
- argument is a single moment-vector.
- The kwargs can be used to override the hyperparameter values that are
- set in this DecompFactory.
- '''
- return
- @abstractmethod
- def Pri (self, **kw):
- '''
- Return the moments of the prior distribution moment-by-moment.
- The kwargs can be used to override the hyperparameter values that are
- set in this DecompFactory.
- '''
- return
- @abstractmethod
- def Xfrm (self, **kw):
- '''
- Return a transformation object. The transformation object takes a
- moment vector with the current choice of hyperparameter values and
- transforms it into a vector with an alternate choice of hyperparameter
- values. See Tools.Base.Transform for a more detailed description
- of the transformation object.
- The kwargs can be used to override the hyperparameter values that are
- set in this DecompFactory.
- '''
- return
- def MomMx (self, Mom, i, j, **kwargs):
- '''
- Return the matrix form of Mom using the fast matrix cache stored in
- this DecompFactory object. This method is only able to use 'Nxfrm'
- moments. It is intended to be used only during the hyperparameter scan.
- The parameters are:
- Mom: The moment vector to convert to matrix form.
- [i, j) : The range of moments to include. 'i' is included; 'j' is not
- Optional kwargs are:
- N: size of the matrix to return (N by N)
- out: ndarray to store the output. Otherwise, return a new allocation.
- '''
- out = kwargs.get("out", None)
- N = kwargs.get("N", self["Nxfrm"])
- j = min(j, self.TDot.shape[-1])
- return np.dot( self.TDot[:N,:N,i:j], Mom[i:j], out=out)
- def MxTensor(self, *Mom):
- '''
- Return the matrix form of each moment vector listed in the positional
- arguments. This uses the full MxForm machinery, so it can work on
- moment vectors of any length. The size of the returned covariance matrices
- is N by N, where N is the length of the first positional argument.
- '''
- Nb = Mom[0].size
- Ct = np.zeros((Nb, Nb, len(Mom)))
- for r in self.MxForm(*Mom, Nbasis=Nb):
- Ct[r.N] += r.Values().T
- return Ct
- def CovMatrix(self, Mom):
- '''
- Return the covariance matrix association with the moment vector Mom.
- This method works on moment vectors of any length. The returned
- covariance matrix is N by N, where N is the length of Mom.
- '''
- Nb = Mom.size
- Ct = -np.outer(Mom[:Nb], Mom[:Nb])
- Ct[0,0] = 1
- for r in self.MxForm(Mom, Nbasis=Nb):
- Ct[r.N] += r.Values()[0]
- return Ct
- def OpMatrix(self, N, **kwargs):
- '''
- Create and return the fast matrix cache. This calculates the N by N
- matrix form of each basis function, and returns the result in a form
- such that
- Mx.dot ( MomVec )
- results in the matrix form of MomVec. Option kwargs:
- M : the number of basis elements for which to calculate the N by N
- matrix form.
- Note that generally this function should not need to be used directly.
- One of CovMatrix, MomMx, or MxTensor probably do what you want.
- '''
- M = kwargs.get("M", N)
- v = np.zeros((N,N,M))
- F = self.MxForm( *[x for x in np.eye(N)[:M]], Nbasis=N )
- for x in F:
- v[x.N] = x.Values().T
- return v
- def Decompose(self, x, w, cksize=2**20, **kw):
- '''
- Given a dataset 'x' of values with weights 'w', compute the moments of
- that dataset. Both 'x' and 'w' should be one-dimensional ndarrays
- with the same length.
- The kwargs can be used to override the hyperparameter values that are
- set in this DecompFactory.
- This function recomputes the decomposition from scratch each time it is
- called. Normally, you want to use CachedDecompose instead - this will
- store the results on disk and load the stored results if that can
- avoid recomputing a decomposition.
- '''
- Nb = kw.pop("Nbasis", 0)
- Fn = self.Fn(x[:cksize+1], w[:cksize+1], Nbasis=Nb if Nb > 0 else self["Nbasis"], **kw)
- Mom = np.zeros( (Fn["Nbasis"],) )
- for i in range(0, x.size, cksize):
- pdot()
- Fn.Reinit( x[i:i+cksize+1 ], w[i:i+cksize+1] )
- for D in Fn: Mom[D.N] += D.Moment()
- return Mom
- @Cache.Element("{self.CacheDir}", "Decompositions", "{self}", "{2:s}-{Nbasis}.npy")
- def CachedDecompose(self, x, w, name, cksize=2**20, Nbasis=0, **kwargs):
- '''
- Given a dataset 'x' of values with weights 'w', compute the moments of
- that dataset. Both 'x' and 'w' should be one-dimensional ndarrays
- with the same length.
- The kwargs can be used to override the hyperparameter values that are
- set in this DecompFactory.
- This function stores the results on disk, and loads the stored result
- if the same decomposition is requested a second time. This avoids
- unnecessary re-decompositions, particularly if the same code is run
- repeatedly.
- '''
- return self.Decompose(x, w, cksize, Nbasis=Nbasis, **kwargs)
- def __init__(self, **kwargs):
- '''
- All parameters listed in _params are mandatory. In addition, optional
- arguments are:
- Nthread: the number of threads to use in numexpr computations.
- CacheDir: the directory in which to store cahced files.
- '''
- self.Nthread = kwargs.get('Nthread', 1)
- self.CacheDir = kwargs.get('CacheDir', "tmp")
- self.FDDir = environ.get('FD_DIR', ".")
- ParametricObject.__init__(self, **kwargs)
- ne.set_num_threads(self.Nthread)
- self.TDot = self.OpMatrix( self["Nxfrm"] )
- self['Factory'] = self
- class Basis(ParametricObject):
- '''
- A base class for orthonormal bases in terms of their three-term recursion
- relations. It is not intended for direct use, but instead provides several
- common methods for its subclasses.
- 'Basis' exposes an interator interface. Subclasses implement the recursion
- relations, and the Basis object applies them to step through the resulting
- values one-by-one. Three-term recursion relations take the general form:
- V_(n+1) = a*x*V(n) + b*V(n) + c*V(n-1)
- Because each term depends on the _two_ preceding terms, two base values
- are required. These are implemented in the 'Base0' and 'Base1' methods.
- 'x' is a raising operator, implemented in 'Raise'. The recursion relation
- itself is implemented in 'Recur'.
- An optional constant of integration can be included by overriding 'Xfrm'.
- If used, this must explicitly be incorporated in the subclass's
- implementation.
- See Tools.OrthExp for several examples of different subclasses that utilize
- the 'Basis' object.
- '''
- _param = ("Nbasis", )
- # User-facing functions.
- #def Values(self): return self.t[ self.N % 2 ] * np.sqrt(self.NormSq(self.N))
- # Return a zero ndarray of the appropriate shape and type.
- def Zeros (self, shape=()): return np.zeros(shape + (1,))
- # The zero'th value of the basis.
- @abstractmethod
- def Base0 (self, out): return None
- # The first value of the basis.
- @abstractmethod
- def Base1 (self, out): return None
- # The raising operator. This is computed on initialization and stored
- # in self['rz'].
- @abstractmethod
- def Raise (self, out): return None
- # The recursion relation. This should set 'out' to V(N+1)
- # given En=V(N) and Ep=V(N-1).
- @abstractmethod
- def Recur (self, N, En, Ep, out): return
- # (Optional) constant of integration. This is computed on initialization
- # and stored in self['xf'].
- def Xfrm (self, out): return self.Zeros()
- ##
- # Implementation of iterator interface
- ##
- def __iter__(self, **kwargs):
- self.N = -1
- for k, v in kwargs.items():
- self[k] = v
- if 'xf' in self.ParamVal and 'x' in self.ParamVal and self['x'].size == self['xf'].size:
- self.Xfrm (self['xf'])
- self.Raise (self['rz'])
- else:
- self['xf'] = self.Xfrm(None)
- self['rz'] = self.Raise(None)
- self['t'] = self.Zeros((2,))
- self.Base0 (self['t'][0])
- self.Base1 (self['t'][1])
-
- return self
- def next(self):
- next = self['t'][ (self.N + 1) % 2 ]
- this = self['t'][ (self.N ) % 2 ]
- prev = self['t'][ (self.N - 1) % 2 ]
- if self.N < 0: self.Base0(next)
- elif self.N == 0: self.Base1(next)
- elif self.N < self['Nbasis']: self.Recur(self.N, this, prev, next)
- if self.N == self['Nbasis'] - 1: raise StopIteration
- self.N += 1
- return self
- def __str__(self):
- s = [ str_nl % "className" + type(self).__name__ ]
- s += [ str_nl % k + str(self[k]) for k in self._param ]
- return '\n'.join(s)
- # These go with the Transform object. Must be globals for Multiprocessing
- gK = {}
- def gEle(n, m): return gK['obj'].Ele(n, m, **gK)
- class Transform(ParametricObject):
- '''
- A base class for implementing transformations between different choices of
- hyperparameters. This assumes that a transformation on each parameter can
- be written in the following way:
- x' = exp( a_p * T_p ) . x
- where
- x_p is the initial decomposition vector
- T_p is the infinitesimal transformation matrix for parameter p
- a_p is the amount to transform parameter p
- Not all transformations can be written in this way! However, those than can
- be are convenient and tractable for numerical computation. Since T_p is
- presumed to be a constant, independent of the choice of hyperparameters,
- it can be computed exactly once and then cached on disk. Since this is
- typically an expensive computation, this is ideal.
- The 'Transform' object provides some framework to calculate T for each
- hyperparameter and cache the result. It also supplies methods to use
- these transformation matrices on decomposition vectors to transform them
- from one set of hyperparameters to another.
- The _param class member must specify each of the hyperparameters (by name)
- for which a transformation matrix will be computed. Each specified
- hyperparameter must have a corresponding method of the same name that
- return T_p. An example implementation would look something like:
- _param = ("HyperA", "HyperB" )
- def HyperA(self):
- ...
- def HyperB(self):
- ...
- In most cases, the matrix is computed more easily in some non-orthonormal
- basis and can then be transformed into the orthonormal basis. In this
- case,
- T_p = O . U_p . D
- where O is the orthonormal functions expressed in the non-orthonormal
- basis and D is the derivatives of the orthonormal functions expressed in
- the non-orthonormal basis.
- The @OrthogonalizeHankel decorator is provided to automate this in the
- case that U_p is a Hankel matrix (i.e. all antidiagonals are constant). To
- make this work, the subclass must implement two functions
- def CoeffOrth(self, n): return the n by n matrix O[n,m] / O[n,m-1]
- def CoeffDeriv(self, n): return the n by n matrix D[n,m] / D[m,m-1]
- and implement the the transformations like:
- @Base.Transform.OrthogonalizeHankel
- def HyperA(self, n): return the value of U_p[n, n]
- CoeffOrth and CoeffDeriv return _ratios_ of the columns in O and D, as this
- generally has a much simplier expression than the columns directly. This
- in turns greatly increases evaluation speed when the underlying type is
- Fraction.
- Then OrthogonalizeHankel will handle the matrix multiplications using a
- multiprocessing pool. This is overkill if the O, D and U_p are simple
- types like an int or a float. However, it is sometimes necessary for
- numerical reasons to use slow but accurate types like Fraction, and when
- the matrices are large this can become very slow. In this case, the
- multiprocessing provides a substantial performance improvement.
- The results of OrthogonalizeHankel are cached on disk, so in any case these
- computations need only be performed once. This cache is located in
- <FD_DIR>/data
-
- See Tools.OrthExp.ExpDecompTransform for an example implementation.
- '''
- _param = ( )
- def __call__(self, *vec, **kwargs):
- '''
- Transform a list of moments vectors from the current hyperparameters
- into an alternate choice of hyperparameters. The current parameters
- are specified by the values stored in this object, and the new
- parameters are specified in kwargs. Each positional argument
- should be an ndarray moment vector.
- '''
- for p in self._param:
- if p not in kwargs:
- kwargs[p] = self.ParamVal[p]
- inv = kwargs.get("inv", False)
- ini = kwargs if inv else self.ParamVal
- fin = self.ParamVal if inv else kwargs
- ret = np.stack(vec, axis=1)[:self["Nxfrm"]]
- arg = self.XfPar(fin, ini)
- mx = sum( arg[p] * self.KMx[p].T for p in self._param )
- ret = expm_multiply( mx, ret )
- return tuple(ret.T)
- @Cache.AtomicElement("{self.Factory.FDDir}", "data", "ele-cache-{name:s}", "{0:d}-{1:d}.json")
- def Ele(self, n, m, **kwargs):
- '''
- Compute element (n, m) of self.O . H . self.D where H is a Hankel matrix.
- H is specified by passing it's diagonal as the kwarg 'h'.
- '''
- h = kwargs.get("h")
- v = np.outer(self.O[n][::-1], self.D[m])
- s = [ np.trace(v, offset=k) for k in range(-v.shape[0]+1, v.shape[1]) ]
- return np.dot(h[:len(s)], s) * self.Norm(n, m)
- @staticmethod
- def OrthogonalizeHankel(func):
- '''
- Decorator to create N x N infinitesimal transformation matrices from a
- wrapped Hankel matrix. This function takes two parameters:
- name : the name of the corresponding hyperparameter. Used to cache
- the matrix in a unique file.
- N : the size of the tranformation matrix.
- The wrapped function must take a single argument, like this:
- @Base.Transform.OrthogonalizeHankel
- def TheWrappedFunction(self, n):
- TheWrappedFunction should take a number n and return the value of the
- n'th diagonal element (i.e. H[n, n]).
- See Tools.OrthExp.ExpDecompTransform for an example.
- '''
- @Cache.Element("{self.Factory.FDDir}", "data", "xfrm-cache-{0:s}-{1:d}.npy")
- def wrap(self, name, N):
- pini(name + " xfrm moment")
- global gK
- gK = {
- 'h' : [ pdot(func(self, k)) for k in range(2*N-1) ],
- 'name': name,
- 'obj' : self
- }
- pend()
- pini(name + " xfrm matrix", interval=N)
- p = Pool( self.Factory.Nthread )
- r = [ p.apply_async(gEle, (n, m), callback=pdot) for n in range(N) for m in range(N) ]
- p.close()
- r = np.array([ x.get() for x in r]).reshape((N, N))
- pend()
- return r
- return wrap
- # Calculate infinitesimal transformations
- def __init__(self, **kwargs):
- '''
- All parameters listed in _params are mandatory. Additional mandatory
- parameters are:
- Nxfrm: The size of the transformation matrices (Nxfrm x Nxfrm)
- Factory: The parent Factory object.
- '''
- ParametricObject.__init__(self, **kwargs)
- self["Nxfrm"] = kwargs["Nxfrm"]
- self.Factory = kwargs["Factory"]
- self.O = [ self.CoeffOrth (n) for n in range(self["Nxfrm"]) ]
- self.D = [ self.CoeffDeriv(n) for n in range(self["Nxfrm"]) ]
- self.KMx = { n : getattr(self, n)(n, self["Nxfrm"]) for n in self._param }
|