{ "cells": [ { "cell_type": "code", "execution_count": 49, "metadata": {}, "outputs": [], "source": [ "import scipy.integrate\n", "import numpy\n", "import matplotlib.pyplot\n", "import os\n", "import json\n", "import scipy.interpolate\n", "\n", "def dfdy(y,t,system):\n", " dfdy=system.M.dot(y)+system.u(t)\n", " return dfdy\n", "\n", "def jacobi(y,t,system):\n", " return system.M\n", "\n", "#SE post calculation\n", "def dfdyS(S,t,system):\n", " #unwrap S to NxM where M is number of parameters\n", " mS=numpy.reshape(S,(system.n,system.m))\n", " mOut=system.M.dot(mS)+system.fS(t)\n", " return numpy.ravel(mOut)\n", "\n", "def jacobiSE(S,t,system):\n", " N=system.n*(system.m)\n", " fJ=numpy.zeros((N,N))\n", " #print('fJ shape {}'.format(fJ.shape))\n", " for i in range(system.m):\n", " fJ[i*system.n:(i+1)*system.n,i*system.n:(i+1)*system.n]=system.M\n", " return fJ\n", "\n", "#SE simultaeneous calculation\n", "def dfdySFull(S,t,system):\n", " #unwrap S to NxM where M is number of parameters\n", " mS=numpy.reshape(S,(system.n,system.m+1))\n", " #system.fS(y,t) is NxM matrix where M are parameters\n", " y=mS[:,0]\n", " mOut=system.M.dot(mS)+system.fSY(y,t)\n", " return numpy.ravel(mOut)\n", "\n", "def jacobiSEFull(S,t,system):\n", " N=system.n*(system.m+1)\n", " fJ=numpy.zeros((N,N))\n", " #print('fJ shape {}'.format(fJ.shape))\n", " for i in range(system.m+1):\n", " fJ[i*system.n:(i+1)*system.n,i*system.n:(i+1)*system.n]=system.M\n", " return fJ\n", "\n", "class model:\n", " def __init__(self):\n", " self.compartments={}\n", " self.seJ={}\n", " \n", " def add_source(self,compartmentName,formula):\n", " self.compartments[compartmentName]['source']=formula\n", "\n", " def add_compartment(self,compartmentName):\n", " self.compartments[compartmentName]={}\n", " self.compartments[compartmentName]['targets']={}\n", " self.compartments[compartmentName]['sensTargets']={}\n", "\n", " def bind(self,sourceCompartment,targetCompartment,k,pc,scaleToVolume=0):\n", " #establish a flow from one compartment to the other\n", " cSrc=self.compartments[sourceCompartment]\n", " cTarg=self.compartments[targetCompartment]\n", " \n", " vSPar=self.getVolumePar(sourceCompartment)\n", " vTPar=self.getVolumePar(targetCompartment)\n", " \n", " #the source equation (where we subtract the current)\n", " addValue(cSrc['targets'],sourceCompartment,-get(k)/get(pc)/get(vSPar))\n", " #the target equation (where we add the current)\n", " addValue(cTarg['targets'],sourceCompartment,get(k)/get(pc)/get(vTPar))\n", " \n", " \n", " def getDerivative(self,variable, sign, qPar, pcPar, vPar):\n", " \n", " #for flow based transfer, k=Q/vS/pc, and jacobian derivative is -Q/V/pc/pc\n", " #for diffusion dominated transfer, k=k1/pc\n", " q=get(qPar)\n", " pc=get(pcPar)\n", " v=get(vPar)\n", " if variable==\"partitionCoefficient\":\n", " return sign*(-1)*q/v/pc/pc\n", " if variable==\"diffusionCoefficient\":\n", " return sign/pc/v\n", " if variable==\"flow\":\n", " return sign/pc/v\n", " if variable==\"volume\":\n", " return sign*(-1)*q/pc/v/v\n", " \n", " def getVolumePar(self,compartment,useVolume=1):\n", " if not useVolume:\n", " return {\"value\":1}\n", " try:\n", " parVName=self.mod[\"volumes\"][compartment]\n", " parV=self.mod[\"parameters\"][parVName]\n", " except KeyError:\n", " return {\"value\":1}\n", " \n", " parV[\"name\"]=parVName\n", " return parV\n", " \n", " def bindDerivative(self,parameterName,variable,src,target,q,pc,useVolume=1):\n", " parName=parameterName\n", " srcV=self.getVolumePar(src,useVolume)\n", " doAdd=(variable!=\"volume\" or \"range\" in srcV)\n", " \n", " if doAdd:\n", " #the source equation (where we subtract the current)\n", " if variable==\"volume\":\n", " parName=srcV[\"name\"]\n", " \n", " csrc=self.getSEJ_comp(parName,src)\n", " addValue(csrc,src,self.getDerivative(variable,-1,q,pc,srcV))\n", " \n", " targetV=self.getVolumePar(target,useVolume)\n", " doAdd=(variable!=\"volume\" or \"range\" in targetV)\n", " \n", " #the target equation (where we add the current)\n", " if doAdd:\n", " if variable==\"volume\":\n", " parName=targetV[\"name\"]\n", "\n", " ctarget=self.getSEJ_comp(parName,target)\n", " addValue(ctarget,src,self.getDerivative(variable,+1,q,pc,targetV))\n", " \n", " \n", " \n", " def build(self):\n", " comps=self.compartments\n", " self.n=len(comps)\n", " self.M=numpy.zeros((self.n,self.n))\n", " self.fu=[lambda t:0]*self.n\n", " self.lut={c:i for (i,c) in zip(range(self.n),comps.keys())}\n", " for c in comps:\n", " comp=comps[c]\n", " if 'source' in comp:\n", " self.fu[self.lut[c]]=parseFunction(comp['source'])\n", " \n", " for t in comp['targets']:\n", " self.M[self.lut[c],self.lut[t]]=comp['targets'][t]\n", " #build SE part\n", " self.m=len(self.seJ)\n", " #MxNxN matrix\n", " self.lutSE={c:i for (i,c) in zip(range(self.m),self.seJ.keys())}\n", " self.fSS=numpy.zeros((self.m,self.n,self.n))\n", " for par in self.seJ:\n", " sources=self.seJ[par]\n", " for compartment in sources:\n", " targets=sources[compartment]\n", " for t in targets:\n", " #print('FSS: Adding {}/{},{}/{},{}/{}:{}'.\\\n", " # format(par,self.lutSE[par],compartment,self.lut[compartment],t,self.lut[t],targets[t]))\n", " self.fSS[self.lutSE[par],self.lut[compartment],self.lut[t]]=targets[t]\n", " \n", " \n", " \n", " #print('Done')\n", " \n", " def inspect(self):\n", " comps=self.compartments\n", " pars=self.mod['parameters']\n", " print('Compartments')\n", " for c in comps:\n", " print('{}/{}:'.format(c,self.lut[c]))\n", " comp=comps[c]\n", " if 'source' in comp:\n", " print('\\tsource\\n\\t\\t{}'.format(comp['source']))\n", " print('\\ttargets')\n", " for t in comp['targets']:\n", " print('\\t\\t{}[{},{}]: {}'.format(t,self.lut[c],self.lut[t],comp['targets'][t]))\n", " print('Flows')\n", " for f in self.flows:\n", " fName=self.flows[f]\n", " fParName=self.mod['flows'][fName]\n", " fPar=pars[fParName]\n", " print('\\t{}[{}]:{} [{}]'.format(f,fName,fParName,get(fPar)))\n", " \n", " print('Volumes')\n", " for v in self.mod['volumes']:\n", " vParName=self.mod['volumes'][v]\n", " vPar=pars[vParName]\n", " print('\\t{}:{} [{}]'.format(v,vParName,get(vPar)))\n", " \n", " print('Partition coefficients')\n", " for pc in self.mod['partitionCoefficients']:\n", " pcParName=self.mod['partitionCoefficients'][pc]\n", " pcPar=pars[pcParName]\n", " print('\\t{}:{} [{}]'.format(pc,pcParName,get(pcPar)))\n", " \n", " \n", " print('SE parameters')\n", " for p in self.seJ:\n", " print(p)\n", " sources=self.seJ[p]\n", " for compartment in sources:\n", " targets=sources[compartment]\n", " for t in targets:\n", " print('\\t SE bind {}/{}:{}'.format(compartment,t,targets[t]))\n", " #print('Done')\n", " \n", " def parse(self,file):\n", " \n", " with open(file,'r') as f:\n", " self.mod=json.load(f)\n", " for m in self.mod['compartments']:\n", " self.add_compartment(m)\n", " for s in self.mod['sources']:\n", " #src=mod['sources'][s]\n", " self.add_source(s,self.mod['sources'][s])\n", " self.flows={}\n", " pars=self.mod['parameters']\n", " for f in self.mod['flows']:\n", " #skip comments\n", " if f.find(':')<0:\n", " continue\n", " \n", " comps=f.split(':')\n", " c0=splitVector(comps[0])\n", " c1=splitVector(comps[1])\n", " for x in c0:\n", " for y in c1:\n", " pairName='{}:{}'.format(x,y)\n", " self.flows[pairName]=f\n", " \n", " for b in self.mod['bindings']['diffusion']:\n", " comps=b.split('->')\n", " try:\n", " pcParName=self.mod['partitionCoefficients'][b]\n", " pcPar=pars[pcParName]\n", " except KeyError:\n", " pcPar={\"value\":1}\n", " \n", " kParName=self.mod['bindings']['diffusion'][b]\n", " kPar=pars[kParName]\n", " self.bind(comps[0],comps[1],kPar,pcPar)\n", " \n", " #add derivatives if range is specified\n", " if \"range\" in kPar:\n", " self.bindDerivative(kParName,\"diffusionParameter\",comps[0],comps[1],kPar,pcPar,0)\n", " if \"range\" in pcPar:\n", " self.bindDerivative(pcParName,\"partitionCoefficient\",comps[0],comps[1],kPar,pcPar,0)\n", " \n", " for q in self.mod['bindings']['flow']:\n", " comps=q.split('->')\n", " srcs=splitVector(comps[0])\n", " tgts=splitVector(comps[1])\n", " for cs in srcs:\n", " for ct in tgts:\n", " #get partition coefficient\n", " try:\n", " pcParName=self.mod['partitionCoefficients'][cs]\n", " pcPar=pars[pcParName]\n", " except KeyError:\n", " pcPar={\"value\":1}\n", " \n", " #get flow (direction could be reversed)\n", " try:\n", " qName=self.flows['{}:{}'.format(cs,ct)]\n", " except KeyError:\n", " qName=self.flows['{}:{}'.format(ct,cs)]\n", " \n", " flowParName=self.mod['flows'][qName]\n", " flowPar=pars[flowParName]\n", " \n", " self.bind(cs,ct,flowPar,pcPar,1)\n", " \n", " if \"range\" in pcPar:\n", " self.bindDerivative(pcParName,\"partitionCoefficient\",cs,ct,flowPar,pcPar)\n", " if \"range\" in flowPar:\n", " self.bindDerivative(flowParName,\"flow\",cs,ct,flowPar,pcPar)\n", " self.bindDerivative(\"x\",\"volume\",cs,ct,flowPar,pcPar)\n", " \n", " #print('bind: {}:(q={},vt={},pc={}):{}'.format(bind,q,vt,pc,q/vt/pc))\n", " \n", " self.build()\n", " \n", " def u(self,t):\n", " ub=[f(t) for f in self.fu]\n", " return numpy.array(ub)\n", " \n", " def fSY(self,y,t):\n", " #M number of sensitivity parameters\n", " #N number of equations\n", " #fSS is MxNxN\n", " \n", " #assume a tabulated solution y(t) at t spaced intervals\n", " \n", " qS=self.fSS.dot(y)\n", " #qS is MxN\n", " #but NxM is expected, so do a transpose\n", " \n", " #for simultaneous calculation, a Nx(M+1) matrix is expected\n", " tS=numpy.zeros((self.n,self.m+1))\n", " #columns from 2..M+1 are the partial derivatives \n", " tS[:,1:]=numpy.transpose(qS)\n", " #first column is the original function\n", " tS[:,0]=self.u(t)\n", " return tS\n", " \n", " def fS(self,t):\n", " #M number of sensitivity parameters\n", " #N number of equations\n", " #fSS is MxNxN\n", " \n", " #assume a tabulated solution y(t) at t spaced intervals\n", " \n", " qS=self.fSS.dot(self.getY(t))\n", " return numpy.transpose(qS)\n", " \n", " def getSEJ(self,parName):\n", " #find the sensitivity (SE) derivative of Jacobi with respect to parameter \n", " try:\n", " return self.seJ[parName]\n", " except KeyError:\n", " self.seJ[parName]={}\n", " return self.seJ[parName]\n", " \n", " def getSEJ_comp(self,parName,compartmentName):\n", " #find equation dictating concentration in compartmentName for jacobi-parameter derivative\n", " seJ=self.getSEJ(parName)\n", " \n", " try:\n", " return seJ[compartmentName]\n", " except KeyError:\n", " seJ[compartmentName]={}\n", " return seJ[compartmentName]\n", " def setY(self,t,y):\n", " self.tck=[None]*self.n\n", " for i in range(self.n):\n", " self.tck[i] = scipy.interpolate.splrep(t, y[:,i], s=0)\n", " \n", " def getY(self,t):\n", " fY=numpy.zeros(self.n)\n", " for i in range(self.n):\n", " fY[i]=scipy.interpolate.splev(t, self.tck[i], der=0)\n", " return fY\n", " \n", "def splitVector(v):\n", " if v.find('(')<0:\n", " return [v]\n", " return v[1:-1].split(',')\n", "\n", "def parseFunction(formula):\n", " if formula['name']=='exponential':\n", " c0=formula['constant']\n", " k=formula['k']\n", " return lambda t:c0*numpy.exp(k*t)\n", " if formula['name']=='constant':\n", " c0=formula['value']\n", " return lambda t:c0\n", " if formula['name']=='Heavyside':\n", " t0=formula['limit']\n", " v=formula['value']\n", " return lambda t:v if t" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "#copy output of Thompson et al\n", "print(s1.shape)\n", "fig, axs = matplotlib.pyplot.subplots(5, 3,figsize=(15,25))\n", "name=['arterial','adipose','brain','heart','kidney','liver','lung','muscle','skin',\n", " 'splanchnic','stomach','excrement']\n", "#diazepam\n", "max=[1.5,2.6,3,4,5,2.5,6.8,1.5,1.5,4,4.2,3]\n", "#cotinine\n", "max=[9]*12\n", "\n", "max=[1000*x for x in max]\n", "for i in range(len(name)):\n", " row=i//3\n", " col=i%3\n", " axs[row,col].plot(t/60,sol[:,sys.lut[name[i]]])\n", " axs[row,col].plot(t/60,s1[:,sys.lut[name[i]],0])\n", " axs[row,col].set_title(name[i])\n", " axs[row,col].set_ylim([0,max[i]])\n", " axs[row,col].set_xlim([0,250])\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### 422/(9.1/0.15)/60" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "0.0004898989898989898" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "422/(9.1/0.15)/60/11\n", "0.13/(0.39/0.97)/60/11\n" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[[3. 2.]\n", " [2. 4.]\n", " [2. 6.]]\n", "[[3. 2.]\n", " [2. 4.]\n", " [2. 6.]]\n" ] } ], "source": [ "M=numpy.ones((3,2,2))\n", "M[0,0,1]=2\n", "M[1,1,0]=3\n", "M[2,1,1]=5\n", "v=numpy.ones(2)\n", "q=M.dot(v)\n", "q1=q.ravel()\n", "q2=numpy.reshape(q1,q.shape)\n", "print(q)\n", "print(q2)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" } }, "nbformat": 4, "nbformat_minor": 4 }