import numpy import json import os import scipy.interpolate #for partial function specializations import functools class model: def __init__(self): self.compartments={} self.seJ={} def add_source(self,compartmentName,formula): self.compartments[compartmentName]['source']=formula def add_compartment(self,compartmentName): self.compartments[compartmentName]={} self.compartments[compartmentName]['targets']={} self.compartments[compartmentName]['sensTargets']={} def getTimeUnit(self): try: return self.mod['timeUnit'] except KeyError: return 's' def bind(self,src,target,qName,pcName,useVolume=0): #establish a flow from source compartment to the target #get volume names srcVName=self.getVolumePar(src,useVolume) targetVName=self.getVolumePar(target,useVolume) #the source equation (where we subtract the current) #in fact, this is the diagonal element pSrc=self.couplingObject(-1,qName,pcName,srcVName) #this includes derivatives and value! self.addValueObject(src,src,pSrc) #the target equation (where we add the current) pTarget=self.couplingObject(1,qName,pcName,targetVName) #equation is for target compartment, but binding for source self.addValueObject(target,src,pTarget) def addValueObject(self,targetName,srcName,cObject): #always binds equation id and a variable targetList=self.compartments[targetName]['targets'] addValue(targetList,srcName,cObject["value"]) der=cObject["derivatives"] for d in der: targetSE=self.getSEJ_comp(d,targetName) addValue(targetSE,srcName,der[d]) def couplingObject(self,sign, qParName, pcParName, vParName): qPar=self.get(qParName) pcPar=self.get(pcParName) vPar=self.get(vParName) q=qPar['value'] pc=pcPar['value'] v=vPar['value'] f=lambda t,q=q,pc=pc,v=v,s=sign:s*q(t)/v(t)/pc(t) DPC=pcPar['derivatives'] dfdPC=lambda t,f=f,pc=pc:-f(t)/pc(t) dPC={x:lambda t,df=dfdPC,D=DPC,x=x: df(t)*D[x](t) for x in DPC} DQ=qPar['derivatives'] dfdQ=lambda t,f=f,q=q: f(t)/q(t) dQ={x:lambda t,df=dfdQ,D=DQ,x=x: df(t)*D[x](t) for x in DQ} DV=vPar['derivatives'] dfdV=lambda t,f=f,v=v: -f(t)/v(t) dV={x:lambda t,df=dfdV,D=DV,x=x: df(t)*D[x](t) for x in DV} #derivatives is the combination of the above return derivedObject(f,[DPC,DQ,DV]) def getVolumePar(self,compartment,useVolume=1): #returnis volume name, if found and useVolume is directed, #or a standard parameter one if not useVolume: return "one" try: return self.mod["volumes"][compartment] #parV=self.mod["parameters"][parVName] except KeyError: pass return "one" def build(self): comps=self.compartments self.n=len(comps) self.fu=[lambda t:0]*self.n self.lut={c:i for (i,c) in zip(range(self.n),comps.keys())} self.fM={} for c in comps: comp=comps[c] if 'source' in comp: self.fu[self.lut[c]]=parseFunction(comp['source']) for t in comp['targets']: try: self.fM[self.lut[c]][self.lut[t]]=\ sumFunctionArray(comp['targets'][t]) except (KeyError,TypeError): self.fM[self.lut[c]]={} self.fM[self.lut[c]][self.lut[t]]=\ sumFunctionArray(comp['targets'][t]) #build SE part self.buildSE() def buildSE(self): #check which parameterst to include parList=[] for parName in self.seJ: #print(par) par=self.mod['parameters'][parName] usePar=calculateDerivative(par) #print('[{}]: {}'.format(usePar,par)) if not usePar: continue parList.append(parName) #print(parList) self.m=len(parList) self.lutSE={c:i for (i,c) in zip(range(self.m),parList)} self.qSS={} for parName in parList: sources=self.seJ[parName] for compartment in sources: targets=sources[compartment] for t in targets: k=self.lutSE[parName] i=self.lut[compartment] j=self.lut[t] #print('[{} {} {}] {}'.format(parName,compartment,t,targets[t])) ft=sumFunctionArray(targets[t]) try: self.qSS[k][i][j]=ft except (KeyError,TypeError): try: self.qSS[k][i]={} self.qSS[k][i][j]=ft except (KeyError,TypeError): self.qSS[k]={} self.qSS[k][i]={} self.qSS[k][i][j]=ft def inspect(self): comps=self.compartments pars=self.mod['parameters'] tu=self.getTimeUnit() print('Time unit: {}'.format(tu)) print('Compartments') for c in comps: print('{}/{}:'.format(c,self.lut[c])) comp=comps[c] if 'source' in comp: print('\tsource\n\t\t{}'.format(comp['source'])) print('\ttargets') for t in comp['targets']: print('\t\t{}[{},{}]: {}'.format(t,self.lut[c],self.lut[t],\ comp['targets'][t])) print('Flows') for f in self.flows: fName=self.flows[f] fParName=self.mod['flows'][fName] fPar=pars[fParName] print('\t{}[{}]:{} [{}]'.format(f,fName,fParName,self.get(fParName))) print('Volumes') for v in self.mod['volumes']: vParName=self.mod['volumes'][v] vPar=pars[vParName] print('\t{}:{} [{}]'.format(v,vParName,self.get(vParName))) print('Partition coefficients') for pc in self.mod['partitionCoefficients']: pcParName=self.mod['partitionCoefficients'][pc] pcPar=pars[pcParName] print('\t{}:{} [{}]'.format(pc,pcParName,self.get(pcParName))) print('SE parameters') for p in self.seJ: print(p) sources=self.seJ[p] for compartment in sources: targets=sources[compartment] for t in targets: print('\t SE bind {}/{}:{}'.format(compartment,t,targets[t])) def parse(self,file): with open(file,'r') as f: self.mod=json.load(f) for m in self.mod['compartments']: self.add_compartment(m) self.add_default_parameters() #standard parameters such as one,zero etc. for s in self.mod['sources']: #src=mod['sources'][s] self.add_source(s,self.mod['sources'][s]) self.flows={} pars=self.mod['parameters'] for f in self.mod['flows']: #skip comments if f.find(':')<0: continue comps=f.split(':') c0=splitVector(comps[0]) c1=splitVector(comps[1]) for x in c0: for y in c1: pairName='{}:{}'.format(x,y) self.flows[pairName]=f for b in self.mod['bindings']['diffusion']: #whether to scale transfer constants to organ volume #default is true, but changing here will assume no scaling useVolume=1 comps=b.split('->') try: pcParName=self.mod['partitionCoefficients'][b] except KeyError: pcParName="one" kParName=self.mod['bindings']['diffusion'][b] #operate with names to allow for value/function/derived infrastructure self.bind(comps[0],comps[1],kParName,pcParName,useVolume) for q in self.mod['bindings']['flow']: comps=q.split('->') srcs=splitVector(comps[0]) tgts=splitVector(comps[1]) for cs in srcs: for ct in tgts: #get partition coefficient try: pcParName=self.mod['partitionCoefficients'][cs] except KeyError: pcParName="one" #get flow (direction could be reversed) try: qName=self.flows['{}:{}'.format(cs,ct)] except KeyError: qName=self.flows['{}:{}'.format(ct,cs)] flowParName=self.mod['flows'][qName] #flowPar=pars[flowParName] self.bind(cs,ct,flowParName,pcParName,1) self.build() def add_default_parameters(self): self.mod['parameters']['one']={'value':1} self.mod['parameters']['zero']={'value':0} def M(self,t): Mb=numpy.zeros((self.n,self.n)) for i in self.fM: for j in self.fM[i]: Mb[i,j]=self.fM[i][j](t) #create an array and fill it with outputs of function at t return Mb def u(self,t): ub=[f(t) for f in self.fu] return numpy.array(ub) def fSS(self,t): fSS=numpy.zeros((self.m,self.n,self.n)) for k in self.qSS: for i in self.qSS[k]: for j in self.qSS[k][i]: #print('[{},{},{}] {}'.format(k,i,j,self.qSS[k][i][j])) fSS[k,i,j]=(self.qSS[k][i][j])(t) return fSS def fSY(self,y,t): #M number of sensitivity parameters #N number of equations #fSS is MxNxN #assume a tabulated solution y(t) at t spaced intervals qS=self.fSS(t).dot(y) #qS is MxN #but NxM is expected, so do a transpose #for simultaneous calculation, a Nx(M+1) matrix is expected tS=numpy.zeros((self.n,self.m+1)) #columns from 2..M+1 are the partial derivatives tS[:,1:]=numpy.transpose(qS) #first column is the original function tS[:,0]=self.u(t) return tS def fS(self,t): #M number of sensitivity parameters #N number of equations #fSS is MxNxN #assume a tabulated solution y(t) at t spaced intervals qS=self.fSS(t).dot(self.getY(t)) return numpy.transpose(qS) def getSEJ(self,parName): #find the sensitivity (SE) derivative of Jacobi with #respect to parameter try: return self.seJ[parName] except KeyError: self.seJ[parName]={} return self.seJ[parName] def getSEJ_comp(self,parName,compartmentName): #find equation dictating concentration in compartmentName #for jacobi-parameter derivative seJ=self.getSEJ(parName) try: return seJ[compartmentName] except KeyError: seJ[compartmentName]={} return seJ[compartmentName] def setY(self,t,y): self.tck=[None]*self.n for i in range(self.n): self.tck[i] = scipy.interpolate.splrep(t, y[:,i], s=0) def getY(self,t): fY=numpy.zeros(self.n) for i in range(self.n): fY[i]=scipy.interpolate.splev(t, self.tck[i], der=0) return fY def calculateUncertainty(self,sol,se): s2out=numpy.zeros(sol.shape) pars=self.mod['parameters'] for parName in pars: par=pars[parName] if not calculateDerivative(par): continue v=par["value"] if par['dist']=='lognormal': #this is sigma^2_lnx sln2=numpy.log(par["cv"]*par["cv"]+1) #have to multiplied by value to get the derivative #with respect to lnx s2=sln2*v*v else: #for Gaussian, cv is sigma/value; get sigma by value multiplication s2=par["cv"]*par["cv"]*v*v j=self.lutSE[parName] fse=se[:,:,j] print('Calculating for {}/{}:{}'.format(parName,j,fse.shape)) #se2 is nt x n se2=numpy.multiply(fse,fse) s2out+=se2*s2 return numpy.sqrt(s2out) def get(self,parName): par=self.mod['parameters'][parName] par['name']=parName if "value" in par: return self.getValue(par) if "function" in par: return self.getFunction(par) if "derived" in par: return self.getDerived(par) def getValue(self,par): v=par["value"] parName=par['name'] #convert to seconds try: parUnits=par['unit'].split('/') except (KeyError,IndexError): #no unit given return valueObject(v,parName) timeUnit=self.getTimeUnit() try: if parUnits[1]==timeUnit: return valueObject(v,parName) except IndexError: #no / in unit name return valueObject(v,parName) if parUnits[1]=='min' and timeUnit=='s': return valueObject(lambda t,v=v: v(t)/60,parName) if parUnits[1]=='s' and timeUnit=='min': return valueObject(lambda t,v=v: 60*v(t),parName) #no idea what to do return valueObject(v,parName) def getFunction(self,par): fName=par['function'] #print('[{}]: getFunction({})'.format(par['name'],par['function'])) df=self.mod['functions'][fName] if df['type']=='linearGrowth': skip=['type'] par1={x:self.get(df[x]) for x in df if x not in skip} #print(par1) return linearGrowth(par1) def getDerived(self,par): dName=par['derived'] d=self.mod['derivedParameters'][dName] #print('Derived [{}]: type {}'.format(dName,d['type'])) if d['type']=='product': #print('{}*{}'.format(d['a'],d['b'])) pA=self.get(d['a']) a=pA['value'] DA=pA['derivatives'] pB=self.get(d['b']) b=pB['value'] DB=pB['derivatives'] #even more generic -> df/dp=[df/dA*dA/dp+df/dB*dB/dp] f=lambda t,a=a,b=b,:a(t)*b(t) dfdA=lambda t,b=b: b(t) dfdB=lambda t,a=a: a(t) dA={x:lambda t,df=dfdA,D=DA,x=x: df(t)*D[x](t) for x in DA} dB={x:lambda t,df=dfdB,D=DB,x=x: df(t)*D[x](t) for x in DB} return derivedObject(f,[dA,dB]) if d['type']=='power': #print('{}^{}'.format(d['a'],d['n'])) pA=self.get(d['a']) a=pA['value'] DA=pA['derivatives'] pN=self.get(d['n']) n=pN['value'] DN=pN['derivatives'] f=lambda t,a=a,n=n:numpy.power(a(t),n(t)) dfdA=lambda t,n=n,f=f,a=a:n(t)*f(t)/a(t) dfdN=lambda t,a=a,f=f:numpy.log(a(t))*f(t) dA={x:lambda t,df=dfdA,D=DA,x=x: df(t)*D[x](t) for x in DA} dN={x:lambda t,df=dfdN,D=DN,x=x: df(t)*D[x](t) for x in DN} return derivedObject(f,[dA,dN]) if d['type']=='ratio': #print('{}/{}'.format(d['a'],d['b'])) pA=self.get(d['a']) a=pA['value'] DA=pA['derivatives'] pB=self.get(d['b']) b=pB['value'] DB=pB['derivatives'] #even more generic -> df/dp=[df/dA*dA/dp+df/dB*dB/dp] f=lambda t,a=a,b=b,:a(t)/b(t) dfdA=lambda t,f=f,a=a: f(t)/a(t) dfdB=lambda t,f=f,b=b: -f(t)/b(t) dA={x:lambda t,df=dfdA,D=DA,x=x: df(t)*D[x](t) for x in DA} dB={x:lambda t,df=dfdB,D=DB,x=x: df(t)*D[x](t) for x in DB} return derivedObject(f,[dA,dB]) if d['type']=='sum': #print('{}+{}'.format(d['a'],d['b'])) pA=self.get(d['a']) a=pA['value'] DA=pA['derivatives'] pB=self.get(d['b']) b=pB['value'] DB=pB['derivatives'] #even more generic -> df/dp=[df/dA*dA/dp+df/dB*dB/dp] f=lambda t,a=a,b=b,:a(t)+b(t) dfdA=lambda t: 1 dfdB=lambda t: 1 dA={x:lambda t,df=dfdA,D=DA,x=x: df(t)*D[x](t) for x in DA} dB={x:lambda t,df=dfdB,D=DB,x=x: df(t)*D[x](t) for x in DB} return derivedObject(f,[dA,dB]) def calculateDerivative(par): #add derivatives if dist(short for distribution) is specified return "dist" in par def sumValues(dArray,x): s=0 for a in dArray: try: s+=a[x] except KeyError: continue return s def sumFunctions(dArray,x): #print('sumFunctions for {}'.format(x)) fs=[] for a in dArray: #print('\tChecking {}'.format(a)) try: #check if a[x] is a function with a dummy parameter v=a[x](1e7) fs.append(a[x]) #print('\tAdding [{}]'.format(v)) except KeyError: #print('\t{} not in table'.format(x)) continue except TypeError: #print('\tConstructing lambda for {}'.format(x)) fs.append(lambda t,a=a,x=x:a[x]) return sumFunctionArray(fs) def sumFunctionArray(fs): return lambda t,fs=fs: sum([f(t) for f in fs]) def valueObject(v,parName): #convert everything to functions d0={parName:lambda t:1} try: v(3) return {'value':v,'derivatives':d0} except TypeError: pass return {'value':lambda t,v=v:v,'derivatives':d0} def derivedObject(v,dArray): try: v(3) o={'value':v} except TypeError: o={'value':lambda t,v=v:v} allKeys=[] for x in dArray: allKeys.extend(x.keys()) allKeys=list(set(allKeys)) o['derivatives']={x:sumFunctions(dArray,x) for x in allKeys} return o def splitVector(v): if v.find('(')<0: return [v] return v[1:-1].split(',') def parseFunction(formula): if formula['name']=='exponential': c0=formula['constant'] k=formula['k'] return lambda t,c=c0,k=k:c*numpy.exp(k*t) if formula['name']=='constant': c0=formula['value'] return lambda t,c0=c0:c0 if formula['name']=='Heavyside': t0=formula['limit'] v=formula['value'] return lambda t,v=v,t0=t0:v if t