#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SEIRHVD Model
TODO: Stop vaccinating when there's no people left to vaccinate (or raise an error)
TODO: Improve underreport calculation
"""
import numpy as np
from scipy.integrate import solve_ivp
import pandas as pd
#import toml
from datetime import datetime
from datetime import timedelta
# cv19gm libraries
import utils.cv19functions as cv19functions
import utils.cv19files as cv19files
[docs]class SEIRHVD:
"""
SEIRHVD model object:
Construction:
SEIRHVD(self, config = None)
"""
def __init__(self, config=None, verbose = False,**kwargs):
self.compartmentalmodel = 'SEIRHVD'
self.verbose = verbose
if not config:
print('Missing configuration file, using default')
# Load Parameters
if verbose:
print('Loading configuration file')
cv19files.loadconfig(self,config=config,**kwargs) # Load configuration file
if verbose:
print('Initializing parameters and variables')
self.set_initial_values()
if verbose:
print('Building equations')
self.set_equations()
self.solved = False
if verbose:
print('SEIRHVD object created')
self.verbose = verbose
[docs] def set_initial_values(self):
# Hospital capacity:
if self.verbose:
print('Building hospital capacity model')
# Use data
if type(self.cfg['initialconditions']['H_cap'])==str:
self.H_cap = cv19functions.polyfit(self.data[self.H_cap],time=self.data[self.H_cap+'_t'])
self.H_sat = cv19functions.saturation(self.H_cap)
#Initial Population
# Global values
if hasattr(self, 'popfraction'):
self.populationfraction = self.popfraction
if not hasattr(self,'populationfraction'):
self.populationfraction = 1
self.N0 = self.populationfraction*self.population
# Exposed
if not hasattr(self, 'E'):
self.E = self.mu*self.I
self.E_d=self.mu*self.I_d
self.E_ac=self.mu*self.I_ac
# Exposed vaccinated
if not hasattr(self, 'Ev'):
self.Ev = self.mu*self.Iv
self.Ev_d=self.mu*self.Iv_d
self.Ev_ac=self.mu*self.Iv_ac
# Vaccinated Infected
vacprop = (1-self.vac_eff(0))*self.Sv/(self.N0 - self.Sv - self.E - self.I - self.H - self.D - self.R )
if not self.Iv:
self.Iv = self.I*vacprop
self.Iv_d = self.I_d*vacprop
self.Iv_ac = self.I_ac*vacprop
# Infected
self.Icr = (self.I-self.Iv)*self.pE_Icr(0)
self.Im = (self.I-self.Iv)*self.pE_Im(0)
self.Icr_d = (self.I_d-self.Iv_d)*self.pE_Icr(0)
self.Im_d = (self.I_d-self.Iv_d)*self.pE_Im(0)
self.Icr_ac = (self.I_ac-self.Iv_ac)*self.pE_Icr(0)
self.Im_ac = (self.I_ac-self.Iv_ac)*self.pE_Im(0)
# Initial susceptible population
self.S = self.N0 - self.Sv - self.E - self.Ev - self.I - self.Iv - self.H - self.D - self.R
# External flux functions:
if not hasattr(self,'S_f'):
self.S_f = lambda t:0
if not hasattr(self,'Sv_f'):
self.Sv_f = lambda t:0
if not hasattr(self,'E_f'):
self.E_f = lambda t:0
if not hasattr(self,'Ev_f'):
self.Ev_f = lambda t:0
if not hasattr(self,'Im_f'):
self.Im_f = lambda t:0
if not hasattr(self,'Icr_f'):
self.Icr_f = lambda t:0
if not hasattr(self,'Iv_f'):
self.Iv_f = lambda t:0
if not hasattr(self,'R_f'):
self.R_f = lambda t:0
if not hasattr(self,'H_f'):
self.H_f = lambda t:0
if not hasattr(self,'D_f'):
self.D_f = lambda t:0
[docs] def set_equations(self):
"""
# --------------------------- #
# Diferential Ecuations #
# --------------------------- #
Variables:
S: Susceptibles
Sv: Vaccinated Susceptibles
E: Exposed
I_m: Asymptomatic + mild + severe infected
I_cr: Critical Infected
I_v Vaccinated Infected
Phi: Integrated external flux
S_f: Susceptible external Flux
"""
# --------------------------- #
# Susceptibles #
# --------------------------- #
# 0) dS/dt:
self.dS=lambda t,S,Im,Icr,Iv,R,Phi: - self.alpha(t)*S*(self.beta(t)*(Im+Icr)+self.beta_v(t)*Iv)/(self.N0+Phi) + self.pR_S(t)/self.tR_S(t)*R - self.vac_d(t) + self.S_f(t)
# 1) dS_v/dt:
self.dSv=lambda t,Sv,Im,Icr,Iv,R,Phi: -(1-self.vac_eff(t))*self.alpha(t)*Sv*(self.beta(t)*(Im+Icr)+self.beta_v(t)*Iv)/(self.N0+Phi) + self.vac_d(t) + self.Sv_f(t)
# --------------------------- #
# Exposed #
# --------------------------- #
# 2) dE/dt
self.dE = lambda t,S,E,Im,Icr,Iv,Phi: self.alpha(t)*S*(self.beta(t)*(Im+Icr)+self.beta_v(t)*Iv)/(self.N0+Phi) - E*(self.pE_Im(t)/self.tE_Im(t)+self.pE_Icr(t)/self.tE_Icr(t)) + self.E_f(t)
# 3) dE_d/dt*-
self.dE_d = lambda t,S,E_d,Im,Icr,Iv,Phi: self.alpha(t)*S*(self.beta(t)*(Im+Icr)+self.beta_v(t)*Iv)/(self.N0+Phi) + self.E_f(t) - E_d
# 4) dEv/dt
self.dEv = lambda t,Sv,Ev,Im,Icr,Iv,Phi: (1-self.vac_eff(t))*self.alpha(t)*Sv*(self.beta(t)*(Im+Icr)+self.beta_v(t)*Iv)/(self.N0+Phi) - Ev/self.tEv_Iv(t) + self.Ev_f(t)
# 5) dEv_d/dt
self.dEv_d = lambda t,Sv,Ev_d,Im,Icr,Iv,Phi: (1-self.vac_eff(t))*self.alpha(t)*Sv*(self.beta(t)*(Im+Icr)+self.beta_v(t)*Iv)/(self.N0+Phi) + self.Ev_f(t) - Ev_d
# --------------------------- #
# Infected #
# --------------------------- #
# 6) Mild Infected: Actuve
self.dIm=lambda t,E,Im: self.pE_Im(t)/self.tE_Im(t)*E - 1/self.tIm_R(t)*Im + self.Im_f(t)
# 7) Mild Infected: Daily
self.dIm_d=lambda t,E,Im_d: self.pE_Im(t)/self.tE_Im(t)*E + self.Im_f(t) - Im_d
# 8) Critical Infected: Active
self.dIcr=lambda t,E,Icr: self.pE_Icr(t)/self.tE_Icr(t)*E - 1/self.tIcr_H(t)*Icr + self.Icr_f(t)
# 9) Critical Infected: Daily
self.dIcr_d=lambda t,E,Icr_d: self.pE_Icr(t)/self.tE_Icr(t)*E + self.Icr_f(t) - Icr_d
# 10) Vaccinated Infected: Active
self.dIv=lambda t,Ev,Iv: Ev/self.tEv_Iv(t) - self.pIv_R(t)/self.tIv_R(t)*Iv - self.pIv_H(t)/self.tIv_H(t)*Iv + self.Iv_f(t)
# 11) Vaccinated Infected: Daily
self.dIv_d = lambda t,Ev,Iv_d: Ev/self.tEv_Iv(t) + self.Iv_f(t) - Iv_d
# ---------------------------- #
# Hospitalized #
# ---------------------------- #
# 12) Hospitalized
self.dH=lambda t,Icr,Iv,H: (1-self.H_sat(t,H))*(1/self.tIcr_H(t)*Icr + self.pIv_H(t)/self.tIv_H(t)*Iv) - self.pH_R(t)/self.tH_R(t)*H - self.pH_D(t)/self.tH_D(t)*H + self.H_f(t)
# 13) Hospitalized: Daily
self.dH_d=lambda t,Icr,Iv,H,H_d: (1-self.H_sat(t,H))*(1/self.tIcr_H(t)*Icr + self.pIv_H(t)/self.tIv_H(t)*Iv) + self.H_f(t) - H_d
# ------------------------- #
# Deaths #
# ------------------------- #
# 14) Deaths: total
self.dD=lambda t,Icr,Iv,H: self.H_sat(t,H)*(1/self.tIcr_H(t)*Icr + self.pIv_H(t)/self.tIv_H(t)*Iv) + self.pH_D(t)/self.tH_D(t)*H + self.D_f(t)
# 15) Deaths: Daily
self.dD_d=lambda t,Icr,Iv,H,D_d: self.H_sat(t,H)*(1/self.tIcr_H(t)*Icr + self.pIv_H(t)/self.tIv_H(t)*Iv) + self.pH_D(t)/self.tH_D(t)*H + self.D_f(t) - D_d
# --------------------------- #
# Recovered #
# --------------------------- #
# 16) Total recovered
self.dR=lambda t,Im,Iv,H,R: self.pIv_R(t)/self.tIv_R(t)*Iv + 1/self.tIm_R(t)*Im + self.pH_R(t)/self.tH_R(t)*H - self.pR_S(t)/self.tR_S(t)*R + self.R_f(t)
# 17) Total recovered
self.dR_d=lambda t,Im,Iv,H,R_d: self.pIv_R(t)/self.tIv_R(t)*Iv + 1/self.tIm_R(t)*Im + self.pH_R(t)/self.tH_R(t)*H + self.R_f(t) - R_d
# 18) External Flux:
self.dPhi = lambda t: self.S_f(t) + self.Sv_f(t) + self.E_f(t) + self.Ev_f(t) + self.Im_f(t) + self.Icr_f(t) + self.Iv_f(t) + self.H_f(t) + self.D_f(t) + self.R_f(t)
[docs] def run(self,t0=0,T=None,h=0.01,method='LSODA'):
#print('The use of integrate() is now deprecated. Use solve() instead.')
self.solve(t0=t0,T=T,h=h,method=method)
[docs] def solve(self,t0=0,T=None,h=0.01,method='LSODA'):
#integrator function that star form t0 and finish with T with h as
#timestep. If there aren't inital values in [t0,mu*(self.I_ac)T] function doesn't
#start. Or it's start if class object is initialze.
if T is None:
T = self.tsim
# Check if we already simulated the array
if self.solved:
if self.verbose:
print('Already solved')
return()
self.R_d=0
Phi0=0
self.t=np.arange(t0,T+h,h)
self.initcond = np.array([self.S,self.Sv,self.E,self.E_d,self.Ev,self.Ev_d,self.Im,self.Im_d,self.Icr,self.Icr_d,self.Iv,self.Iv_d,self.H,self.H_d,self.D,self.D_d,self.R,self.R_d,Phi0])
if self.verbose:
print('Solving EDOs')
sol = solve_ivp(self.solver_equations,(t0,T), self.initcond,method=method,t_eval=list(range(t0,T)))
self.sol = sol
self.t=sol.t
self.S=sol.y[0,:]
self.Sv=sol.y[1,:]
self.E=sol.y[2,:]
self.E_d=sol.y[3,:]
self.Ev=sol.y[4,:]
self.Ev_d=sol.y[5,:]
self.Im=sol.y[6,:]
self.Im_d=sol.y[7,:]
self.Icr=sol.y[8,:]
self.Icr_d=sol.y[9,:]
self.Iv=sol.y[10,:]
self.Iv_d=sol.y[11,:]
self.H=sol.y[12,:]
self.H_d=sol.y[13,:]
self.D=sol.y[14,:]
self.D_d=sol.y[15,:]
self.R=sol.y[16,:]
self.R_d=sol.y[17,:]
self.Phi=sol.y[18,:]
# Calculate accumulated variables
self.E_ac = np.cumsum(self.E_d)
self.Ev_ac = np.cumsum(self.Ev_d)
self.Im_ac = np.cumsum(np.append([0],self.Im_d[:-1])) + self.Im_ac
self.Icr_ac = np.cumsum(np.append([0],self.Icr_d[:-1])) + self.Icr_ac
self.Iv_ac = np.cumsum(np.append([0], self.Iv_d[:-1]))+ self.Iv_ac
self.R_ac = np.cumsum(self.R_d)
self.I = self.Im + self.Icr + self.Iv
self.I_d = self.Im_d + self.Icr_d + self.Iv_d
self.I_ac = self.Im_ac + self.Icr_ac + self.Iv_ac
self.underreport()
self.analytics()
self.df_build()
self.solved = True
return
[docs] def solver_equations(self,t,y):
ydot=np.zeros(len(y))
ydot[0]=self.dS(t,y[0],y[6],y[8],y[10],y[16],y[18])
ydot[1]=self.dSv(t,y[1],y[6],y[8],y[10],y[16],y[18])
ydot[2]=self.dE(t,y[0],y[2],y[6],y[8],y[10],y[18])
ydot[3]=self.dE_d(t,y[0],y[3],y[6],y[8],y[10],y[18])
ydot[4]=self.dEv(t,y[1],y[4],y[6],y[8],y[10],y[18])
ydot[5]=self.dEv_d(t,y[1],y[5],y[6],y[8],y[10],y[18])
ydot[6]=self.dIm(t,y[2],y[6])
ydot[7]=self.dIm_d(t,y[2],y[7])
ydot[8]=self.dIcr(t,y[2],y[8])
ydot[9]=self.dIcr_d(t,y[2],y[9])
ydot[10]=self.dIv(t,y[4],y[10])
ydot[11]=self.dIv_d(t,y[4],y[11])
ydot[12]=self.dH(t,y[8],y[10],y[12])
ydot[13]=self.dH_d(t,y[8],y[10],y[12],y[13])
ydot[14]=self.dD(t,y[8],y[10],y[12])
ydot[15]=self.dD_d(t,y[8],y[10],y[12],y[15])
ydot[16]=self.dR(t,y[6],y[10],y[12],y[16])
ydot[17]=self.dR_d(t,y[6],y[10],y[12],y[17])
ydot[18]=self.dPhi(t)
return(ydot)
[docs] def underreport(self):
"""Calculates the detected cases using the underreport factor
"""
# ToDo: Many things to improve here
if False:
self.Im_det = [self.Im[i]*self.pI_det(self.t(i)) for i in range(len(self.t))]
self.Im_d_det = [self.Im_d[i]*self.pI_det(self.t(i)) for i in range(len(self.t))]
self.Im_ac_det = np.cumsum(self.Im_d_det)
self.Im_det = self.Im*self.pI_det
self.Im_d_det = self.Im_d*self.pI_det
self.Im_ac_det = self.Im_ac*self.pI_det
self.Icr_det = self.Icr*self.pI_det
self.Icr_d_det = self.Icr_d*self.pI_det
self.Icr_ac_det = self.Icr_ac*self.pI_det
self.Iv_det = self.Iv*self.pIv_det
self.Iv_d_det = self.Iv_d*self.pIv_det
self.Iv_ac_det = self.Iv_ac*self.pIv_det
self.I_det = self.Im_det + self.Icr_det + self.Iv_det
self.I_d_det = self.Im_d_det + self.Icr_d_det + self.Iv_d_det
self.I_ac_det = self.Im_ac_det + self.Icr_ac_det + self.Iv_ac_det
[docs] def df_build(self):
self.results = pd.DataFrame({'t':self.t,'dates':self.dates})
names = ['S','Sv','E','E_d','Ev','Ev_d','Im','Im_d','Icr','Icr_d','Iv','Iv_d','H','H_d','D','D_d','R','R_d','Phi']
aux = pd.DataFrame(np.transpose(self.sol.y),columns=names).astype(int)
names2 = ['E_ac','Ev_ac','Im_ac','Icr_ac','Iv_ac','R_ac','Im_det','Im_d_det','Im_ac_det','Icr_det','Icr_d_det','Icr_ac_det',
'Iv_det','Iv_d_det','Iv_ac_det','I','I_d','I_ac','I_det','I_d_det','I_ac_det']
vars2 = [self.E_ac,self.Ev_ac,self.Im_ac,self.Icr_ac,self.Iv_ac,self.R_ac,self.Im_det,self.Im_d_det,self.Im_ac_det,
self.Icr_det,self.Icr_d_det,self.Icr_ac_det,self.Iv_det,self.Iv_d_det,self.Iv_ac_det,self.I,self.I_d,self.I_ac,
self.I_det,self.I_d_det,self.I_ac_det]
aux2 = pd.DataFrame(np.transpose(vars2),columns=names2).astype(int)
# Prevalence
self.prevalence = pd.DataFrame(np.transpose([self.prevalence_total,self.prevalence_susc,self.prevalence_det,self.CFR]),columns = ['prevalence_total','prevalence_susc','prevalence_det','CFR'])
# Parameters
nameparams = ['alpha','beta','beta_v','vac_d','vac_eff','pE_Im','tE_Im','pE_Icr','tE_Icr','tEv_Iv','tIm_R','tIcr_H','pIv_R','tIv_R',
'pIv_H','tIv_H','pH_R','tH_R','pH_D','tH_D','pR_S','tR_S',]#'pIcr_det','pIm_det','pIv_de']
alpha_val = [self.alpha(t) for t in self.t]
beta_val = [self.beta(t) for t in self.t]
beta_v_val = [self.beta_v(t) for t in self.t]
vac_d_val = [self.vac_d(t) for t in self.t]
vac_eff_val = [self.vac_eff(t) for t in self.t]
pE_Im_val = [self.pE_Im(t) for t in self.t]
tE_Im_val = [self.tE_Im(t) for t in self.t]
pE_Icr_val = [self.pE_Icr(t) for t in self.t]
tE_Icr_val = [self.tE_Icr(t) for t in self.t]
tEv_Iv_val = [self.tEv_Iv(t) for t in self.t]
tIm_R_val = [self.tIm_R(t) for t in self.t]
tIcr_H_val = [self.tIcr_H(t) for t in self.t]
pIv_R_val = [self.pIv_R(t) for t in self.t]
tIv_R_val = [self.tIv_R(t) for t in self.t]
pIv_H_val = [self.pIv_H(t) for t in self.t]
tIv_H_val = [self.tIv_H(t) for t in self.t]
pH_R_val = [self.pH_R(t) for t in self.t]
tH_R_val = [self.tH_R(t) for t in self.t]
pH_D_val = [self.pH_D(t) for t in self.t]
tH_D_val = [self.tH_D(t) for t in self.t]
pR_S_val = [self.pR_S(t) for t in self.t]
tR_S_val = [self.tR_S(t) for t in self.t]
#pIcr_det_val = [self.pIcr_det(t) for t in self.t]
#pIm_det_val = [self.pIm_det(t) for t in self.t]
#pIv_det_val = [self.pIv_det(t) for t in self.t]
self.params = pd.DataFrame(np.transpose([alpha_val,beta_val,beta_v_val,vac_d_val,vac_eff_val,pE_Im_val,tE_Im_val,
pE_Icr_val,tE_Icr_val,tEv_Iv_val,tIm_R_val,tIcr_H_val,pIv_R_val,tIv_R_val,
pIv_H_val,tIv_H_val,pH_R_val,tH_R_val,pH_D_val,tH_D_val,pR_S_val,tR_S_val,]),columns = nameparams)
#pIcr_det_val,pIm_det_val,pIv_det_val]),columns = nameparams)
self.compartments = pd.concat([self.results,aux,aux2],axis=1)
self.results = pd.concat([self.results,aux,aux2,self.params,self.prevalence],axis=1)
self.resume = pd.DataFrame({'peak':int(self.peak),'peak_t':self.peak_t,'peak_date':self.peak_date},index=[0])
[docs] def analytics(self):
#Cálculo de la fecha del Peak
self.peakindex = np.where(self.I==max(self.I))[0][0]
self.peak = max(self.I)
self.peak_t = self.t[self.peakindex]
if self.initdate:
self.dates = [self.initdate+timedelta(int(self.t[i])) for i in range(len(self.t))]
self.peak_date = self.initdate+timedelta(days=int(round(self.peak_t)))
else:
self.dates = [None for i in range(len(self.t))]
self.peak_date = None
# Prevalence:
self.prevalence_total = self.I_ac/self.population
self.prevalence_susc = np.array([self.I_ac[i]/(self.S[i]+self.E[i]+self.I[i]+self.R[i]) for i in range(len(self.I_ac))])
self.prevalence_det = np.array([self.I_ac_det[i]/(self.S[i]+self.E[i]+self.I_det[i]+self.R[i]) for i in range(len(self.I_ac))])
self.CFR = np.array([self.pH_D(t)*self.pE_Icr(t) for t in self.t])