import numpy import json import os import scipy.interpolate class model: def __init__(self): self.compartments={} self.seJ={} def add_source(self,compartmentName,formula): self.compartments[compartmentName]['source']=formula def add_compartment(self,compartmentName): self.compartments[compartmentName]={} self.compartments[compartmentName]['targets']={} self.compartments[compartmentName]['sensTargets']={} def getTimeUnit(self): try: return self.mod['timeUnit'] except KeyError: return 's' def bind(self,sourceCompartment,targetCompartment,k,pc,scaleToVolume=0): #establish a flow from one compartment to the other cSrc=self.compartments[sourceCompartment] cTarg=self.compartments[targetCompartment] vSPar=self.getVolumePar(sourceCompartment,scaleToVolume) vTPar=self.getVolumePar(targetCompartment,scaleToVolume) tu=self.getTimeUnit() fk=get(tu,k) fpc=get(tu,pc) fvs=get(tu,vSPar) fvt=get(tu,vTPar) #the source equation (where we subtract the current) addValue(cSrc['targets'],sourceCompartment,-fk/fpc/fvs) #the target equation (where we add the current) addValue(cTarg['targets'],sourceCompartment,fk/fpc/fvt) def getDerivative(self,variable, sign, qPar, pcPar, vPar): #for flow based transfer, k=Q/vS/pc, #and jacobian derivative is -Q/V/pc/pc #for diffusion dominated transfer, k=k1/pc tu=self.getTimeUnit() q=get(tu,qPar) pc=get(tu,pcPar) v=get(tu,vPar) if variable=="partitionCoefficient": return sign*(-1)*q/v/pc/pc if variable=="diffusionCoefficient": return sign/pc/v if variable=="flow": return sign/pc/v if variable=="volume": return sign*(-1)*q/pc/v/v def getVolumePar(self,compartment,useVolume=1): if not useVolume: return {"value":1} try: parVName=self.mod["volumes"][compartment] parV=self.mod["parameters"][parVName] except KeyError: return {"value":1} parV["name"]=parVName return parV def bindDerivative(self,parameterName,variable,src,target,q,pc,useVolume=1): parName=parameterName srcV=self.getVolumePar(src,useVolume) doAdd=(variable!="volume" or calculateDerivative(srcV)) if doAdd: #the source equation (where we subtract the current) if variable=="volume": parName=srcV["name"] csrc=self.getSEJ_comp(parName,src) addValue(csrc,src,self.getDerivative(variable,-1,q,pc,srcV)) targetV=self.getVolumePar(target,useVolume) doAdd=(variable!="volume" or calculateDerivative(targetV)) #the target equation (where we add the current) if doAdd: if variable=="volume": parName=targetV["name"] ctarget=self.getSEJ_comp(parName,target) addValue(ctarget,src,self.getDerivative(variable,+1,q,pc,targetV)) def build(self): comps=self.compartments self.n=len(comps) self.M=numpy.zeros((self.n,self.n)) self.fu=[lambda t:0]*self.n self.lut={c:i for (i,c) in zip(range(self.n),comps.keys())} for c in comps: comp=comps[c] if 'source' in comp: self.fu[self.lut[c]]=parseFunction(comp['source']) for t in comp['targets']: self.M[self.lut[c],self.lut[t]]=comp['targets'][t] #build SE part self.m=len(self.seJ) self.lutSE={c:i for (i,c) in zip(range(self.m),self.seJ.keys())} #MxNxN matrix self.fSS=numpy.zeros((self.m,self.n,self.n)) for par in self.seJ: sources=self.seJ[par] for compartment in sources: targets=sources[compartment] for t in targets: k=self.lutSE[par] i=self.lut[compartment] j=self.lut[t] self.fSS[k,i,j]=targets[t] def inspect(self): comps=self.compartments pars=self.mod['parameters'] tu=self.getTimeUnit() print('Time unit: {}'.format(tu)) print('Compartments') for c in comps: print('{}/{}:'.format(c,self.lut[c])) comp=comps[c] if 'source' in comp: print('\tsource\n\t\t{}'.format(comp['source'])) print('\ttargets') for t in comp['targets']: print('\t\t{}[{},{}]: {}'.format(t,self.lut[c],self.lut[t],\ comp['targets'][t])) print('Flows') for f in self.flows: fName=self.flows[f] fParName=self.mod['flows'][fName] fPar=pars[fParName] print('\t{}[{}]:{} [{}]'.format(f,fName,fParName,get(tu,fPar))) print('Volumes') for v in self.mod['volumes']: vParName=self.mod['volumes'][v] vPar=pars[vParName] print('\t{}:{} [{}]'.format(v,vParName,get(tu,vPar))) print('Partition coefficients') for pc in self.mod['partitionCoefficients']: pcParName=self.mod['partitionCoefficients'][pc] pcPar=pars[pcParName] print('\t{}:{} [{}]'.format(pc,pcParName,get(tu,pcPar))) print('SE parameters') for p in self.seJ: print(p) sources=self.seJ[p] for compartment in sources: targets=sources[compartment] for t in targets: print('\t SE bind {}/{}:{}'.format(compartment,t,targets[t])) def parse(self,file): with open(file,'r') as f: self.mod=json.load(f) for m in self.mod['compartments']: self.add_compartment(m) for s in self.mod['sources']: #src=mod['sources'][s] self.add_source(s,self.mod['sources'][s]) self.flows={} print('timeUnit: {}'.format(self.getTimeUnit())) pars=self.mod['parameters'] for f in self.mod['flows']: #skip comments if f.find(':')<0: continue comps=f.split(':') c0=splitVector(comps[0]) c1=splitVector(comps[1]) for x in c0: for y in c1: pairName='{}:{}'.format(x,y) self.flows[pairName]=f for b in self.mod['bindings']['diffusion']: #whether to scale transfer constants to organ volume #default is true, but changing here will assume no scaling useVolume=1 comps=b.split('->') try: pcParName=self.mod['partitionCoefficients'][b] pcPar=pars[pcParName] except KeyError: pcPar={"value":1} kParName=self.mod['bindings']['diffusion'][b] kPar=pars[kParName] self.bind(comps[0],comps[1],kPar,pcPar,useVolume) #add derivatives calculateDerivative returns true if calculateDerivative(kPar): self.bindDerivative(kParName,"diffusionParameter",\ comps[0],comps[1],kPar,pcPar,useVolume) if calculateDerivative(pcPar): self.bindDerivative(pcParName,"partitionCoefficient",\ comps[0],comps[1],kPar,pcPar,useVolume) for q in self.mod['bindings']['flow']: comps=q.split('->') srcs=splitVector(comps[0]) tgts=splitVector(comps[1]) for cs in srcs: for ct in tgts: #get partition coefficient try: pcParName=self.mod['partitionCoefficients'][cs] pcPar=pars[pcParName] except KeyError: pcPar={"value":1} #get flow (direction could be reversed) try: qName=self.flows['{}:{}'.format(cs,ct)] except KeyError: qName=self.flows['{}:{}'.format(ct,cs)] flowParName=self.mod['flows'][qName] flowPar=pars[flowParName] self.bind(cs,ct,flowPar,pcPar,1) if calculateDerivative(pcPar): self.bindDerivative(pcParName,"partitionCoefficient",\ cs,ct,flowPar,pcPar) if calculateDerivative(flowPar): self.bindDerivative(flowParName,"flow",\ cs,ct,flowPar,pcPar) self.bindDerivative("x","volume",cs,ct,flowPar,pcPar) self.build() def u(self,t): ub=[f(t) for f in self.fu] return numpy.array(ub) def fSY(self,y,t): #M number of sensitivity parameters #N number of equations #fSS is MxNxN #assume a tabulated solution y(t) at t spaced intervals qS=self.fSS.dot(y) #qS is MxN #but NxM is expected, so do a transpose #for simultaneous calculation, a Nx(M+1) matrix is expected tS=numpy.zeros((self.n,self.m+1)) #columns from 2..M+1 are the partial derivatives tS[:,1:]=numpy.transpose(qS) #first column is the original function tS[:,0]=self.u(t) return tS def fS(self,t): #M number of sensitivity parameters #N number of equations #fSS is MxNxN #assume a tabulated solution y(t) at t spaced intervals qS=self.fSS.dot(self.getY(t)) return numpy.transpose(qS) def getSEJ(self,parName): #find the sensitivity (SE) derivative of Jacobi with #respect to parameter try: return self.seJ[parName] except KeyError: self.seJ[parName]={} return self.seJ[parName] def getSEJ_comp(self,parName,compartmentName): #find equation dictating concentration in compartmentName #for jacobi-parameter derivative seJ=self.getSEJ(parName) try: return seJ[compartmentName] except KeyError: seJ[compartmentName]={} return seJ[compartmentName] def setY(self,t,y): self.tck=[None]*self.n for i in range(self.n): self.tck[i] = scipy.interpolate.splrep(t, y[:,i], s=0) def getY(self,t): fY=numpy.zeros(self.n) for i in range(self.n): fY[i]=scipy.interpolate.splev(t, self.tck[i], der=0) return fY def calculateUncertainty(self,sol,se): s2out=numpy.zeros(sol.shape) pars=self.mod['parameters'] for parName in pars: par=pars[parName] if not calculateDerivative(par): continue v=par["value"] if par['dist']=='lognormal': #this is sigma^2_lnx sln2=numpy.log(par["cv"]*par["cv"]+1) #have to multiplied by value to get the derivative #with respect to lnx s2=sln2*v*v else: #for Gaussian, cv is sigma/value; get sigma by value multiplication s2=par["cv"]*par["cv"]*v*v j=self.lutSE[parName] fse=se[:,:,j] print('Calculating for {}/{}:{}'.format(parName,j,fse.shape)) #se2 is nt x n se2=numpy.multiply(fse,fse) s2out+=se2*s2 return numpy.sqrt(s2out) def splitVector(v): if v.find('(')<0: return [v] return v[1:-1].split(',') def parseFunction(formula): if formula['name']=='exponential': c0=formula['constant'] k=formula['k'] return lambda t:c0*numpy.exp(k*t) if formula['name']=='constant': c0=formula['value'] return lambda t:c0 if formula['name']=='Heavyside': t0=formula['limit'] v=formula['value'] return lambda t:v if t