||
- import numpy
- import json
- import os
- import scipy.interpolate
- #for partial function specializations
- import functools
- class model:
- def __init__(self):
- self.compartments={}
- self.seJ={}
-
- def add_source(self,compartmentName,formula):
- self.compartments[compartmentName]['source']=formula
- def add_compartment(self,compartmentName):
- self.compartments[compartmentName]={}
- self.compartments[compartmentName]['targets']={}
- self.compartments[compartmentName]['sensTargets']={}
- def getTimeUnit(self):
- try:
- return self.mod['timeUnit']
- except KeyError:
- return 's'
- def bind(self,src,target,qName,pcName,useVolume=0):
-
- #establish a flow from source compartment to the target
- #get volume names
- srcVName=self.getVolumePar(src,useVolume)
- targetVName=self.getVolumePar(target,useVolume)
-
- #the source equation (where we subtract the current)
- #in fact, this is the diagonal element
- pSrc=self.couplingObject(-1,qName,pcName,srcVName)
- #this includes derivatives and value!
- self.addValueObject(src,src,pSrc)
-
- #the target equation (where we add the current)
- pTarget=self.couplingObject(1,qName,pcName,targetVName)
- #equation is for target compartment, but binding for source
- self.addValueObject(target,src,pTarget)
-
- def addValueObject(self,targetName,srcName,cObject):
- #always binds equation id and a variable
- targetList=self.compartments[targetName]['targets']
- addValue(targetList,srcName,cObject["value"])
- der=cObject["derivatives"]
- for d in der:
- targetSE=self.getSEJ_comp(d,targetName)
- addValue(targetSE,srcName,der[d])
-
- def couplingObject(self,sign, qParName, pcParName, vParName):
-
- qPar=self.get(qParName)
- pcPar=self.get(pcParName)
- vPar=self.get(vParName)
- q=qPar['value']
- pc=pcPar['value']
- v=vPar['value']
- f=lambda t,q=q,pc=pc,v=v,s=sign:s*q(t)/v(t)/pc(t)
- DPC=pcPar['derivatives']
- dfdPC=lambda t,f=f,pc=pc:-f(t)/pc(t)
- dPC={x:lambda t,df=dfdPC,D=DPC,x=x: df(t)*D[x](t) for x in DPC}
- DQ=qPar['derivatives']
- dfdQ=lambda t,f=f,q=q: f(t)/q(t)
- dQ={x:lambda t,df=dfdQ,D=DQ,x=x: df(t)*D[x](t) for x in DQ}
- DV=vPar['derivatives']
- dfdV=lambda t,f=f,v=v: -f(t)/v(t)
- dV={x:lambda t,df=dfdV,D=DV,x=x: df(t)*D[x](t) for x in DV}
- #derivatives is the combination of the above
-
- return derivedObject(f,[DPC,DQ,DV])
- def getVolumePar(self,compartment,useVolume=1):
- #returnis volume name, if found and useVolume is directed,
- #or a standard parameter one
- if not useVolume:
- return "one"
- try:
- return self.mod["volumes"][compartment]
- #parV=self.mod["parameters"][parVName]
- except KeyError:
- pass
- return "one"
- def build(self):
- comps=self.compartments
- self.n=len(comps)
- self.fu=[lambda t:0]*self.n
- self.lut={c:i for (i,c) in zip(range(self.n),comps.keys())}
- self.fM={}
- for c in comps:
- comp=comps[c]
- if 'source' in comp:
- self.fu[self.lut[c]]=parseFunction(comp['source'])
-
- for t in comp['targets']:
- try:
- self.fM[self.lut[c]][self.lut[t]]=\
- sumFunctionArray(comp['targets'][t])
- except (KeyError,TypeError):
- self.fM[self.lut[c]]={}
- self.fM[self.lut[c]][self.lut[t]]=\
- sumFunctionArray(comp['targets'][t])
- #build SE part
- self.buildSE()
- def buildSE(self):
- #check which parameterst to include
- parList=[]
- for parName in self.seJ:
- #print(par)
- par=self.mod['parameters'][parName]
- usePar=calculateDerivative(par)
- #print('[{}]: {}'.format(usePar,par))
- if not usePar:
- continue
- parList.append(parName)
-
- #print(parList)
- self.m=len(parList)
- self.lutSE={c:i for (i,c) in zip(range(self.m),parList)}
-
- self.qSS={}
- for parName in parList:
- sources=self.seJ[parName]
- for compartment in sources:
- targets=sources[compartment]
- for t in targets:
- k=self.lutSE[parName]
- i=self.lut[compartment]
- j=self.lut[t]
- #print('[{} {} {}] {}'.format(parName,compartment,t,targets[t]))
- ft=sumFunctionArray(targets[t])
- try:
- self.qSS[k][i][j]=ft
- except (KeyError,TypeError):
- try:
- self.qSS[k][i]={}
- self.qSS[k][i][j]=ft
- except (KeyError,TypeError):
- self.qSS[k]={}
- self.qSS[k][i]={}
- self.qSS[k][i][j]=ft
-
- def inspect(self):
- comps=self.compartments
- pars=self.mod['parameters']
-
- tu=self.getTimeUnit()
- print('Time unit: {}'.format(tu))
- print('Compartments')
- for c in comps:
- print('{}/{}:'.format(c,self.lut[c]))
- comp=comps[c]
- if 'source' in comp:
- print('\tsource\n\t\t{}'.format(comp['source']))
- print('\ttargets')
- for t in comp['targets']:
- print('\t\t{}[{},{}]: {}'.format(t,self.lut[c],self.lut[t],\
- comp['targets'][t]))
- print('Flows')
- for f in self.flows:
- fName=self.flows[f]
- fParName=self.mod['flows'][fName]
- fPar=pars[fParName]
- print('\t{}[{}]:{} [{}]'.format(f,fName,fParName,self.get(fParName)))
- print('Volumes')
- for v in self.mod['volumes']:
- vParName=self.mod['volumes'][v]
- vPar=pars[vParName]
- print('\t{}:{} [{}]'.format(v,vParName,self.get(vParName)))
- print('Partition coefficients')
- for pc in self.mod['partitionCoefficients']:
- pcParName=self.mod['partitionCoefficients'][pc]
- pcPar=pars[pcParName]
- print('\t{}:{} [{}]'.format(pc,pcParName,self.get(pcParName)))
- print('SE parameters')
- for p in self.seJ:
- print(p)
- sources=self.seJ[p]
- for compartment in sources:
- targets=sources[compartment]
- for t in targets:
- print('\t SE bind {}/{}:{}'.format(compartment,t,targets[t]))
-
- def parse(self,file):
-
- with open(file,'r') as f:
- self.mod=json.load(f)
- for m in self.mod['compartments']:
- self.add_compartment(m)
- self.add_default_parameters()
- #standard parameters such as one,zero etc.
- for s in self.mod['sources']:
- #src=mod['sources'][s]
- self.add_source(s,self.mod['sources'][s])
- self.flows={}
- pars=self.mod['parameters']
- for f in self.mod['flows']:
- #skip comments
- if f.find(':')<0:
- continue
-
- comps=f.split(':')
- c0=splitVector(comps[0])
- c1=splitVector(comps[1])
- for x in c0:
- for y in c1:
- pairName='{}:{}'.format(x,y)
- self.flows[pairName]=f
-
- for b in self.mod['bindings']['diffusion']:
- #whether to scale transfer constants to organ volume
- #default is true, but changing here will assume no scaling
- useVolume=1
- comps=b.split('->')
- try:
- pcParName=self.mod['partitionCoefficients'][b]
- except KeyError:
- pcParName="one"
-
- kParName=self.mod['bindings']['diffusion'][b]
- #operate with names to allow for value/function/derived infrastructure
- self.bind(comps[0],comps[1],kParName,pcParName,useVolume)
-
- for q in self.mod['bindings']['flow']:
- comps=q.split('->')
- srcs=splitVector(comps[0])
- tgts=splitVector(comps[1])
- for cs in srcs:
- for ct in tgts:
- #get partition coefficient
- try:
- pcParName=self.mod['partitionCoefficients'][cs]
- except KeyError:
- pcParName="one"
-
- #get flow (direction could be reversed)
- try:
- qName=self.flows['{}:{}'.format(cs,ct)]
- except KeyError:
- qName=self.flows['{}:{}'.format(ct,cs)]
-
- flowParName=self.mod['flows'][qName]
- #flowPar=pars[flowParName]
-
- self.bind(cs,ct,flowParName,pcParName,1)
-
- self.build()
-
- def add_default_parameters(self):
- self.mod['parameters']['one']={'value':1}
- self.mod['parameters']['zero']={'value':0}
-
- def M(self,t):
- Mb=numpy.zeros((self.n,self.n))
- for i in self.fM:
- for j in self.fM[i]:
- Mb[i,j]=self.fM[i][j](t)
- #create an array and fill it with outputs of function at t
- return Mb
- def u(self,t):
- ub=[f(t) for f in self.fu]
- return numpy.array(ub)
- def fSS(self,t):
- fSS=numpy.zeros((self.m,self.n,self.n))
- for k in self.qSS:
- for i in self.qSS[k]:
- for j in self.qSS[k][i]:
- #print('[{},{},{}] {}'.format(k,i,j,self.qSS[k][i][j]))
- fSS[k,i,j]=(self.qSS[k][i][j])(t)
- return fSS
-
-
- def fSY(self,y,t):
- #M number of sensitivity parameters
- #N number of equations
- #fSS is MxNxN
- #assume a tabulated solution y(t) at t spaced intervals
- qS=self.fSS(t).dot(y)
- #qS is MxN
- #but NxM is expected, so do a transpose
- #for simultaneous calculation, a Nx(M+1) matrix is expected
- tS=numpy.zeros((self.n,self.m+1))
- #columns from 2..M+1 are the partial derivatives
- tS[:,1:]=numpy.transpose(qS)
- #first column is the original function
- tS[:,0]=self.u(t)
- return tS
-
- def fS(self,t):
- #M number of sensitivity parameters
- #N number of equations
- #fSS is MxNxN
-
- #assume a tabulated solution y(t) at t spaced intervals
-
- qS=self.fSS(t).dot(self.getY(t))
- return numpy.transpose(qS)
-
- def getSEJ(self,parName):
- #find the sensitivity (SE) derivative of Jacobi with
- #respect to parameter
- try:
- return self.seJ[parName]
- except KeyError:
- self.seJ[parName]={}
- return self.seJ[parName]
-
- def getSEJ_comp(self,parName,compartmentName):
- #find equation dictating concentration in compartmentName
- #for jacobi-parameter derivative
- seJ=self.getSEJ(parName)
- try:
- return seJ[compartmentName]
- except KeyError:
- seJ[compartmentName]={}
- return seJ[compartmentName]
- def setY(self,t,y):
- self.tck=[None]*self.n
- for i in range(self.n):
- self.tck[i] = scipy.interpolate.splrep(t, y[:,i], s=0)
-
- def getY(self,t):
- fY=numpy.zeros(self.n)
- for i in range(self.n):
- fY[i]=scipy.interpolate.splev(t, self.tck[i], der=0)
- return fY
-
- def calculateUncertainty(self,sol,se):
- s2out=numpy.zeros(sol.shape)
- pars=self.mod['parameters']
- for parName in pars:
- par=pars[parName]
- if not calculateDerivative(par):
- continue
- v=par["value"]
- if par['dist']=='lognormal':
- #this is sigma^2_lnx
- sln2=numpy.log(par["cv"]*par["cv"]+1)
- #have to multiplied by value to get the derivative
- #with respect to lnx
- s2=sln2*v*v
- else:
- #for Gaussian, cv is sigma/value; get sigma by value multiplication
- s2=par["cv"]*par["cv"]*v*v
- j=self.lutSE[parName]
- fse=se[:,:,j]
- print('Calculating for {}/{}:{}'.format(parName,j,fse.shape))
- #se2 is nt x n
- se2=numpy.multiply(fse,fse)
- s2out+=se2*s2
- return numpy.sqrt(s2out)
- def get(self,parName):
- par=self.mod['parameters'][parName]
- par['name']=parName
- if "value" in par:
- return self.getValue(par)
- if "function" in par:
- return self.getFunction(par)
- if "derived" in par:
- return self.getDerived(par)
- def getValue(self,par):
- v=par["value"]
- parName=par['name']
- #convert to seconds
- try:
- parUnits=par['unit'].split('/')
- except (KeyError,IndexError):
- #no unit given
- return valueObject(v,parName)
-
- timeUnit=self.getTimeUnit()
- try:
- if parUnits[1]==timeUnit:
- return valueObject(v,parName)
- except IndexError:
- #no / in unit name
- return valueObject(v,parName)
- if parUnits[1]=='min' and timeUnit=='s':
- return valueObject(lambda t,v=v: v(t)/60,parName)
-
- if parUnits[1]=='s' and timeUnit=='min':
- return valueObject(lambda t,v=v: 60*v(t),parName)
- #no idea what to do
- return valueObject(v,parName)
- def getFunction(self,par):
- fName=par['function']
- #print('[{}]: getFunction({})'.format(par['name'],par['function']))
- df=self.mod['functions'][fName]
- if df['type']=='linearGrowth':
- skip=['type']
- par1={x:self.get(df[x]) for x in df if x not in skip}
- #print(par1)
- return linearGrowth(par1)
- def getDerived(self,par):
- dName=par['derived']
- d=self.mod['derivedParameters'][dName]
- #print('Derived [{}]: type {}'.format(dName,d['type']))
- if d['type']=='product':
- #print('{}*{}'.format(d['a'],d['b']))
- pA=self.get(d['a'])
- a=pA['value']
- DA=pA['derivatives']
- pB=self.get(d['b'])
- b=pB['value']
- DB=pB['derivatives']
- #even more generic -> df/dp=[df/dA*dA/dp+df/dB*dB/dp]
- f=lambda t,a=a,b=b,:a(t)*b(t)
- dfdA=lambda t,b=b: b(t)
- dfdB=lambda t,a=a: a(t)
- dA={x:lambda t,df=dfdA,D=DA,x=x: df(t)*D[x](t) for x in DA}
- dB={x:lambda t,df=dfdB,D=DB,x=x: df(t)*D[x](t) for x in DB}
- return derivedObject(f,[dA,dB])
- if d['type']=='power':
- #print('{}^{}'.format(d['a'],d['n']))
- pA=self.get(d['a'])
- a=pA['value']
- DA=pA['derivatives']
- pN=self.get(d['n'])
- n=pN['value']
- DN=pN['derivatives']
- f=lambda t,a=a,n=n:numpy.power(a(t),n(t))
- dfdA=lambda t,n=n,f=f,a=a:n(t)*f(t)/a(t)
- dfdN=lambda t,a=a,f=f:numpy.log(a(t))*f(t)
- dA={x:lambda t,df=dfdA,D=DA,x=x: df(t)*D[x](t) for x in DA}
- dN={x:lambda t,df=dfdN,D=DN,x=x: df(t)*D[x](t) for x in DN}
- return derivedObject(f,[dA,dN])
- if d['type']=='ratio':
- #print('{}/{}'.format(d['a'],d['b']))
- pA=self.get(d['a'])
- a=pA['value']
- DA=pA['derivatives']
- pB=self.get(d['b'])
- b=pB['value']
- DB=pB['derivatives']
- #even more generic -> df/dp=[df/dA*dA/dp+df/dB*dB/dp]
- f=lambda t,a=a,b=b,:a(t)/b(t)
- dfdA=lambda t,f=f,a=a: f(t)/a(t)
- dfdB=lambda t,f=f,b=b: -f(t)/b(t)
- dA={x:lambda t,df=dfdA,D=DA,x=x: df(t)*D[x](t) for x in DA}
- dB={x:lambda t,df=dfdB,D=DB,x=x: df(t)*D[x](t) for x in DB}
- return derivedObject(f,[dA,dB])
- if d['type']=='sum':
- #print('{}+{}'.format(d['a'],d['b']))
- pA=self.get(d['a'])
- a=pA['value']
- DA=pA['derivatives']
- pB=self.get(d['b'])
- b=pB['value']
- DB=pB['derivatives']
- #even more generic -> df/dp=[df/dA*dA/dp+df/dB*dB/dp]
- f=lambda t,a=a,b=b,:a(t)+b(t)
- dfdA=lambda t: 1
- dfdB=lambda t: 1
- dA={x:lambda t,df=dfdA,D=DA,x=x: df(t)*D[x](t) for x in DA}
- dB={x:lambda t,df=dfdB,D=DB,x=x: df(t)*D[x](t) for x in DB}
- return derivedObject(f,[dA,dB])
- def calculateDerivative(par):
- #add derivatives if dist(short for distribution) is specified
- return "dist" in par
-
- def sumValues(dArray,x):
- s=0
- for a in dArray:
- try:
- s+=a[x]
- except KeyError:
- continue
- return s
-
- def sumFunctions(dArray,x):
- #print('sumFunctions for {}'.format(x))
- fs=[]
- for a in dArray:
- #print('\tChecking {}'.format(a))
- try:
- #check if a[x] is a function with a dummy parameter
- v=a[x](1e7)
- fs.append(a[x])
- #print('\tAdding [{}]'.format(v))
- except KeyError:
- #print('\t{} not in table'.format(x))
- continue
- except TypeError:
- #print('\tConstructing lambda for {}'.format(x))
- fs.append(lambda t,a=a,x=x:a[x])
- return sumFunctionArray(fs)
- def sumFunctionArray(fs):
- return lambda t,fs=fs: sum([f(t) for f in fs])
- def valueObject(v,parName):
- #convert everything to functions
- d0={parName:lambda t:1}
- try:
- v(3)
- return {'value':v,'derivatives':d0}
- except TypeError:
- pass
- return {'value':lambda t,v=v:v,'derivatives':d0}
- def derivedObject(v,dArray):
- try:
- v(3)
- o={'value':v}
- except TypeError:
- o={'value':lambda t,v=v:v}
- allKeys=[]
- for x in dArray:
- allKeys.extend(x.keys())
- allKeys=list(set(allKeys))
- o['derivatives']={x:sumFunctions(dArray,x) for x in allKeys}
- return o
-
- def splitVector(v):
- if v.find('(')<0:
- return [v]
- return v[1:-1].split(',')
- def parseFunction(formula):
- if formula['name']=='exponential':
- c0=formula['constant']
- k=formula['k']
- return lambda t,c=c0,k=k:c*numpy.exp(k*t)
- if formula['name']=='constant':
- c0=formula['value']
- return lambda t,c0=c0:c0
- if formula['name']=='Heavyside':
- t0=formula['limit']
- v=formula['value']
- return lambda t,v=v,t0=t0:v if t<t0 else 0
- return lambda t:1
- def addValue(qdict,compName,v):
- #add function to compName of dictionary qdict,
- #check if compName exists and handle the potential error
- #lambda functions can't be summed directly, so qdict is a list
- #that will be merged at matrix generation time
- try:
- qdict[compName].append(v)
- except KeyError:
- qdict[compName]=[v]
- #also add derivatives
- #
- # for d in dTarget:
- # ctarget=self.getSEJ_comp(d,target)
- # addValue(ctarget,target,dTarget[d])
- def get(timeUnit,par):
- v=par["value"]
- #convert to seconds
- try:
- parUnits=par['unit'].split('/')
- except (KeyError,IndexError):
- #no unit given
- return v
-
- try:
- if parUnits[1]==timeUnit:
- return v
- except IndexError:
- #no / in unit name
- return v
- if parUnits[1]=='min' and timeUnit=='s':
- return v/60
-
- if parUnits[1]=='s' and timeUnit=='min':
- return 60*v
- #no idea what to do
- return v
- def logistic(par):
- t0=par['t0']['value']
- W=par['W']['value']
- L=lambda t,t0=t0,W=W:1/(1+numpy.exp(-(t-t0(t))/W(t)))
- #D1[x] could be a function !!!
- Dt0=par['t0']['derivatives']
- dt0={x:lambda t,L=L,W=W,D=Dt0,x=x: L(t)*(1-L(t))/W(t)*D[x](t) for x in Dt0}
- #D2[x] could be a function !!!
- DW=par['W']['derivatives']
- dW={x:lambda t,L=L,t0=t0,W=W,D=DW,x=x: L(t)*(1-L(t))*(t-t0(t))/W(t)/W(t)*D[x](t) for x in DW}
- #merge
- return derivedObject(L,[dt0,dW])
- def linearGrowth(par):
- #make sure we never return zero or negative
- C=par['C']['value']
- t1=par['t1']['value']
- t2=par['t2']['value']
- eps=1e-8*C(0)
- pL1=logistic({'t0':par['t1'],'W':par['W1']})
- pL2=logistic({'t0':par['t2'],'W':par['W2']})
- L1=pL1['value']
- L2=pL2['value']
-
- deltaW=lambda t,t1=t1,t2=t2:t2(t)-t1(t)
- deltaT=lambda t,t1=t1:t-t1(t)
- fq=lambda t,dt=deltaT,w=deltaW:dt(t)/w(t)
- f1=lambda t,eps=eps,q=fq,L1=L1,L2=L2,C=C:C(t)*q(t)*L1(t)*(1-L2(t))
- f2=lambda t,L1=L1,L2=L2,C=C: C(t)*L1(t)*L2(t)
-
- f=lambda t,eps=eps,f1=f1,f2=f2:eps+f1(t)+f2(t)
- Dt1=par['t1']['derivatives']
- dfdt1=lambda t,q=fq,C=C,w=deltaW,L1=L1,L2=L2:\
- -C(t)*L1(t)*(1-L2(t))*(1+q(t))/w(t)
- dt1={x:lambda t,df=dfdt1,D=Dt1,x=x:df(t)*D[x](t) for x in Dt1}
-
- Dt2=par['t2']['derivatives']
- dfdt2=lambda t,f1=f1,w=deltaW:-f1(t)/w(t)
- dt2={x:lambda t,df=dfdt2,D=Dt2,x=x:df(t)*D[x](t) for x in Dt2}
- DL1=pL1['derivatives']
- dfdL1=lambda t,C=C,L2=L2,q=fq:C(t)*(q(t)*(1-L2(t))+L2(t))
- dL1={x:lambda t,df=dfdL1,D=DL1,x=x:df(t)*D[x](t) for x in DL1}
- DL2=pL2['derivatives']
- dfdL2=lambda t,C=C,L1=L1,q=fq:C(t)*L1(t)*(1-q(t))
- dL2={x:lambda t,df=dfdL2,D=DL2,x=x: df(t)*D[x](t) for x in DL2}
- DC=par['C']['derivatives']
- dfdC=lambda t,L1=L1,L2=L2,q=fq:L1(t)*(q(t)*(1-L2(t))+L2(t))
- dC={x:lambda t,df=dfdC,D=DC,x=x:df(t)*D[x](t) for x in DC}
- return derivedObject(f,[dt1,dt2,dL1,dL2,dC])
-
|