cModel.py 12 KB


  1. import numpy
  2. import json
  3. import os
  4. import scipy.interpolate
  5. class model:
  6. def __init__(self):
  7. self.compartments={}
  8. self.seJ={}
  9. def add_source(self,compartmentName,formula):
  10. self.compartments[compartmentName]['source']=formula
  11. def add_compartment(self,compartmentName):
  12. self.compartments[compartmentName]={}
  13. self.compartments[compartmentName]['targets']={}
  14. self.compartments[compartmentName]['sensTargets']={}
  15. def getTimeUnit(self):
  16. try:
  17. return self.mod['timeUnit']
  18. except KeyError:
  19. return 's'
  20. def bind(self,sourceCompartment,targetCompartment,k,pc,scaleToVolume=0):
  21. #establish a flow from one compartment to the other
  22. cSrc=self.compartments[sourceCompartment]
  23. cTarg=self.compartments[targetCompartment]
  24. vSPar=self.getVolumePar(sourceCompartment,scaleToVolume)
  25. vTPar=self.getVolumePar(targetCompartment,scaleToVolume)
  26. tu=self.getTimeUnit()
  27. fk=get(tu,k)
  28. fpc=get(tu,pc)
  29. fvs=get(tu,vSPar)
  30. fvt=get(tu,vTPar)
  31. #the source equation (where we subtract the current)
  32. addValue(cSrc['targets'],sourceCompartment,-fk/fpc/fvs)
  33. #the target equation (where we add the current)
  34. addValue(cTarg['targets'],sourceCompartment,fk/fpc/fvt)
  35. def getDerivative(self,variable, sign, qPar, pcPar, vPar):
  36. #for flow based transfer, k=Q/vS/pc,
  37. #and jacobian derivative is -Q/V/pc/pc
  38. #for diffusion dominated transfer, k=k1/pc
  39. tu=self.getTimeUnit()
  40. q=get(tu,qPar)
  41. pc=get(tu,pcPar)
  42. v=get(tu,vPar)
  43. if variable=="partitionCoefficient":
  44. return sign*(-1)*q/v/pc/pc
  45. if variable=="diffusionCoefficient":
  46. return sign/pc/v
  47. if variable=="flow":
  48. return sign/pc/v
  49. if variable=="volume":
  50. return sign*(-1)*q/pc/v/v
  51. def getVolumePar(self,compartment,useVolume=1):
  52. if not useVolume:
  53. return {"value":1}
  54. try:
  55. parVName=self.mod["volumes"][compartment]
  56. parV=self.mod["parameters"][parVName]
  57. except KeyError:
  58. return {"value":1}
  59. parV["name"]=parVName
  60. return parV
  61. def bindDerivative(self,parameterName,variable,src,target,q,pc,useVolume=1):
  62. parName=parameterName
  63. srcV=self.getVolumePar(src,useVolume)
  64. doAdd=(variable!="volume" or calculateDerivative(srcV))
  65. if doAdd:
  66. #the source equation (where we subtract the current)
  67. if variable=="volume":
  68. parName=srcV["name"]
  69. csrc=self.getSEJ_comp(parName,src)
  70. addValue(csrc,src,self.getDerivative(variable,-1,q,pc,srcV))
  71. targetV=self.getVolumePar(target,useVolume)
  72. doAdd=(variable!="volume" or calculateDerivative(targetV))
  73. #the target equation (where we add the current)
  74. if doAdd:
  75. if variable=="volume":
  76. parName=targetV["name"]
  77. ctarget=self.getSEJ_comp(parName,target)
  78. addValue(ctarget,src,self.getDerivative(variable,+1,q,pc,targetV))
  79. def build(self):
  80. comps=self.compartments
  81. self.n=len(comps)
  82. self.M=numpy.zeros((self.n,self.n))
  83. self.fu=[lambda t:0]*self.n
  84. self.lut={c:i for (i,c) in zip(range(self.n),comps.keys())}
  85. for c in comps:
  86. comp=comps[c]
  87. if 'source' in comp:
  88. self.fu[self.lut[c]]=parseFunction(comp['source'])
  89. for t in comp['targets']:
  90. self.M[self.lut[c],self.lut[t]]=comp['targets'][t]
  91. #build SE part
  92. self.m=len(self.seJ)
  93. self.lutSE={c:i for (i,c) in zip(range(self.m),self.seJ.keys())}
  94. #MxNxN matrix
  95. self.fSS=numpy.zeros((self.m,self.n,self.n))
  96. for par in self.seJ:
  97. sources=self.seJ[par]
  98. for compartment in sources:
  99. targets=sources[compartment]
  100. for t in targets:
  101. k=self.lutSE[par]
  102. i=self.lut[compartment]
  103. j=self.lut[t]
  104. self.fSS[k,i,j]=targets[t]
  105. def inspect(self):
  106. comps=self.compartments
  107. pars=self.mod['parameters']
  108. tu=self.getTimeUnit()
  109. print('Time unit: {}'.format(tu))
  110. print('Compartments')
  111. for c in comps:
  112. print('{}/{}:'.format(c,self.lut[c]))
  113. comp=comps[c]
  114. if 'source' in comp:
  115. print('\tsource\n\t\t{}'.format(comp['source']))
  116. print('\ttargets')
  117. for t in comp['targets']:
  118. print('\t\t{}[{},{}]: {}'.format(t,self.lut[c],self.lut[t],\
  119. comp['targets'][t]))
  120. print('Flows')
  121. for f in self.flows:
  122. fName=self.flows[f]
  123. fParName=self.mod['flows'][fName]
  124. fPar=pars[fParName]
  125. print('\t{}[{}]:{} [{}]'.format(f,fName,fParName,get(tu,fPar)))
  126. print('Volumes')
  127. for v in self.mod['volumes']:
  128. vParName=self.mod['volumes'][v]
  129. vPar=pars[vParName]
  130. print('\t{}:{} [{}]'.format(v,vParName,get(tu,vPar)))
  131. print('Partition coefficients')
  132. for pc in self.mod['partitionCoefficients']:
  133. pcParName=self.mod['partitionCoefficients'][pc]
  134. pcPar=pars[pcParName]
  135. print('\t{}:{} [{}]'.format(pc,pcParName,get(tu,pcPar)))
  136. print('SE parameters')
  137. for p in self.seJ:
  138. print(p)
  139. sources=self.seJ[p]
  140. for compartment in sources:
  141. targets=sources[compartment]
  142. for t in targets:
  143. print('\t SE bind {}/{}:{}'.format(compartment,t,targets[t]))
  144. def parse(self,file):
  145. with open(file,'r') as f:
  146. self.mod=json.load(f)
  147. for m in self.mod['compartments']:
  148. self.add_compartment(m)
  149. for s in self.mod['sources']:
  150. #src=mod['sources'][s]
  151. self.add_source(s,self.mod['sources'][s])
  152. self.flows={}
  153. print('timeUnit: {}'.format(self.getTimeUnit()))
  154. pars=self.mod['parameters']
  155. for f in self.mod['flows']:
  156. #skip comments
  157. if f.find(':')<0:
  158. continue
  159. comps=f.split(':')
  160. c0=splitVector(comps[0])
  161. c1=splitVector(comps[1])
  162. for x in c0:
  163. for y in c1:
  164. pairName='{}:{}'.format(x,y)
  165. self.flows[pairName]=f
  166. for b in self.mod['bindings']['diffusion']:
  167. #whether to scale transfer constants to organ volume
  168. #default is true, but changing here will assume no scaling
  169. useVolume=1
  170. comps=b.split('->')
  171. try:
  172. pcParName=self.mod['partitionCoefficients'][b]
  173. pcPar=pars[pcParName]
  174. except KeyError:
  175. pcPar={"value":1}
  176. kParName=self.mod['bindings']['diffusion'][b]
  177. kPar=pars[kParName]
  178. self.bind(comps[0],comps[1],kPar,pcPar,useVolume)
  179. #add derivatives calculateDerivative returns true
  180. if calculateDerivative(kPar):
  181. self.bindDerivative(kParName,"diffusionParameter",\
  182. comps[0],comps[1],kPar,pcPar,useVolume)
  183. if calculateDerivative(pcPar):
  184. self.bindDerivative(pcParName,"partitionCoefficient",\
  185. comps[0],comps[1],kPar,pcPar,useVolume)
  186. for q in self.mod['bindings']['flow']:
  187. comps=q.split('->')
  188. srcs=splitVector(comps[0])
  189. tgts=splitVector(comps[1])
  190. for cs in srcs:
  191. for ct in tgts:
  192. #get partition coefficient
  193. try:
  194. pcParName=self.mod['partitionCoefficients'][cs]
  195. pcPar=pars[pcParName]
  196. except KeyError:
  197. pcPar={"value":1}
  198. #get flow (direction could be reversed)
  199. try:
  200. qName=self.flows['{}:{}'.format(cs,ct)]
  201. except KeyError:
  202. qName=self.flows['{}:{}'.format(ct,cs)]
  203. flowParName=self.mod['flows'][qName]
  204. flowPar=pars[flowParName]
  205. self.bind(cs,ct,flowPar,pcPar,1)
  206. if calculateDerivative(pcPar):
  207. self.bindDerivative(pcParName,"partitionCoefficient",\
  208. cs,ct,flowPar,pcPar)
  209. if calculateDerivative(flowPar):
  210. self.bindDerivative(flowParName,"flow",\
  211. cs,ct,flowPar,pcPar)
  212. self.bindDerivative("x","volume",cs,ct,flowPar,pcPar)
  213. self.build()
  214. def u(self,t):
  215. ub=[f(t) for f in self.fu]
  216. return numpy.array(ub)
  217. def fSY(self,y,t):
  218. #M number of sensitivity parameters
  219. #N number of equations
  220. #fSS is MxNxN
  221. #assume a tabulated solution y(t) at t spaced intervals
  222. qS=self.fSS.dot(y)
  223. #qS is MxN
  224. #but NxM is expected, so do a transpose
  225. #for simultaneous calculation, a Nx(M+1) matrix is expected
  226. tS=numpy.zeros((self.n,self.m+1))
  227. #columns from 2..M+1 are the partial derivatives
  228. tS[:,1:]=numpy.transpose(qS)
  229. #first column is the original function
  230. tS[:,0]=self.u(t)
  231. return tS
  232. def fS(self,t):
  233. #M number of sensitivity parameters
  234. #N number of equations
  235. #fSS is MxNxN
  236. #assume a tabulated solution y(t) at t spaced intervals
  237. qS=self.fSS.dot(self.getY(t))
  238. return numpy.transpose(qS)
  239. def getSEJ(self,parName):
  240. #find the sensitivity (SE) derivative of Jacobi with
  241. #respect to parameter
  242. try:
  243. return self.seJ[parName]
  244. except KeyError:
  245. self.seJ[parName]={}
  246. return self.seJ[parName]
  247. def getSEJ_comp(self,parName,compartmentName):
  248. #find equation dictating concentration in compartmentName
  249. #for jacobi-parameter derivative
  250. seJ=self.getSEJ(parName)
  251. try:
  252. return seJ[compartmentName]
  253. except KeyError:
  254. seJ[compartmentName]={}
  255. return seJ[compartmentName]
  256. def setY(self,t,y):
  257. self.tck=[None]*self.n
  258. for i in range(self.n):
  259. self.tck[i] = scipy.interpolate.splrep(t, y[:,i], s=0)
  260. def getY(self,t):
  261. fY=numpy.zeros(self.n)
  262. for i in range(self.n):
  263. fY[i]=scipy.interpolate.splev(t, self.tck[i], der=0)
  264. return fY
  265. def calculateUncertainty(self,sol,se):
  266. s2out=numpy.zeros(sol.shape)
  267. pars=self.mod['parameters']
  268. for parName in pars:
  269. par=pars[parName]
  270. if not calculateDerivative(par):
  271. continue
  272. v=par["value"]
  273. if par['dist']=='lognormal':
  274. #this is sigma^2_lnx
  275. sln2=numpy.log(par["cv"]*par["cv"]+1)
  276. #have to multiplied by value to get the derivative
  277. #with respect to lnx
  278. s2=sln2*v*v
  279. else:
  280. #for Gaussian, cv is sigma/value; get sigma by value multiplication
  281. s2=par["cv"]*par["cv"]*v*v
  282. j=self.lutSE[parName]
  283. fse=se[:,:,j]
  284. print('Calculating for {}/{}:{}'.format(parName,j,fse.shape))
  285. #se2 is nt x n
  286. se2=numpy.multiply(fse,fse)
  287. s2out+=se2*s2
  288. return numpy.sqrt(s2out)
  289. def splitVector(v):
  290. if v.find('(')<0:
  291. return [v]
  292. return v[1:-1].split(',')
  293. def parseFunction(formula):
  294. if formula['name']=='exponential':
  295. c0=formula['constant']
  296. k=formula['k']
  297. return lambda t:c0*numpy.exp(k*t)
  298. if formula['name']=='constant':
  299. c0=formula['value']
  300. return lambda t:c0
  301. if formula['name']=='Heavyside':
  302. t0=formula['limit']
  303. v=formula['value']
  304. return lambda t:v if t<t0 else 0
  305. return lambda t:1
  306. def addValue(qdict,compName,v):
  307. #add real number v to attribute compName of dictionary qdict, check if compName exists and handle the potential error
  308. try:
  309. qdict[compName]+=v
  310. except KeyError:
  311. qdict[compName]=v
  312. def get(timeUnit,par):
  313. v=par["value"]
  314. #convert to seconds
  315. try:
  316. parUnits=par['unit'].split('/')
  317. except (KeyError,IndexError):
  318. #no unit given
  319. return v
  320. try:
  321. if parUnits[1]==timeUnit:
  322. return v
  323. except IndexError:
  324. #no / in unit name
  325. return v
  326. if parUnits[1]=='min' and timeUnit=='s':
  327. return v/60
  328. if parUnits[1]=='s' and timeUnit=='min':
  329. return 60*v
  330. #no idea what to do
  331. return v
  332. def calculateDerivative(par):
  333. #add derivatives if dist(short for distribution) is specified
  334. return "dist" in par