{ "cells": [ { "cell_type": "code", "execution_count": 161, "metadata": {}, "outputs": [], "source": [ "import scipy.integrate\n", "import numpy\n", "import matplotlib.pyplot\n", "import os\n", "import json\n", "import scipy.interpolate\n", "\n", "def dfdy(y,t,system):\n", " dfdy=system.M.dot(y)+system.u(t)\n", " return dfdy\n", "\n", "def jacobi(y,t,system):\n", " return system.M\n", "\n", "#SE post calculation\n", "def dfdyS(S,t,system):\n", " #unwrap S to NxM where M is number of parameters\n", " mS=numpy.reshape(S,(system.n,system.m))\n", " mOut=system.M.dot(mS)+system.fS(t)\n", " return numpy.ravel(mOut)\n", "\n", "def jacobiSE(S,t,system):\n", " N=system.n*(system.m)\n", " fJ=numpy.zeros((N,N))\n", " #print('fJ shape {}'.format(fJ.shape))\n", " for i in range(system.m):\n", " fJ[i*system.n:(i+1)*system.n,i*system.n:(i+1)*system.n]=system.M\n", " return fJ\n", "\n", "#SE simultaeneous calculation\n", "def dfdySFull(S,t,system):\n", " #unwrap S to NxM where M is number of parameters\n", " mS=numpy.reshape(S,(system.n,system.m+1))\n", " #system.fS(y,t) is NxM matrix where M are parameters\n", " y=mS[:,0]\n", " mOut=system.M.dot(mS)+system.fSY(y,t)\n", " return numpy.ravel(mOut)\n", "\n", "def jacobiSEFull(S,t,system):\n", " N=system.n*(system.m+1)\n", " fJ=numpy.zeros((N,N))\n", " #print('fJ shape {}'.format(fJ.shape))\n", " for i in range(system.m+1):\n", " fJ[i*system.n:(i+1)*system.n,i*system.n:(i+1)*system.n]=system.M\n", " return fJ\n", "\n", "class model:\n", " def __init__(self):\n", " self.compartments={}\n", " self.seJ={}\n", " \n", " def add_source(self,compartmentName,formula):\n", " self.compartments[compartmentName]['source']=formula\n", "\n", " def add_compartment(self,compartmentName):\n", " self.compartments[compartmentName]={}\n", " self.compartments[compartmentName]['targets']={}\n", " self.compartments[compartmentName]['sensTargets']={}\n", "\n", " def bind(self,sourceCompartment,targetCompartment,k,scaleToVolume=0):\n", " #establish a flow from one compartment to the other\n", " cSrc=self.compartments[sourceCompartment]\n", " cTarg=self.compartments[targetCompartment]\n", " vS=1\n", " vT=1\n", " if scaleToVolume:\n", " vS=self.mod[\"volumes\"][sourceCompartment]\n", " vT=self.mod[\"volumes\"][targetCompartment]\n", " \n", " #the source equation (where we subtract the current)\n", " addValue(cSrc['targets'],sourceCompartment,-k/vS)\n", " #the target equation (where we add the current)\n", " addValue(cTarg['targets'],sourceCompartment,k/vT)\n", " \n", " def getDerivative(self,variable, source, target, q, pc, useVolume):\n", " v=1\n", " if useVolume:\n", " v=self.mod[\"volumes\"][target]\n", " #the target equation (where we add the current)\n", " sign=1\n", " #the source equation (where we subtract the current)\n", " if source==target:\n", " sign=-1\n", " \n", " #for flow based transfer, k=Q/vS/pc, and jacobian derivative is -Q/V/pc/pc\n", " #for diffusion dominated transfer, k=k1/pc\n", " if variable==\"partitionCoefficient\":\n", " return sign*(-1)*q/v/pc/pc\n", " if variable==\"diffusionCoefficient\":\n", " return sign/pc/v\n", " if variable==\"flow\":\n", " return sign/pc/v\n", " if variable==\"volume\":\n", " return sign*(-1)*q/pc/v/v\n", " \n", " def bindDerivative(self,parameterName,variable,src,target,q,pc,useVolume=1):\n", " parName=parameterName\n", " if variable==\"volume\":\n", " parName='{}_V'.format(src)\n", " #the source equation (where we subtract the current)\n", " csrc=self.getSEJ_comp(parName,src)\n", " addValue(csrc,src,self.getDerivative(variable,src,src,q,pc,useVolume))\n", " \n", " if variable==\"volume\":\n", " parName='{}_V'.format(target)\n", " #the target equation (where we add the current)\n", " ctarget=self.getSEJ_comp(parName,target)\n", " addValue(ctarget,src,self.getDerivative(variable,src,target,q,pc,useVolume))\n", " \n", " \n", " \n", " def build(self):\n", " comps=self.compartments\n", " self.n=len(comps)\n", " self.M=numpy.zeros((self.n,self.n))\n", " self.fu=[lambda t:0]*self.n\n", " self.lut={c:i for (i,c) in zip(range(self.n),comps.keys())}\n", " for c in comps:\n", " comp=comps[c]\n", " if 'source' in comp:\n", " self.fu[self.lut[c]]=parseFunction(comp['source'])\n", " \n", " for t in comp['targets']:\n", " self.M[self.lut[c],self.lut[t]]=comp['targets'][t]\n", " #build SE part\n", " self.m=len(self.seJ)\n", " #MxNxN matrix\n", " self.lutSE={c:i for (i,c) in zip(range(self.m),self.seJ.keys())}\n", " self.fSS=numpy.zeros((self.m,self.n,self.n))\n", " for par in self.seJ:\n", " sources=self.seJ[par]\n", " for compartment in sources:\n", " targets=sources[compartment]\n", " for t in targets:\n", " #print('FSS: Adding {}/{},{}/{},{}/{}:{}'.\\\n", " # format(par,self.lutSE[par],compartment,self.lut[compartment],t,self.lut[t],targets[t]))\n", " self.fSS[self.lutSE[par],self.lut[compartment],self.lut[t]]=targets[t]\n", " \n", " \n", " \n", " #print('Done')\n", " \n", " def inspect(self):\n", " comps=self.compartments\n", " print('Compartments')\n", " for c in comps:\n", " print('{}/{}:'.format(c,self.lut[c]))\n", " comp=comps[c]\n", " if 'source' in comp:\n", " print('\\tsource\\n\\t\\t{}'.format(comp['source']))\n", " print('\\ttargets')\n", " for t in comp['targets']:\n", " print('\\t\\t{}[{},{}]: {}'.format(t,self.lut[c],self.lut[t],comp['targets'][t]))\n", " print('Flows')\n", " for f in self.flows:\n", " print('\\t{}:{}'.format(f,self.flows[f]))\n", " print('SE parameters')\n", " for p in self.seJ:\n", " print(p)\n", " sources=self.seJ[p]\n", " for compartment in sources:\n", " targets=sources[compartment]\n", " for t in targets:\n", " print('\\t SE bind {}/{}:{}'.format(compartment,t,targets[t]))\n", " #print('Done')\n", " \n", " def parse(self,file):\n", " \n", " with open(file,'r') as f:\n", " self.mod=json.load(f)\n", " for m in self.mod['compartments']:\n", " self.add_compartment(m)\n", " for s in self.mod['sources']:\n", " #src=mod['sources'][s]\n", " self.add_source(s,self.mod['sources'][s])\n", " self.flows={}\n", " for f in self.mod['flows']:\n", " if f.find(':')<0:\n", " continue\n", " comps=f.split(':')\n", " c0=splitVector(comps[0])\n", " c1=splitVector(comps[1])\n", " for x in c0:\n", " for y in c1:\n", " pairName='{}:{}'.format(x,y)\n", " self.flows[pairName]=f\n", " \n", " parsSE=self.mod['sensitivityCalculation']['parameters']\n", " for b in self.mod['bindings']['diffusion']:\n", " comps=b.split('->')\n", " try:\n", " pc=self.mod['partitionCoefficients'][b][\"value\"]\n", " except KeyError:\n", " pc=1\n", " k1=self.mod['bindings']['diffusion'][b]\n", " self.bind(comps[0],comps[1],k1/pc)\n", " if \"diffusions\" in parsSE:\n", " self.bindDerivative(b,\"diffusionParameter\",comps[0],comps[1],k1,pc,0)\n", " if \"partitionCoefficients\" in parsSE:\n", " if b in self.mod['partitionCoefficients']:\n", " parName='{}:PC'.format(b)\n", " self.bindDerivative(parName,\"partitionCoefficient\",comps[0],comps[1],k1,pc,0)\n", " \n", " for q in self.mod['bindings']['flow']:\n", " comps=q.split('->')\n", " srcs=splitVector(comps[0])\n", " tgts=splitVector(comps[1])\n", " for cs in srcs:\n", " for ct in tgts:\n", " try:\n", " pc=self.mod['partitionCoefficients'][cs]\n", " except KeyError:\n", " pc=1\n", " try:\n", " qName=self.flows['{}:{}'.format(cs,ct)]\n", " except KeyError:\n", " qName=self.flows['{}:{}'.format(ct,cs)]\n", " #convert to seconds\n", " q=self.mod['flows'][qName]\n", " try:\n", " if self.mod['flows']['unit'].split('/')[1]=='min':\n", " q/=60\n", " except KeyError:\n", " pass\n", " self.bind(cs,ct,q/pc,1)\n", " if \"partitionCoefficients\" in parsSE:\n", " if cs in self.mod['partitionCoefficients']:\n", " #associate parameter name in sensitivity table\n", " parName=cs+'PC'\n", " self.bindDerivative(parName,\"partitionCoefficient\",cs,ct,q,pc)\n", " if \"flows\" in parsSE:\n", " self.bindDerivative(qName,\"flow\",cs,ct,q,pc)\n", " if \"volumes\" in parsSE:\n", " self.bindDerivative(\"x\",\"volume\",cs,ct,q,pc)\n", " \n", " #print('bind: {}:(q={},vt={},pc={}):{}'.format(bind,q,vt,pc,q/vt/pc))\n", " \n", " self.build()\n", " \n", " def u(self,t):\n", " ub=[f(t) for f in self.fu]\n", " return numpy.array(ub)\n", " \n", " def fSY(self,y,t):\n", " #M number of sensitivity parameters\n", " #N number of equations\n", " #fSS is MxNxN\n", " \n", " #assume a tabulated solution y(t) at t spaced intervals\n", " \n", " qS=self.fSS.dot(y)\n", " #qS is MxN\n", " #but NxM is expected, so do a transpose\n", " \n", " #for simultaneous calculation, a Nx(M+1) matrix is expected\n", " tS=numpy.zeros((self.n,self.m+1))\n", " #columns from 2..M+1 are the partial derivatives \n", " tS[:,1:]=numpy.transpose(qS)\n", " #first column is the original function\n", " tS[:,0]=self.u(t)\n", " return tS\n", " \n", " def fS(self,t):\n", " #M number of sensitivity parameters\n", " #N number of equations\n", " #fSS is MxNxN\n", " \n", " #assume a tabulated solution y(t) at t spaced intervals\n", " \n", " qS=self.fSS.dot(self.getY(t))\n", " return numpy.transpose(qS)\n", " \n", " def getSEJ(self,parName):\n", " #find the sensitivity (SE) derivative of Jacobi with respect to parameter \n", " try:\n", " return self.seJ[parName]\n", " except KeyError:\n", " self.seJ[parName]={}\n", " return self.seJ[parName]\n", " \n", " def getSEJ_comp(self,parName,compartmentName):\n", " #find equation dictating concentration in compartmentName for jacobi-parameter derivative\n", " seJ=self.getSEJ(parName)\n", " \n", " try:\n", " return seJ[compartmentName]\n", " except KeyError:\n", " seJ[compartmentName]={}\n", " return seJ[compartmentName]\n", " def setY(self,t,y):\n", " self.tck=[None]*self.n\n", " for i in range(self.n):\n", " self.tck[i] = scipy.interpolate.splrep(t, y[:,i], s=0)\n", " \n", " def getY(self,t):\n", " fY=numpy.zeros(self.n)\n", " for i in range(self.n):\n", " fY[i]=scipy.interpolate.splev(t, self.tck[i], der=0)\n", " return fY\n", " \n", "def splitVector(v):\n", " if v.find('(')<0:\n", " return [v]\n", " return v[1:-1].split(',')\n", "\n", "def parseFunction(formula):\n", " if formula['name']=='exponential':\n", " c0=formula['constant']\n", " k=formula['k']\n", " return lambda t:c0*numpy.exp(k*t)\n", " if formula['name']=='constant':\n", " c0=formula['value']\n", " return lambda t:c0\n", " if formula['name']=='Heavyside':\n", " t0=formula['limit']\n", " v=formula['value']\n", " return lambda t:v if t" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "#copy output of Thompson et al\n", "fig, axs = matplotlib.pyplot.subplots(5, 3,figsize=(15,25))\n", "name=['arterial','adipose','brain','heart','kidney','liver','lung','muscle','skin',\n", " 'splanchnic','stomach','testes']\n", "#diazepam\n", "max=[1.5,2.6,3,4,5,2.5,6.8,1.5,1.5,4,4.2,3]\n", "#cotinine\n", "max=[9]*12\n", "\n", "max=[1000*x for x in max]\n", "for i in range(len(name)):\n", " row=i//3\n", " col=i%3\n", " axs[row,col].plot(t/60,sol[:,sys.lut[name[i]]])\n", " axs[row,col].plot(t/60,s1[:,sys.lut[name[i]],0])\n", " axs[row,col].set_title(name[i])\n", " axs[row,col].set_ylim([0,max[i]])\n", " axs[row,col].set_xlim([0,250])\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### 422/(9.1/0.15)/60" ] }, { "cell_type": "code", "execution_count": 80, "metadata": {}, "outputs": [], "source": [ "import scipy.interpolate\n", "x = numpy.arange(0, 2*numpy.pi+numpy.pi/4, 2*numpy.pi/8)\n", "Y=numpy.zeros((len(x),2))\n", "Y[:,0] = numpy.sin(x)\n", "Y[:,1] = numpy.sin(x)\n", "\n", "tck = scipy.interpolate.splrep(x, Y[:,0], s=0)\n", "\n", "\n", "\n" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[[3. 2.]\n", " [2. 4.]\n", " [2. 6.]]\n", "[[3. 2.]\n", " [2. 4.]\n", " [2. 6.]]\n" ] } ], "source": [ "M=numpy.ones((3,2,2))\n", "M[0,0,1]=2\n", "M[1,1,0]=3\n", "M[2,1,1]=5\n", "v=numpy.ones(2)\n", "q=M.dot(v)\n", "q1=q.ravel()\n", "q2=numpy.reshape(q1,q.shape)\n", "print(q)\n", "print(q2)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" } }, "nbformat": 4, "nbformat_minor": 4 }