123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408 |
- import numpy
- import json
- import os
- import scipy.interpolate
- class model:
- def __init__(self):
- self.compartments={}
- self.seJ={}
-
- def add_source(self,compartmentName,formula):
- self.compartments[compartmentName]['source']=formula
- def add_compartment(self,compartmentName):
- self.compartments[compartmentName]={}
- self.compartments[compartmentName]['targets']={}
- self.compartments[compartmentName]['sensTargets']={}
- def getTimeUnit(self):
- try:
- return self.mod['timeUnit']
- except KeyError:
- return 's'
- def bind(self,sourceCompartment,targetCompartment,k,pc,scaleToVolume=0):
-
- #establish a flow from one compartment to the other
- cSrc=self.compartments[sourceCompartment]
- cTarg=self.compartments[targetCompartment]
- vSPar=self.getVolumePar(sourceCompartment,scaleToVolume)
- vTPar=self.getVolumePar(targetCompartment,scaleToVolume)
-
- tu=self.getTimeUnit()
- fk=get(tu,k)
- fpc=get(tu,pc)
- fvs=get(tu,vSPar)
- fvt=get(tu,vTPar)
- #the source equation (where we subtract the current)
- addValue(cSrc['targets'],sourceCompartment,-fk/fpc/fvs)
- #the target equation (where we add the current)
- addValue(cTarg['targets'],sourceCompartment,fk/fpc/fvt)
-
-
- def getDerivative(self,variable, sign, qPar, pcPar, vPar):
-
- #for flow based transfer, k=Q/vS/pc,
- #and jacobian derivative is -Q/V/pc/pc
- #for diffusion dominated transfer, k=k1/pc
-
- tu=self.getTimeUnit()
- q=get(tu,qPar)
- pc=get(tu,pcPar)
- v=get(tu,vPar)
- if variable=="partitionCoefficient":
- return sign*(-1)*q/v/pc/pc
- if variable=="diffusionCoefficient":
- return sign/pc/v
- if variable=="flow":
- return sign/pc/v
- if variable=="volume":
- return sign*(-1)*q/pc/v/v
-
- def getVolumePar(self,compartment,useVolume=1):
- if not useVolume:
- return {"value":1}
- try:
- parVName=self.mod["volumes"][compartment]
- parV=self.mod["parameters"][parVName]
- except KeyError:
- return {"value":1}
- parV["name"]=parVName
- return parV
- def bindDerivative(self,parameterName,variable,src,target,q,pc,useVolume=1):
- parName=parameterName
- srcV=self.getVolumePar(src,useVolume)
- doAdd=(variable!="volume" or calculateDerivative(srcV))
-
- if doAdd:
- #the source equation (where we subtract the current)
- if variable=="volume":
- parName=srcV["name"]
- csrc=self.getSEJ_comp(parName,src)
- addValue(csrc,src,self.getDerivative(variable,-1,q,pc,srcV))
- targetV=self.getVolumePar(target,useVolume)
- doAdd=(variable!="volume" or calculateDerivative(targetV))
- #the target equation (where we add the current)
- if doAdd:
- if variable=="volume":
- parName=targetV["name"]
- ctarget=self.getSEJ_comp(parName,target)
- addValue(ctarget,src,self.getDerivative(variable,+1,q,pc,targetV))
-
- def build(self):
- comps=self.compartments
- self.n=len(comps)
- self.M=numpy.zeros((self.n,self.n))
- self.fu=[lambda t:0]*self.n
- self.lut={c:i for (i,c) in zip(range(self.n),comps.keys())}
- for c in comps:
- comp=comps[c]
- if 'source' in comp:
- self.fu[self.lut[c]]=parseFunction(comp['source'])
-
- for t in comp['targets']:
- self.M[self.lut[c],self.lut[t]]=comp['targets'][t]
- #build SE part
- self.m=len(self.seJ)
- self.lutSE={c:i for (i,c) in zip(range(self.m),self.seJ.keys())}
- #MxNxN matrix
- self.fSS=numpy.zeros((self.m,self.n,self.n))
- for par in self.seJ:
- sources=self.seJ[par]
- for compartment in sources:
- targets=sources[compartment]
- for t in targets:
- k=self.lutSE[par]
- i=self.lut[compartment]
- j=self.lut[t]
- self.fSS[k,i,j]=targets[t]
-
- def inspect(self):
- comps=self.compartments
- pars=self.mod['parameters']
-
- tu=self.getTimeUnit()
- print('Time unit: {}'.format(tu))
- print('Compartments')
- for c in comps:
- print('{}/{}:'.format(c,self.lut[c]))
- comp=comps[c]
- if 'source' in comp:
- print('\tsource\n\t\t{}'.format(comp['source']))
- print('\ttargets')
- for t in comp['targets']:
- print('\t\t{}[{},{}]: {}'.format(t,self.lut[c],self.lut[t],\
- comp['targets'][t]))
- print('Flows')
- for f in self.flows:
- fName=self.flows[f]
- fParName=self.mod['flows'][fName]
- fPar=pars[fParName]
- print('\t{}[{}]:{} [{}]'.format(f,fName,fParName,get(tu,fPar)))
- print('Volumes')
- for v in self.mod['volumes']:
- vParName=self.mod['volumes'][v]
- vPar=pars[vParName]
- print('\t{}:{} [{}]'.format(v,vParName,get(tu,vPar)))
- print('Partition coefficients')
- for pc in self.mod['partitionCoefficients']:
- pcParName=self.mod['partitionCoefficients'][pc]
- pcPar=pars[pcParName]
- print('\t{}:{} [{}]'.format(pc,pcParName,get(tu,pcPar)))
- print('SE parameters')
- for p in self.seJ:
- print(p)
- sources=self.seJ[p]
- for compartment in sources:
- targets=sources[compartment]
- for t in targets:
- print('\t SE bind {}/{}:{}'.format(compartment,t,targets[t]))
-
- def parse(self,file):
-
- with open(file,'r') as f:
- self.mod=json.load(f)
- for m in self.mod['compartments']:
- self.add_compartment(m)
- for s in self.mod['sources']:
- #src=mod['sources'][s]
- self.add_source(s,self.mod['sources'][s])
- self.flows={}
- print('timeUnit: {}'.format(self.getTimeUnit()))
- pars=self.mod['parameters']
- for f in self.mod['flows']:
- #skip comments
- if f.find(':')<0:
- continue
-
- comps=f.split(':')
- c0=splitVector(comps[0])
- c1=splitVector(comps[1])
- for x in c0:
- for y in c1:
- pairName='{}:{}'.format(x,y)
- self.flows[pairName]=f
-
- for b in self.mod['bindings']['diffusion']:
- #whether to scale transfer constants to organ volume
- #default is true, but changing here will assume no scaling
- useVolume=1
- comps=b.split('->')
- try:
- pcParName=self.mod['partitionCoefficients'][b]
- pcPar=pars[pcParName]
- except KeyError:
- pcPar={"value":1}
-
- kParName=self.mod['bindings']['diffusion'][b]
- kPar=pars[kParName]
- self.bind(comps[0],comps[1],kPar,pcPar,useVolume)
-
- #add derivatives calculateDerivative returns true
- if calculateDerivative(kPar):
- self.bindDerivative(kParName,"diffusionParameter",\
- comps[0],comps[1],kPar,pcPar,useVolume)
- if calculateDerivative(pcPar):
- self.bindDerivative(pcParName,"partitionCoefficient",\
- comps[0],comps[1],kPar,pcPar,useVolume)
-
- for q in self.mod['bindings']['flow']:
- comps=q.split('->')
- srcs=splitVector(comps[0])
- tgts=splitVector(comps[1])
- for cs in srcs:
- for ct in tgts:
- #get partition coefficient
- try:
- pcParName=self.mod['partitionCoefficients'][cs]
- pcPar=pars[pcParName]
- except KeyError:
- pcPar={"value":1}
-
- #get flow (direction could be reversed)
- try:
- qName=self.flows['{}:{}'.format(cs,ct)]
- except KeyError:
- qName=self.flows['{}:{}'.format(ct,cs)]
-
- flowParName=self.mod['flows'][qName]
- flowPar=pars[flowParName]
-
- self.bind(cs,ct,flowPar,pcPar,1)
-
- if calculateDerivative(pcPar):
- self.bindDerivative(pcParName,"partitionCoefficient",\
- cs,ct,flowPar,pcPar)
- if calculateDerivative(flowPar):
- self.bindDerivative(flowParName,"flow",\
- cs,ct,flowPar,pcPar)
- self.bindDerivative("x","volume",cs,ct,flowPar,pcPar)
-
-
- self.build()
- def u(self,t):
- ub=[f(t) for f in self.fu]
- return numpy.array(ub)
-
- def fSY(self,y,t):
- #M number of sensitivity parameters
- #N number of equations
- #fSS is MxNxN
- #assume a tabulated solution y(t) at t spaced intervals
- qS=self.fSS.dot(y)
- #qS is MxN
- #but NxM is expected, so do a transpose
- #for simultaneous calculation, a Nx(M+1) matrix is expected
- tS=numpy.zeros((self.n,self.m+1))
- #columns from 2..M+1 are the partial derivatives
- tS[:,1:]=numpy.transpose(qS)
- #first column is the original function
- tS[:,0]=self.u(t)
- return tS
-
- def fS(self,t):
- #M number of sensitivity parameters
- #N number of equations
- #fSS is MxNxN
-
- #assume a tabulated solution y(t) at t spaced intervals
-
- qS=self.fSS.dot(self.getY(t))
- return numpy.transpose(qS)
-
- def getSEJ(self,parName):
- #find the sensitivity (SE) derivative of Jacobi with
- #respect to parameter
- try:
- return self.seJ[parName]
- except KeyError:
- self.seJ[parName]={}
- return self.seJ[parName]
-
- def getSEJ_comp(self,parName,compartmentName):
- #find equation dictating concentration in compartmentName
- #for jacobi-parameter derivative
- seJ=self.getSEJ(parName)
- try:
- return seJ[compartmentName]
- except KeyError:
- seJ[compartmentName]={}
- return seJ[compartmentName]
- def setY(self,t,y):
- self.tck=[None]*self.n
- for i in range(self.n):
- self.tck[i] = scipy.interpolate.splrep(t, y[:,i], s=0)
-
- def getY(self,t):
- fY=numpy.zeros(self.n)
- for i in range(self.n):
- fY[i]=scipy.interpolate.splev(t, self.tck[i], der=0)
- return fY
-
- def calculateUncertainty(self,sol,se):
- s2out=numpy.zeros(sol.shape)
- pars=self.mod['parameters']
- for parName in pars:
- par=pars[parName]
- if not calculateDerivative(par):
- continue
- v=par["value"]
- if par['dist']=='lognormal':
- #this is sigma^2_lnx
- sln2=numpy.log(par["cv"]*par["cv"]+1)
- #have to multiplied by value to get the derivative
- #with respect to lnx
- s2=sln2*v*v
- else:
- #for Gaussian, cv is sigma/value; get sigma by value multiplication
- s2=par["cv"]*par["cv"]*v*v
- j=self.lutSE[parName]
- fse=se[:,:,j]
- print('Calculating for {}/{}:{}'.format(parName,j,fse.shape))
- #se2 is nt x n
- se2=numpy.multiply(fse,fse)
- s2out+=se2*s2
- return numpy.sqrt(s2out)
-
- def splitVector(v):
- if v.find('(')<0:
- return [v]
- return v[1:-1].split(',')
- def parseFunction(formula):
- if formula['name']=='exponential':
- c0=formula['constant']
- k=formula['k']
- return lambda t:c0*numpy.exp(k*t)
- if formula['name']=='constant':
- c0=formula['value']
- return lambda t:c0
- if formula['name']=='Heavyside':
- t0=formula['limit']
- v=formula['value']
- return lambda t:v if t<t0 else 0
- return lambda t:1
- def addValue(qdict,compName,v):
- #add real number v to attribute compName of dictionary qdict, check if compName exists and handle the potential error
- try:
- qdict[compName]+=v
- except KeyError:
- qdict[compName]=v
- def get(timeUnit,par):
- v=par["value"]
- #convert to seconds
- try:
- parUnits=par['unit'].split('/')
- except (KeyError,IndexError):
- #no unit given
- return v
-
- try:
- if parUnits[1]==timeUnit:
- return v
- except IndexError:
- #no / in unit name
- return v
- if parUnits[1]=='min' and timeUnit=='s':
- return v/60
-
- if parUnits[1]=='s' and timeUnit=='min':
- return 60*v
- #no idea what to do
- return v
- def calculateDerivative(par):
- #add derivatives if dist(short for distribution) is specified
- return "dist" in par
|