cModel.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. import numpy
  2. import json
  3. import os
  4. import scipy.interpolate
  5. class model:
  6. def __init__(self):
  7. self.compartments={}
  8. self.seJ={}
  9. def add_source(self,compartmentName,formula):
  10. self.compartments[compartmentName]['source']=formula
  11. def add_compartment(self,compartmentName):
  12. self.compartments[compartmentName]={}
  13. self.compartments[compartmentName]['targets']={}
  14. self.compartments[compartmentName]['sensTargets']={}
  15. def bind(self,sourceCompartment,targetCompartment,k,pc,scaleToVolume=0):
  16. #establish a flow from one compartment to the other
  17. cSrc=self.compartments[sourceCompartment]
  18. cTarg=self.compartments[targetCompartment]
  19. vSPar=self.getVolumePar(sourceCompartment,scaleToVolume)
  20. vTPar=self.getVolumePar(targetCompartment,scaleToVolume)
  21. #the source equation (where we subtract the current)
  22. addValue(cSrc['targets'],sourceCompartment,-get(k)/get(pc)/get(vSPar))
  23. #the target equation (where we add the current)
  24. addValue(cTarg['targets'],sourceCompartment,get(k)/get(pc)/get(vTPar))
  25. def getDerivative(self,variable, sign, qPar, pcPar, vPar):
  26. #for flow based transfer, k=Q/vS/pc,
  27. #and jacobian derivative is -Q/V/pc/pc
  28. #for diffusion dominated transfer, k=k1/pc
  29. q=get(qPar)
  30. pc=get(pcPar)
  31. v=get(vPar)
  32. if variable=="partitionCoefficient":
  33. return sign*(-1)*q/v/pc/pc
  34. if variable=="diffusionCoefficient":
  35. return sign/pc/v
  36. if variable=="flow":
  37. return sign/pc/v
  38. if variable=="volume":
  39. return sign*(-1)*q/pc/v/v
  40. def getVolumePar(self,compartment,useVolume=1):
  41. if not useVolume:
  42. return {"value":1}
  43. try:
  44. parVName=self.mod["volumes"][compartment]
  45. parV=self.mod["parameters"][parVName]
  46. except KeyError:
  47. return {"value":1}
  48. parV["name"]=parVName
  49. return parV
  50. def bindDerivative(self,parameterName,variable,src,target,q,pc,useVolume=1):
  51. parName=parameterName
  52. srcV=self.getVolumePar(src,useVolume)
  53. doAdd=(variable!="volume" or calculateDerivative(srcV))
  54. if doAdd:
  55. #the source equation (where we subtract the current)
  56. if variable=="volume":
  57. parName=srcV["name"]
  58. csrc=self.getSEJ_comp(parName,src)
  59. addValue(csrc,src,self.getDerivative(variable,-1,q,pc,srcV))
  60. targetV=self.getVolumePar(target,useVolume)
  61. doAdd=(variable!="volume" or calculateDerivative(targetV))
  62. #the target equation (where we add the current)
  63. if doAdd:
  64. if variable=="volume":
  65. parName=targetV["name"]
  66. ctarget=self.getSEJ_comp(parName,target)
  67. addValue(ctarget,src,self.getDerivative(variable,+1,q,pc,targetV))
  68. def build(self):
  69. comps=self.compartments
  70. self.n=len(comps)
  71. self.M=numpy.zeros((self.n,self.n))
  72. self.fu=[lambda t:0]*self.n
  73. self.lut={c:i for (i,c) in zip(range(self.n),comps.keys())}
  74. for c in comps:
  75. comp=comps[c]
  76. if 'source' in comp:
  77. self.fu[self.lut[c]]=parseFunction(comp['source'])
  78. for t in comp['targets']:
  79. self.M[self.lut[c],self.lut[t]]=comp['targets'][t]
  80. #build SE part
  81. self.m=len(self.seJ)
  82. self.lutSE={c:i for (i,c) in zip(range(self.m),self.seJ.keys())}
  83. #MxNxN matrix
  84. self.fSS=numpy.zeros((self.m,self.n,self.n))
  85. for par in self.seJ:
  86. sources=self.seJ[par]
  87. for compartment in sources:
  88. targets=sources[compartment]
  89. for t in targets:
  90. k=self.lutSE[par]
  91. i=self.lut[compartment]
  92. j=self.lut[t]
  93. self.fSS[k,i,j]=targets[t]
  94. def inspect(self):
  95. comps=self.compartments
  96. pars=self.mod['parameters']
  97. print('Compartments')
  98. for c in comps:
  99. print('{}/{}:'.format(c,self.lut[c]))
  100. comp=comps[c]
  101. if 'source' in comp:
  102. print('\tsource\n\t\t{}'.format(comp['source']))
  103. print('\ttargets')
  104. for t in comp['targets']:
  105. print('\t\t{}[{},{}]: {}'.format(t,self.lut[c],self.lut[t],\
  106. comp['targets'][t]))
  107. print('Flows')
  108. for f in self.flows:
  109. fName=self.flows[f]
  110. fParName=self.mod['flows'][fName]
  111. fPar=pars[fParName]
  112. print('\t{}[{}]:{} [{}]'.format(f,fName,fParName,get(fPar)))
  113. print('Volumes')
  114. for v in self.mod['volumes']:
  115. vParName=self.mod['volumes'][v]
  116. vPar=pars[vParName]
  117. print('\t{}:{} [{}]'.format(v,vParName,get(vPar)))
  118. print('Partition coefficients')
  119. for pc in self.mod['partitionCoefficients']:
  120. pcParName=self.mod['partitionCoefficients'][pc]
  121. pcPar=pars[pcParName]
  122. print('\t{}:{} [{}]'.format(pc,pcParName,get(pcPar)))
  123. print('SE parameters')
  124. for p in self.seJ:
  125. print(p)
  126. sources=self.seJ[p]
  127. for compartment in sources:
  128. targets=sources[compartment]
  129. for t in targets:
  130. print('\t SE bind {}/{}:{}'.format(compartment,t,targets[t]))
  131. def parse(self,file):
  132. with open(file,'r') as f:
  133. self.mod=json.load(f)
  134. for m in self.mod['compartments']:
  135. self.add_compartment(m)
  136. for s in self.mod['sources']:
  137. #src=mod['sources'][s]
  138. self.add_source(s,self.mod['sources'][s])
  139. self.flows={}
  140. pars=self.mod['parameters']
  141. for f in self.mod['flows']:
  142. #skip comments
  143. if f.find(':')<0:
  144. continue
  145. comps=f.split(':')
  146. c0=splitVector(comps[0])
  147. c1=splitVector(comps[1])
  148. for x in c0:
  149. for y in c1:
  150. pairName='{}:{}'.format(x,y)
  151. self.flows[pairName]=f
  152. for b in self.mod['bindings']['diffusion']:
  153. #whether to scale transfer constants to organ volume
  154. #default is true, but changing here will assume no scaling
  155. useVolume=1
  156. comps=b.split('->')
  157. try:
  158. pcParName=self.mod['partitionCoefficients'][b]
  159. pcPar=pars[pcParName]
  160. except KeyError:
  161. pcPar={"value":1}
  162. kParName=self.mod['bindings']['diffusion'][b]
  163. kPar=pars[kParName]
  164. self.bind(comps[0],comps[1],kPar,pcPar,useVolume)
  165. #add derivatives calculateDerivative returns true
  166. if calculateDerivative(kPar):
  167. self.bindDerivative(kParName,"diffusionParameter",\
  168. comps[0],comps[1],kPar,pcPar,useVolume)
  169. if calculateDerivative(pcPar):
  170. self.bindDerivative(pcParName,"partitionCoefficient",\
  171. comps[0],comps[1],kPar,pcPar,useVolume)
  172. for q in self.mod['bindings']['flow']:
  173. comps=q.split('->')
  174. srcs=splitVector(comps[0])
  175. tgts=splitVector(comps[1])
  176. for cs in srcs:
  177. for ct in tgts:
  178. #get partition coefficient
  179. try:
  180. pcParName=self.mod['partitionCoefficients'][cs]
  181. pcPar=pars[pcParName]
  182. except KeyError:
  183. pcPar={"value":1}
  184. #get flow (direction could be reversed)
  185. try:
  186. qName=self.flows['{}:{}'.format(cs,ct)]
  187. except KeyError:
  188. qName=self.flows['{}:{}'.format(ct,cs)]
  189. flowParName=self.mod['flows'][qName]
  190. flowPar=pars[flowParName]
  191. self.bind(cs,ct,flowPar,pcPar,1)
  192. if calculateDerivative(pcPar):
  193. self.bindDerivative(pcParName,"partitionCoefficient",\
  194. cs,ct,flowPar,pcPar)
  195. if calculateDerivative(flowPar):
  196. self.bindDerivative(flowParName,"flow",\
  197. cs,ct,flowPar,pcPar)
  198. self.bindDerivative("x","volume",cs,ct,flowPar,pcPar)
  199. self.build()
  200. def u(self,t):
  201. ub=[f(t) for f in self.fu]
  202. return numpy.array(ub)
  203. def fSY(self,y,t):
  204. #M number of sensitivity parameters
  205. #N number of equations
  206. #fSS is MxNxN
  207. #assume a tabulated solution y(t) at t spaced intervals
  208. qS=self.fSS.dot(y)
  209. #qS is MxN
  210. #but NxM is expected, so do a transpose
  211. #for simultaneous calculation, a Nx(M+1) matrix is expected
  212. tS=numpy.zeros((self.n,self.m+1))
  213. #columns from 2..M+1 are the partial derivatives
  214. tS[:,1:]=numpy.transpose(qS)
  215. #first column is the original function
  216. tS[:,0]=self.u(t)
  217. return tS
  218. def fS(self,t):
  219. #M number of sensitivity parameters
  220. #N number of equations
  221. #fSS is MxNxN
  222. #assume a tabulated solution y(t) at t spaced intervals
  223. qS=self.fSS.dot(self.getY(t))
  224. return numpy.transpose(qS)
  225. def getSEJ(self,parName):
  226. #find the sensitivity (SE) derivative of Jacobi with
  227. #respect to parameter
  228. try:
  229. return self.seJ[parName]
  230. except KeyError:
  231. self.seJ[parName]={}
  232. return self.seJ[parName]
  233. def getSEJ_comp(self,parName,compartmentName):
  234. #find equation dictating concentration in compartmentName
  235. #for jacobi-parameter derivative
  236. seJ=self.getSEJ(parName)
  237. try:
  238. return seJ[compartmentName]
  239. except KeyError:
  240. seJ[compartmentName]={}
  241. return seJ[compartmentName]
  242. def setY(self,t,y):
  243. self.tck=[None]*self.n
  244. for i in range(self.n):
  245. self.tck[i] = scipy.interpolate.splrep(t, y[:,i], s=0)
  246. def getY(self,t):
  247. fY=numpy.zeros(self.n)
  248. for i in range(self.n):
  249. fY[i]=scipy.interpolate.splev(t, self.tck[i], der=0)
  250. return fY
  251. def calculateUncertainty(self,sol,se):
  252. s2out=numpy.zeros(sol.shape)
  253. pars=self.mod['parameters']
  254. for parName in pars:
  255. par=pars[parName]
  256. if not calculateDerivative(par):
  257. continue
  258. v=par["value"]
  259. if par['dist']=='lognormal':
  260. #this is sigma^2_lnx
  261. sln2=numpy.log(par["cv"]*par["cv"]+1)
  262. #have to multiplied by value to get the derivative
  263. #with respect to lnx
  264. s2=sln2*v*v
  265. else:
  266. #for Gaussian, cv is sigma/value; get sigma by value multiplication
  267. s2=par["cv"]*par["cv"]*v*v
  268. j=self.lutSE[parName]
  269. fse=se[:,:,j]
  270. print('Calculating for {}/{}:{}'.format(parName,j,fse.shape))
  271. #se2 is nt x n
  272. se2=numpy.multiply(fse,fse)
  273. s2out+=se2*s2
  274. return numpy.sqrt(s2out)
  275. def splitVector(v):
  276. if v.find('(')<0:
  277. return [v]
  278. return v[1:-1].split(',')
  279. def parseFunction(formula):
  280. if formula['name']=='exponential':
  281. c0=formula['constant']
  282. k=formula['k']
  283. return lambda t:c0*numpy.exp(k*t)
  284. if formula['name']=='constant':
  285. c0=formula['value']
  286. return lambda t:c0
  287. if formula['name']=='Heavyside':
  288. t0=formula['limit']
  289. v=formula['value']
  290. return lambda t:v if t<t0 else 0
  291. return lambda t:1
  292. def addValue(qdict,compName,v):
  293. #add real number v to attribute compName of dictionary qdict, check if compName exists and handle the potential error
  294. try:
  295. qdict[compName]+=v
  296. except KeyError:
  297. qdict[compName]=v
  298. def get(par):
  299. v=par["value"]
  300. #convert to seconds
  301. try:
  302. if par['unit'].split('/')[1]=='min':
  303. return v/60
  304. except (KeyError,IndexError):
  305. pass
  306. return v
  307. def calculateDerivative(par):
  308. #add derivatives if dist(short for distribution) is specified
  309. return "dist" in par