|
@@ -2,7 +2,7 @@
|
|
|
"cells": [
|
|
|
{
|
|
|
"cell_type": "code",
|
|
|
- "execution_count": 81,
|
|
|
+ "execution_count": 112,
|
|
|
"metadata": {},
|
|
|
"outputs": [],
|
|
|
"source": [
|
|
@@ -12,6 +12,7 @@
|
|
|
"import os\n",
|
|
|
"import json\n",
|
|
|
"import scipy.interpolate\n",
|
|
|
+ "import cModel\n",
|
|
|
"\n",
|
|
|
"def dfdy(t,y,system):\n",
|
|
|
" dfdy=system.M.dot(y)+system.u(t)\n",
|
|
@@ -51,362 +52,7 @@
|
|
|
" for i in range(system.m+1):\n",
|
|
|
" fJ[i*system.n:(i+1)*system.n,i*system.n:(i+1)*system.n]=system.M\n",
|
|
|
" return fJ\n",
|
|
|
- "\n",
|
|
|
- "class model:\n",
|
|
|
- " def __init__(self):\n",
|
|
|
- " self.compartments={}\n",
|
|
|
- " self.seJ={}\n",
|
|
|
- " \n",
|
|
|
- " def add_source(self,compartmentName,formula):\n",
|
|
|
- " self.compartments[compartmentName]['source']=formula\n",
|
|
|
- "\n",
|
|
|
- " def add_compartment(self,compartmentName):\n",
|
|
|
- " self.compartments[compartmentName]={}\n",
|
|
|
- " self.compartments[compartmentName]['targets']={}\n",
|
|
|
- " self.compartments[compartmentName]['sensTargets']={}\n",
|
|
|
- "\n",
|
|
|
- " def bind(self,sourceCompartment,targetCompartment,k,pc,scaleToVolume=0):\n",
|
|
|
- " #establish a flow from one compartment to the other\n",
|
|
|
- " cSrc=self.compartments[sourceCompartment]\n",
|
|
|
- " cTarg=self.compartments[targetCompartment]\n",
|
|
|
- " \n",
|
|
|
- " vSPar=self.getVolumePar(sourceCompartment,scaleToVolume)\n",
|
|
|
- " vTPar=self.getVolumePar(targetCompartment,scaleToVolume)\n",
|
|
|
- " \n",
|
|
|
- " #the source equation (where we subtract the current)\n",
|
|
|
- " addValue(cSrc['targets'],sourceCompartment,-get(k)/get(pc)/get(vSPar))\n",
|
|
|
- " #the target equation (where we add the current)\n",
|
|
|
- " addValue(cTarg['targets'],sourceCompartment,get(k)/get(pc)/get(vTPar))\n",
|
|
|
- " \n",
|
|
|
- " \n",
|
|
|
- " def getDerivative(self,variable, sign, qPar, pcPar, vPar):\n",
|
|
|
- " \n",
|
|
|
- " #for flow based transfer, k=Q/vS/pc, and jacobian derivative is -Q/V/pc/pc\n",
|
|
|
- " #for diffusion dominated transfer, k=k1/pc\n",
|
|
|
- " q=get(qPar)\n",
|
|
|
- " pc=get(pcPar)\n",
|
|
|
- " v=get(vPar)\n",
|
|
|
- " if variable==\"partitionCoefficient\":\n",
|
|
|
- " return sign*(-1)*q/v/pc/pc\n",
|
|
|
- " if variable==\"diffusionCoefficient\":\n",
|
|
|
- " return sign/pc/v\n",
|
|
|
- " if variable==\"flow\":\n",
|
|
|
- " return sign/pc/v\n",
|
|
|
- " if variable==\"volume\":\n",
|
|
|
- " return sign*(-1)*q/pc/v/v\n",
|
|
|
- " \n",
|
|
|
- " def getVolumePar(self,compartment,useVolume=1):\n",
|
|
|
- " if not useVolume:\n",
|
|
|
- " return {\"value\":1}\n",
|
|
|
- " try:\n",
|
|
|
- " parVName=self.mod[\"volumes\"][compartment]\n",
|
|
|
- " parV=self.mod[\"parameters\"][parVName]\n",
|
|
|
- " except KeyError:\n",
|
|
|
- " return {\"value\":1}\n",
|
|
|
- " \n",
|
|
|
- " parV[\"name\"]=parVName\n",
|
|
|
- " return parV\n",
|
|
|
- " \n",
|
|
|
- " def bindDerivative(self,parameterName,variable,src,target,q,pc,useVolume=1):\n",
|
|
|
- " parName=parameterName\n",
|
|
|
- " srcV=self.getVolumePar(src,useVolume)\n",
|
|
|
- " doAdd=(variable!=\"volume\" or calculateDerivative(srcV))\n",
|
|
|
- " \n",
|
|
|
- " if doAdd:\n",
|
|
|
- " #the source equation (where we subtract the current)\n",
|
|
|
- " if variable==\"volume\":\n",
|
|
|
- " parName=srcV[\"name\"]\n",
|
|
|
- " \n",
|
|
|
- " csrc=self.getSEJ_comp(parName,src)\n",
|
|
|
- " addValue(csrc,src,self.getDerivative(variable,-1,q,pc,srcV))\n",
|
|
|
- " \n",
|
|
|
- " targetV=self.getVolumePar(target,useVolume)\n",
|
|
|
- " doAdd=(variable!=\"volume\" or calculateDerivative(targetV))\n",
|
|
|
- " \n",
|
|
|
- " #the target equation (where we add the current)\n",
|
|
|
- " if doAdd:\n",
|
|
|
- " if variable==\"volume\":\n",
|
|
|
- " parName=targetV[\"name\"]\n",
|
|
|
- "\n",
|
|
|
- " ctarget=self.getSEJ_comp(parName,target)\n",
|
|
|
- " addValue(ctarget,src,self.getDerivative(variable,+1,q,pc,targetV))\n",
|
|
|
- " \n",
|
|
|
- " \n",
|
|
|
- " \n",
|
|
|
- " def build(self):\n",
|
|
|
- " comps=self.compartments\n",
|
|
|
- " self.n=len(comps)\n",
|
|
|
- " self.M=numpy.zeros((self.n,self.n))\n",
|
|
|
- " self.fu=[lambda t:0]*self.n\n",
|
|
|
- " self.lut={c:i for (i,c) in zip(range(self.n),comps.keys())}\n",
|
|
|
- " for c in comps:\n",
|
|
|
- " comp=comps[c]\n",
|
|
|
- " if 'source' in comp:\n",
|
|
|
- " self.fu[self.lut[c]]=parseFunction(comp['source'])\n",
|
|
|
- " \n",
|
|
|
- " for t in comp['targets']:\n",
|
|
|
- " self.M[self.lut[c],self.lut[t]]=comp['targets'][t]\n",
|
|
|
- " #build SE part\n",
|
|
|
- " self.m=len(self.seJ)\n",
|
|
|
- " #MxNxN matrix\n",
|
|
|
- " self.lutSE={c:i for (i,c) in zip(range(self.m),self.seJ.keys())}\n",
|
|
|
- " self.fSS=numpy.zeros((self.m,self.n,self.n))\n",
|
|
|
- " for par in self.seJ:\n",
|
|
|
- " sources=self.seJ[par]\n",
|
|
|
- " for compartment in sources:\n",
|
|
|
- " targets=sources[compartment]\n",
|
|
|
- " for t in targets:\n",
|
|
|
- " #print('FSS: Adding {}/{},{}/{},{}/{}:{}'.\\\n",
|
|
|
- " # format(par,self.lutSE[par],compartment,self.lut[compartment],t,self.lut[t],targets[t]))\n",
|
|
|
- " self.fSS[self.lutSE[par],self.lut[compartment],self.lut[t]]=targets[t]\n",
|
|
|
- " \n",
|
|
|
- " \n",
|
|
|
- " \n",
|
|
|
- " #print('Done')\n",
|
|
|
- " \n",
|
|
|
- " def inspect(self):\n",
|
|
|
- " comps=self.compartments\n",
|
|
|
- " pars=self.mod['parameters']\n",
|
|
|
- " print('Compartments')\n",
|
|
|
- " for c in comps:\n",
|
|
|
- " print('{}/{}:'.format(c,self.lut[c]))\n",
|
|
|
- " comp=comps[c]\n",
|
|
|
- " if 'source' in comp:\n",
|
|
|
- " print('\\tsource\\n\\t\\t{}'.format(comp['source']))\n",
|
|
|
- " print('\\ttargets')\n",
|
|
|
- " for t in comp['targets']:\n",
|
|
|
- " print('\\t\\t{}[{},{}]: {}'.format(t,self.lut[c],self.lut[t],comp['targets'][t]))\n",
|
|
|
- " print('Flows')\n",
|
|
|
- " for f in self.flows:\n",
|
|
|
- " fName=self.flows[f]\n",
|
|
|
- " fParName=self.mod['flows'][fName]\n",
|
|
|
- " fPar=pars[fParName]\n",
|
|
|
- " print('\\t{}[{}]:{} [{}]'.format(f,fName,fParName,get(fPar)))\n",
|
|
|
- " \n",
|
|
|
- " print('Volumes')\n",
|
|
|
- " for v in self.mod['volumes']:\n",
|
|
|
- " vParName=self.mod['volumes'][v]\n",
|
|
|
- " vPar=pars[vParName]\n",
|
|
|
- " print('\\t{}:{} [{}]'.format(v,vParName,get(vPar)))\n",
|
|
|
- " \n",
|
|
|
- " print('Partition coefficients')\n",
|
|
|
- " for pc in self.mod['partitionCoefficients']:\n",
|
|
|
- " pcParName=self.mod['partitionCoefficients'][pc]\n",
|
|
|
- " pcPar=pars[pcParName]\n",
|
|
|
- " print('\\t{}:{} [{}]'.format(pc,pcParName,get(pcPar)))\n",
|
|
|
- " \n",
|
|
|
- " \n",
|
|
|
- " print('SE parameters')\n",
|
|
|
- " for p in self.seJ:\n",
|
|
|
- " print(p)\n",
|
|
|
- " sources=self.seJ[p]\n",
|
|
|
- " for compartment in sources:\n",
|
|
|
- " targets=sources[compartment]\n",
|
|
|
- " for t in targets:\n",
|
|
|
- " print('\\t SE bind {}/{}:{}'.format(compartment,t,targets[t]))\n",
|
|
|
- " #print('Done')\n",
|
|
|
- " \n",
|
|
|
- " def parse(self,file):\n",
|
|
|
- " \n",
|
|
|
- " with open(file,'r') as f:\n",
|
|
|
- " self.mod=json.load(f)\n",
|
|
|
- " for m in self.mod['compartments']:\n",
|
|
|
- " self.add_compartment(m)\n",
|
|
|
- " for s in self.mod['sources']:\n",
|
|
|
- " #src=mod['sources'][s]\n",
|
|
|
- " self.add_source(s,self.mod['sources'][s])\n",
|
|
|
- " self.flows={}\n",
|
|
|
- " pars=self.mod['parameters']\n",
|
|
|
- " for f in self.mod['flows']:\n",
|
|
|
- " #skip comments\n",
|
|
|
- " if f.find(':')<0:\n",
|
|
|
- " continue\n",
|
|
|
- " \n",
|
|
|
- " comps=f.split(':')\n",
|
|
|
- " c0=splitVector(comps[0])\n",
|
|
|
- " c1=splitVector(comps[1])\n",
|
|
|
- " for x in c0:\n",
|
|
|
- " for y in c1:\n",
|
|
|
- " pairName='{}:{}'.format(x,y)\n",
|
|
|
- " self.flows[pairName]=f\n",
|
|
|
- " \n",
|
|
|
- " for b in self.mod['bindings']['diffusion']:\n",
|
|
|
- " comps=b.split('->')\n",
|
|
|
- " try:\n",
|
|
|
- " pcParName=self.mod['partitionCoefficients'][b]\n",
|
|
|
- " pcPar=pars[pcParName]\n",
|
|
|
- " except KeyError:\n",
|
|
|
- " pcPar={\"value\":1}\n",
|
|
|
- " \n",
|
|
|
- " kParName=self.mod['bindings']['diffusion'][b]\n",
|
|
|
- " kPar=pars[kParName]\n",
|
|
|
- " self.bind(comps[0],comps[1],kPar,pcPar)\n",
|
|
|
- " \n",
|
|
|
- " #add derivatives calculateDerivative returns true\n",
|
|
|
- " if calculateDerivative(kPar):\n",
|
|
|
- " self.bindDerivative(kParName,\"diffusionParameter\",comps[0],comps[1],kPar,pcPar,0)\n",
|
|
|
- " if calculateDerivative(pcPar):\n",
|
|
|
- " self.bindDerivative(pcParName,\"partitionCoefficient\",comps[0],comps[1],kPar,pcPar,0)\n",
|
|
|
- " \n",
|
|
|
- " for q in self.mod['bindings']['flow']:\n",
|
|
|
- " comps=q.split('->')\n",
|
|
|
- " srcs=splitVector(comps[0])\n",
|
|
|
- " tgts=splitVector(comps[1])\n",
|
|
|
- " for cs in srcs:\n",
|
|
|
- " for ct in tgts:\n",
|
|
|
- " #get partition coefficient\n",
|
|
|
- " try:\n",
|
|
|
- " pcParName=self.mod['partitionCoefficients'][cs]\n",
|
|
|
- " pcPar=pars[pcParName]\n",
|
|
|
- " except KeyError:\n",
|
|
|
- " pcPar={\"value\":1}\n",
|
|
|
- " \n",
|
|
|
- " #get flow (direction could be reversed)\n",
|
|
|
- " try:\n",
|
|
|
- " qName=self.flows['{}:{}'.format(cs,ct)]\n",
|
|
|
- " except KeyError:\n",
|
|
|
- " qName=self.flows['{}:{}'.format(ct,cs)]\n",
|
|
|
- " \n",
|
|
|
- " flowParName=self.mod['flows'][qName]\n",
|
|
|
- " flowPar=pars[flowParName]\n",
|
|
|
- " \n",
|
|
|
- " self.bind(cs,ct,flowPar,pcPar,1)\n",
|
|
|
- " \n",
|
|
|
- " if calculateDerivative(pcPar):\n",
|
|
|
- " self.bindDerivative(pcParName,\"partitionCoefficient\",cs,ct,flowPar,pcPar)\n",
|
|
|
- " if calculateDerivative(flowPar):\n",
|
|
|
- " self.bindDerivative(flowParName,\"flow\",cs,ct,flowPar,pcPar)\n",
|
|
|
- " self.bindDerivative(\"x\",\"volume\",cs,ct,flowPar,pcPar)\n",
|
|
|
- " \n",
|
|
|
- " #print('bind: {}:(q={},vt={},pc={}):{}'.format(bind,q,vt,pc,q/vt/pc))\n",
|
|
|
- " \n",
|
|
|
- " self.build()\n",
|
|
|
- " \n",
|
|
|
- " def u(self,t):\n",
|
|
|
- " ub=[f(t) for f in self.fu]\n",
|
|
|
- " return numpy.array(ub)\n",
|
|
|
- " \n",
|
|
|
- " def fSY(self,y,t):\n",
|
|
|
- " #M number of sensitivity parameters\n",
|
|
|
- " #N number of equations\n",
|
|
|
- " #fSS is MxNxN\n",
|
|
|
- " \n",
|
|
|
- " #assume a tabulated solution y(t) at t spaced intervals\n",
|
|
|
- " \n",
|
|
|
- " qS=self.fSS.dot(y)\n",
|
|
|
- " #qS is MxN\n",
|
|
|
- " #but NxM is expected, so do a transpose\n",
|
|
|
- " \n",
|
|
|
- " #for simultaneous calculation, a Nx(M+1) matrix is expected\n",
|
|
|
- " tS=numpy.zeros((self.n,self.m+1))\n",
|
|
|
- " #columns from 2..M+1 are the partial derivatives \n",
|
|
|
- " tS[:,1:]=numpy.transpose(qS)\n",
|
|
|
- " #first column is the original function\n",
|
|
|
- " tS[:,0]=self.u(t)\n",
|
|
|
- " return tS\n",
|
|
|
- " \n",
|
|
|
- " def fS(self,t):\n",
|
|
|
- " #M number of sensitivity parameters\n",
|
|
|
- " #N number of equations\n",
|
|
|
- " #fSS is MxNxN\n",
|
|
|
- " \n",
|
|
|
- " #assume a tabulated solution y(t) at t spaced intervals\n",
|
|
|
- " \n",
|
|
|
- " qS=self.fSS.dot(self.getY(t))\n",
|
|
|
- " return numpy.transpose(qS)\n",
|
|
|
- " \n",
|
|
|
- " def getSEJ(self,parName):\n",
|
|
|
- " #find the sensitivity (SE) derivative of Jacobi with respect to parameter \n",
|
|
|
- " try:\n",
|
|
|
- " return self.seJ[parName]\n",
|
|
|
- " except KeyError:\n",
|
|
|
- " self.seJ[parName]={}\n",
|
|
|
- " return self.seJ[parName]\n",
|
|
|
- " \n",
|
|
|
- " def getSEJ_comp(self,parName,compartmentName):\n",
|
|
|
- " #find equation dictating concentration in compartmentName for jacobi-parameter derivative\n",
|
|
|
- " seJ=self.getSEJ(parName)\n",
|
|
|
- " \n",
|
|
|
- " try:\n",
|
|
|
- " return seJ[compartmentName]\n",
|
|
|
- " except KeyError:\n",
|
|
|
- " seJ[compartmentName]={}\n",
|
|
|
- " return seJ[compartmentName]\n",
|
|
|
- " def setY(self,t,y):\n",
|
|
|
- " self.tck=[None]*self.n\n",
|
|
|
- " for i in range(self.n):\n",
|
|
|
- " self.tck[i] = scipy.interpolate.splrep(t, y[:,i], s=0)\n",
|
|
|
- " \n",
|
|
|
- " def getY(self,t):\n",
|
|
|
- " fY=numpy.zeros(self.n)\n",
|
|
|
- " for i in range(self.n):\n",
|
|
|
- " fY[i]=scipy.interpolate.splev(t, self.tck[i], der=0)\n",
|
|
|
- " return fY\n",
|
|
|
- " \n",
|
|
|
- " def calculateUncertainty(self,sol,se):\n",
|
|
|
- " s2out=numpy.zeros(sol.shape)\n",
|
|
|
- " pars=self.mod['parameters']\n",
|
|
|
- " for parName in pars:\n",
|
|
|
- " par=pars[parName]\n",
|
|
|
- " if not calculateDerivative(par):\n",
|
|
|
- " continue\n",
|
|
|
- " v=par[\"value\"]\n",
|
|
|
- " if par['dist']=='lognormal':\n",
|
|
|
- " #this is sigma^2_lnx\n",
|
|
|
- " sln2=numpy.log(par[\"cv\"]*par[\"cv\"]+1)\n",
|
|
|
- " #have to multiplied by value to get the derivative with respect to lnx\n",
|
|
|
- " s2=sln2*v*v\n",
|
|
|
- " else:\n",
|
|
|
- " #for Gaussian, cv is sigma/value; get sigma by value multiplication\n",
|
|
|
- " s2=par[\"cv\"]*par[\"cv\"]*v*v\n",
|
|
|
- " j=self.lutSE[parName]\n",
|
|
|
- " fse=se[:,:,j]\n",
|
|
|
- " print('Calculating for {}/{}:{}'.format(parName,j,fse.shape))\n",
|
|
|
- " #se2 is nt x n\n",
|
|
|
- " se2=numpy.multiply(fse,fse)\n",
|
|
|
- " s2out+=se2*s2\n",
|
|
|
- " return numpy.sqrt(s2out)\n",
|
|
|
- " \n",
|
|
|
- "def splitVector(v):\n",
|
|
|
- " if v.find('(')<0:\n",
|
|
|
- " return [v]\n",
|
|
|
- " return v[1:-1].split(',')\n",
|
|
|
- "\n",
|
|
|
- "def parseFunction(formula):\n",
|
|
|
- " if formula['name']=='exponential':\n",
|
|
|
- " c0=formula['constant']\n",
|
|
|
- " k=formula['k']\n",
|
|
|
- " return lambda t:c0*numpy.exp(k*t)\n",
|
|
|
- " if formula['name']=='constant':\n",
|
|
|
- " c0=formula['value']\n",
|
|
|
- " return lambda t:c0\n",
|
|
|
- " if formula['name']=='Heavyside':\n",
|
|
|
- " t0=formula['limit']\n",
|
|
|
- " v=formula['value']\n",
|
|
|
- " return lambda t:v if t<t0 else 0\n",
|
|
|
- " return lambda t:1\n",
|
|
|
- "\n",
|
|
|
- "def addValue(qdict,compName,v):\n",
|
|
|
- " #add real number v to attribute compName of dictionary qdict, check if compName exists and handle the potential error\n",
|
|
|
- " try:\n",
|
|
|
- " qdict[compName]+=v\n",
|
|
|
- " except KeyError:\n",
|
|
|
- " qdict[compName]=v\n",
|
|
|
- "\n",
|
|
|
- "def get(par):\n",
|
|
|
- " v=par[\"value\"]\n",
|
|
|
- " #convert to seconds\n",
|
|
|
- " try:\n",
|
|
|
- " if par['unit'].split('/')[1]=='min':\n",
|
|
|
- " return v/60\n",
|
|
|
- " except (KeyError,IndexError):\n",
|
|
|
- " pass\n",
|
|
|
- " \n",
|
|
|
- " return v\n",
|
|
|
- "\n",
|
|
|
- "def calculateDerivative(par):\n",
|
|
|
- " #add derivatives if dist(short for distribution) is specified\n",
|
|
|
- " return \"dist\" in par"
|
|
|
+ "\n"
|
|
|
]
|
|
|
},
|
|
|
{
|