#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#from logging import raiseExceptions
from distutils.log import error
import matplotlib.pyplot as plt
import numpy as np
import time
# optimization
import pygmo as pg
from numpy import linalg as LA
# cv19gm libraries
from cv19gm.data.cv19data import ImportData as importdata
from cv19gm.cv19sim import CV19SIM
import cv19gm.utils.cv19functions as cv19functions
from cv19gm.utils.cv19errors import *
"""To Do
* Implementar método Alejo:
* Ver si se puede optimizar, demora como media hora =S
* Implementar mi método
* Agregarlo a la función general cv19paramfit
# To Do:
[x] 1. Arreglar lo del cálculo de error para datos discontinuos
[x] 2. Testear error ScaledRMSE vs RMSE
[x] 3. Crear función piecewise con sólo los días de intermedios [t0,t1,t2,t3] en vez de [[t0,t1],[t2,t3],[t4,t5]]
[x] 4. Comprobar que el opti_mu esté bien implementado
[x] 5. Intentar no tener que simular nuevamente con el beta obtenido, ver si se puede obtener la última simulación: can't
[ ] 6. Calcular error global al avanzar con las iteraciones
[ ] 7. Implementar número mínimo de días entre cambios de tendencias
[ ] 8. Implementar iteración beta_fit. Condiciones de salida:
* Error < erro_tol
* Numero de iteraciones
* Disminusión relativa de error entre iteraciones
[ ] 9. Revisar consistencia al establecer las condiciones iniciales de las simulaciones
[ ] 10. Agregar la posibilidad de meter objetos de datos a los fiteos
[ ] 11. Ordenar, limpiar y reducir el código
[ ] 12. Exponer variables de optimización como población y número de generaciones
"""
[docs]class CV19PARAMFIT:
def __init__(
self,
cfg,
method="interval_fit",
I_d_data=None,
t_data=None,
dates_data=None,
tstate=None,
initdate=None,
cv19gmdata=None,
verbose=False,
nintervals=3,
gen=200,
vartofit="I_d",
**kwargs
):
"""CV19PARAMFIT
Data fitting library for cv19gm platform. Manages the data and initializes the optimization method.
The data to fit can be given in three different ways:
* Data array using I_d_data and t_data
* Target state id and initial date for obtaining the data from our servers, usubg tstate and initdate
* cv19gmdata object with the data already loaded
Args:
cfg (str,dict): Simulation configuration file
method (str, optional): Fitting method (add list of available methods). Defaults to 'interval_fit'.
I_d_data (_type_, optional): Data new cases to fit. Defaults to None.
t_data (list or nparray, optional): Data time array. Defaults to None.
dates_data (_type_, optional): Data dates array. Defaults to None.
tstate (_type_, optional): State code to obtain data to fit. Defaults to None.
initdate (_type_, optional): Initial date to fit from data obtained automatically. Defaults to None.
cv19gmdata (_type_, optional): cv19gmdata object with data to fit. Defaults to None.
verbose (bool, optional): Verbose output. Defaults to False.
nintervals (int, optional): Number of intervals to fit. Applies to interval_fit method. Defaults to 3.
gen (int, optional): Number of generation for fitting method. Defaults to 200.
vartofit (str, optional): Variable to fit on simulation. Does nothing for the moment, but it may be used in the future. Defaults to 'I_d'.
"""
self.cfg = cfg
self.gen = gen
self.kwargs = kwargs
# Data management: data source and data load
# Data array
if not type(I_d_data) == type(None):
self.I_d_data = I_d_data
self.t_data = t_data
self.dates_data = dates_data
self.data = None
# State code + inital date: downloads data from remote server
elif not type(tstate) == type(None):
self.data = importdata(tstate=tstate, initdate=initdate)
# Import data for seir - change this for adding more models
print("Importing data from remote server")
self.data.import_data_seir()
self.I_d_data, self.t_data, self.dates_data = (
self.data.I_d_r,
self.data.I_d_r_tr,
self.data.I_d_r_dates,
)
# cv19gmdata object
elif not type(cv19gmdata) == type(None):
self.data = cv19gmdata
self.I_d_data, self.t_data, self.dates_data = (
self.data.I_d_r,
self.data.I_d_r_tr,
self.data.I_d_r_dates,
)
# Missing data to fit
else:
print("Missing data to fit")
return
# raiseExceptions
# Optimization methods
if method == "interval_fit":
self.nintervals = nintervals # number of intervals
# Set bounds
alpha_low_bound = np.repeat(0.01, self.nintervals) # lower bound for alpha
days_interval_low_bound = np.repeat(
0, self.nintervals
) # lower bound for days interval
alpha_up_bound = np.repeat(1.0, self.nintervals) # upper bound alpha
days_interval_up_bound = np.repeat(
90, self.nintervals
) # upper bound for days interval, 90 is the max day between intervals.
low_bound = np.append(alpha_low_bound, days_interval_low_bound)
up_bound = np.append(alpha_up_bound, days_interval_up_bound)
bounds = [low_bound, up_bound]
t_sim = 500 # simulation time in days
# until = 150 # set limit to optimize with real data
# Set individual
# ver si funciona bien la inicialización de I_d para casos sin inputdata
self.opti = INTERVAL_FIT(
I_d_data=self.I_d_data,
t_data=self.t_data,
bounds=bounds,
cfg=cfg,
nintervals=self.nintervals,
inputdata=self.data,
I_d=self.I_d_data[0],
)
# Set Particle Swarm Optimization parameters, gen=200, population=40 -> time: 20 min
self.algo = pg.algorithm(pg.pso(gen=self.gen))
self.algo.set_verbosity(10)
elif method == "other_method":
print("Performing other method =)")
[docs] def evolve(self, npop=40):
self.pop = pg.population(self.opti, npop)
self.pop = self.algo.evolve(self.pop)
self.opti.fitness(self.pop.champion_x)
print(self.pop.champion_f)
[docs] def simulate(self):
if not type(self.dates_data) == type(None):
self.kwargs["initdate"] = self.dates_data[0]
self.sim = CV19SIM(
self.cfg,
beta=self.opti.beta_funct,
inputdata=self.data,
I_d=self.I_d_data[0],
**self.kwargs
)
self.sim.solve()
[docs] def plot(self, xaxis="days"):
"""Plot simulation results with optimized parameters"""
interval_array = [
self.opti.beta_funct["days"][i][1]
for i in range(len(self.opti.beta_funct["days"]))
]
if xaxis == "days":
plt.plot(self.sim.sims[0].t, self.sim.sims[0].I_d, label="Simulation")
plt.scatter(self.t_data, self.I_d_data, label="Data")
plt.xlabel("Days")
for i in np.array(interval_array):
plt.axvline(i, linestyle="dashed", color="grey")
elif xaxis == "dates":
plt.plot(self.sim.sims[0].dates, self.sim.sims[0].I_d, label="Simulation")
plt.scatter(self.dates_data, self.I_d_data, label="Data")
plt.xlabel("Dates")
for i in np.array(interval_array):
plt.axvline(
self.sim.sims[0].dates[int(i)], linestyle="dashed", color="grey"
)
plt.ylabel("New cases")
plt.title("Optimization results")
# plot vertical lines, where days intervals start
[docs] def plot_history(self):
"""Plots optimization history
# log of the optimization, for PSO. log = [Gen, Fevals, gbest, Mean Vel., Mean lbest, Avg. Dist]
# Gen: generation, Fevals:fitness evaluations, gbest: global best,
# Mean Vel.: mean velocity, Mean lbest: mean fitness, Avg. Dist: average distance
"""
uda = self.algo.extract(pg.pso) # uda = user-defined algorithm
log = np.array(uda.get_log())
# plot global best vs generations
plt.plot(log[:, 0], log[:, 2])
plt.xlabel("Generation", size=15)
plt.ylabel("Fitness", size=15)
plt.show()
return
[docs]class BETA_FIT:
"""
Optimizador de beta con Inverse Time Weighted Error
1. Se deben encontrar las condiciones iniciales para todas las variables
2. Elegir qué variable se utilizará en el fitness. Voy a partir con I_d
"""
def __init__(self, I_d, t, cfg, bounds, error="RMSE", alpha=1, **kwargs):
self.I_d = I_d # New daily infected
self.t = t # real time
self.bounds = bounds # Bounds for the variables to optimize
self.cfg = cfg
self.alpha = alpha
self.kwargs = kwargs
if error == "ITWE":
self.error = ITWE
elif error == "RMSE":
self.error = RMSE
self.ITWE_error = 0
self.RMSE_error = 0
self.p2p_error = []
#
# self.mu = mu
[docs] def fitness(self, x):
sim = CV19SIM(self.cfg, beta=x, **self.kwargs)
sim.solve()
self.ITWE_error = ITWE(sim.sims[0].I_d, self.I_d, self.t, alpha=self.alpha)
self.RMSE_error = RMSE(sim.sims[0].I_d, self.I_d, self.t)
self.p2p_error = P2PE(sim.sims[0].I_d, self.I_d, self.t)
return [self.ITWE_error]
[docs] def get_bounds(self): # mandatory function of Pygmo2
return self.bounds
[docs] def set_bounds(self, bounds):
self.bounds = bounds
return self.bounds
[docs] def gradient(self, x):
return pg.estimate_gradient_h(lambda x: self.fitness(x), x)
[docs]class BETAMU_FIT:
"""
Optimizador de beta con Inverse Time Weighted Error
1. Se deben encontrar las condiciones iniciales para todas las variables
2. Elegir qué variable se utilizará en el fitness. Voy a partir con I_d
"""
def __init__(self, cfg, bounds, I_d_data, t_data = None, error="ITWE", rho=1, **kwargs):
self.I_d_data = I_d_data # New cases
self.t_data = list(range(len(I_d_data))) if type(t_data) == type(None) else t_data
self.bounds = bounds # Bounds for the variables to optimize
self.cfg = cfg
self.rho = rho # Decide if keeping this
self.kwargs = kwargs
if error == "ITWE":
self.error = ITWE
elif error == "RMSE":
self.error = RMSE
[docs] def fitness(self, x): # mandatory function of Pygmo2
# We construct the days intervals with the values that are being
# optimized.
sim = CV19SIM(self.cfg, beta=x[0], mu=x[1], I_d = self.I_d_data[0],**self.kwargs)
sim.solve()
err = self.error(sim.sims[0].I_d, self.I_d_data, t_data=self.t_data)
return [err]
[docs] def get_bounds(self): # mandatory function of Pygmo2
return self.bounds
[docs] def set_bounds(self, bounds):
self.bounds = bounds
return self.bounds
[docs] def gradient(self, x):
return pg.estimate_gradient_h(lambda x: self.fitness(x), x)
[docs]class BETA_PIECEWISE_FIT:
"""_summary_
# beta_days includes the last transition date, the objetive of this method is to find the best value for this transition.
"""
def __init__(self,cfg, bounds, I_d_data, t_data = None, beta_values = [], beta_days = [-np.infty], error="RMSE", **kwargs):
self.I_d_data = I_d_data # Data new cases
if type(t_data) == type(None):
t_data = list(range(len(I_d_data)))
self.t_data = t_data # Data time
self.bounds = bounds # Bounds for the variables to optimize [[beta_min],[beta_max]]
self.cfg = cfg # Configuration file for the rest of the model's parameters
self.beta_values = beta_values # if none we start optimizing from the beginning
self.beta_days = beta_days
self.kwargs = kwargs
if error == "ITWE":
self.error = ITWE
elif error == "RMSE":
self.error = RMSE
elif error == "RRMSE":
self.error = RRMSE
else:
raise Exception('Wrong error function')
[docs] def fitness(self, x):
beta = cv19functions.piecewise(values=self.beta_values+[x],limits = self.beta_days)
sim = CV19SIM(self.cfg, beta=beta, I_d = self.I_d_data[0], **self.kwargs)
sim.solve()
self.RMSE_error = RMSE(sim.sims[0].I_d, self.I_d_data, self.t_data)
return [self.RMSE_error]
[docs] def get_bounds(self): # mandatory function of Pygmo2
return self.bounds
[docs] def set_bounds(self, bounds):
self.bounds = bounds
return self.bounds
[docs] def gradient(self, x):
return pg.estimate_gradient_h(lambda x: self.fitness(x), x)
[docs]class INTERVAL_FIT:
"""Data Fit
Developed by A. Bernardin, modified by S. Ropert
"""
def __init__(
self, I_d_data, t_data, bounds, cfg, error="RMSE", nintervals=10, **kwargs
):
self.I_d_data = I_d_data # real infected
self.t_data = t_data # real time
self.bounds = bounds
self.alpha_interval_array = 0 # interval_array
self.beta_funct = ""
self.cfg = cfg
self.kwargs = kwargs
self.nintervals = nintervals
# if error == 'ITWE':
# self.error = ITWE
# elif error == 'RMSE':
# self.error = RMSE
[docs] def fitness(self, x): # mandatory function of Pygmo2
"""
We search a vector that includes values for alpha and for the days intervals.
x: Optimization vector, by default from Pygmo2. Vector of lenght 20, where x[:10] are the alpha values and x[10:] are
the days values for the intervals.
"""
# We construct the days intervals with the values that are being
# optimized.
alpha_interval_array = []
first_element = 0
for i in x[self.nintervals :]:
second_element = first_element + i
alpha_interval_array.append([first_element, second_element])
first_element = second_element
self.alpha_interval_array = alpha_interval_array
beta_funct = {
"function": "events",
"values": [],
"days": [],
} # dictionary that includes betas and values for days intervals
beta_funct["values"] = list(x[: self.nintervals])
beta_funct["days"] = alpha_interval_array
self.beta_funct = beta_funct
# set simulation
sims = CV19SIM(
self.cfg, beta=beta_funct, t_end=self.t_data[-1] + 1, **self.kwargs
) # x[i] takes values between limits define by "bounds"
sims.solve() # run simulation
idx = np.searchsorted(
sims.sims[0].t, self.t_data
) # to adjust data to simulation (data are each three days)
res = LA.norm(
self.I_d_data - sims.sims[0].I_d[idx]
) # Norm between data and simulation. This is the fitness.
return [res]
[docs] def get_bounds(self): # mandatory function of Pygmo2
return self.bounds
[docs] def set_bounds(self, bounds):
self.bounds = bounds
return self.bounds
[docs]class SEQUENTIAL_FIT:
"""
Sequential fit optimization method
1. We aply betamu_fit for fiting the initial parameters using a truncated part of the data
2. We calculate a global error function and check if we are under the error tolerance. If not we continue with the following points
3. Using tendency_change we look for the tendency change day.
4. We optimize with beta_fit from the day found in (2).
5. We iterate points 2, 3 and 4 until we reach the end of the data or we reduce the error below the tolerance
"""
def __init__(self, cfg, I_d_data=None, t_data=None, dates_data=None, tstate=None,
initdate=None, cv19gmdata=None, global_errortol=100, local_errortol=100, global_errfunct=RMSE,local_errfunct=LAE,
intervalsize=10, maxintervals=5, bounds_beta=[0,1], bounds_mu=[0,4], inputdata = None, paramtol=0.05, **kwargs):
self.cfg = cfg
self.I_d_data = I_d_data
self.t_data = t_data
self.dates_data = dates_data
self.tstate = tstate
self.initdate = initdate
self.cv19gmdata = cv19gmdata
self.cfg = cfg
self.inputdata = inputdata
self.kwargs = kwargs
self.intervalsize = intervalsize
self.maxintervals = maxintervals
self.bounds_beta = bounds_beta
self.bounds_mu = bounds_mu
self.mu = 1
self.beta_values = []
self.beta_days = []
self.global_errortol = global_errortol
self.local_errortol = local_errortol
self.stop = False
self.actualidx = intervalsize
self.paramtol = paramtol
# Data management: data source
data_management(self, I_d_data, t_data, dates_data, tstate, initdate, cv19gmdata)
# Choose error functions (later will add the capability of choosing)
self.global_errfunct = cv19errorbuild(global_errfunct)
self.local_errfunct = cv19errorbuild(local_errfunct)
self.betaiter = 0
self.global_error_hist = []
# ------------------------- #
# First step: betamu_fit #
# ------------------------- #
[docs] def betamu_fit(self,actualidx=False, debug = True):
if not actualidx:
actualidx = self.intervalsize
start = time.time()
# Set bounds
lb = [self.bounds_beta[0], self.bounds_mu[0]] # lower bound
ub = [self.bounds_beta[1], self.bounds_mu[1]] # upper bound
bounds = [lb, ub]
# Build beta_mu optimization Problem
opti_mu = BETAMU_FIT(cfg = self.cfg, bounds = bounds, I_d_data = self.I_d_data[:actualidx], t_data = self.t_data[:actualidx], error="RMSE", inputdata = self.inputdata, **self.kwargs)
# Choose algorithm
algo = pg.algorithm(pg.nlopt(solver="bobyqa"))
algo.set_verbosity(10)
# Solve
if debug == True:
print("Solving for beta and mu")
pop = pg.population(opti_mu, size=20)
pop = algo.evolve(pop)
# Extract results
self.beta_values.append(np.around(pop.champion_x[0],4))
self.mu = np.around(pop.champion_x[1],4)
# Calculate Global Error
# Try to avoid simulating again
self.sim = CV19SIM(self.cfg,I_d = self.I_d_data[0],beta=self.beta_values[0],mu=self.mu,inputdata = self.inputdata, **self.kwargs)
self.sim.solve()
self.global_error = self.global_errfunct(self.sim.sims[0].I_d, self.I_d_data, t_data=self.t_data)
self.global_error_hist.append(self.global_error)
end = time.time()
if debug == True:
print('Initial values')
print('Elapsed time:'+str(np.around(end - start,2))+' s')
print('beta: '+str(self.beta_values[0]))
print('mu '+str(self.mu))
print('Global Error: '+str(self.global_error))
# ----------------------------------------------------- #
# Second step: recursing beta fit for short intervals #
# ----------------------------------------------------- #
[docs] def beta_fit(self,actualidx=-1,debug=False):
starttime = time.time()
# Find right beta for the divergence point
bounds = [[self.bounds_beta[0]],[self.bounds_beta[1]]]
opti = BETA_PIECEWISE_FIT(cfg = self.cfg, bounds = bounds, I_d_data = self.I_d_data[:actualidx],
t_data = self.t_data[:actualidx], beta_values=self.beta_values,beta_days=self.beta_days, inputdata = self.inputdata, mu = self.mu, **self.kwargs)
# Choose algorithm
algo = pg.algorithm(pg.nlopt(solver="bobyqa"))
algo.set_verbosity(10)
# Find parameters
if debug:
print("Solving for beta")
pop = pg.population(opti, size=20)
pop = algo.evolve(pop)
self.beta_values.append(np.around(pop.champion_x[0],4))
# Calculate Global error
self.beta = cv19functions.piecewise(values=self.beta_values,limits=self.beta_days)
self.sim = CV19SIM(self.cfg,I_d = self.I_d_data[0],beta=self.beta,mu=self.mu,inputdata=self.inputdata,**self.kwargs)
self.sim.solve()
self.global_error = self.global_errfunct(self.sim.sims[0].I_d, self.I_d_data, t_data=self.t_data)
self.global_error_hist.append(self.global_error)
endtime = time.time()
if debug:
print('Optimization process Finished')
print('Elapsed time:'+str(np.around(endtime - starttime,2))+' s')
print('beta: '+str(self.beta_values))
print('days: '+str(self.beta_days))
print('Global Error: '+str(self.global_error))
self.betaiter+=1
[docs] def optimize(self,debug=False):
# Finding initial values
start = time.time()
self.betamu_fit(debug=debug)
# Stop condition: global error under a tolerance or reaching the end of the data
while self.global_error > self.global_errortol and not self.stop:
# 1. Find divergence point
idx = tendency_change(self.I_d_data, t_data=self.t_data, sim = self.sim, errorfunction = self.local_errfunct, l_err_tol=self.local_errortol,startingday=self.actualidx) - 1
# Choose the last data point to fit in this iteration. Check if we reached the end of the data
if idx+self.intervalsize > len(self.I_d_data):
if debug:
print('Reached the end of data')
self.actualidx = len(self.I_d_data)
self.stop = True
else:
self.actualidx = idx+self.intervalsize
# 2. Find the best fit for this interval
self.beta_days.append(idx)
self.beta_fit(debug=debug,actualidx=self.actualidx)
# 3. Check if the solution represents a transition or it is due to noise.
# If the betas are too similar we find one that fits this extended range altogether
if np.abs(self.beta_values[-1] - self.beta_values[-2])<self.paramtol:
# Delete last idx and 2 last betas
del self.beta_days[-1]
del self.beta_values[-2:]
# Find beta for the extended range
if len(self.beta_days)>0:
self.beta_fit(debug=False,actualidx=self.actualidx)
else:
self.betamu_fit(debug=False,actualidx=self.actualidx)
end = time.time()
print('Optimization process Finished')
print('Elapsed time:'+str(np.around(end - start,2))+' s')
print('beta: '+str(self.beta_values))
print('days: '+str(self.beta_days))
print('mu: '+str(self.mu))
print('Global Error: '+str(self.global_error))
print('Number of iterations: '+str(self.betaiter))
[docs] def plot(self, xaxis="days"):
if xaxis == "days":
plt.plot(self.sim.sims[0].t, self.sim.sims[0].I_d, label="Simulation")
plt.scatter(self.t_data, self.I_d_data, label="Data",color='tab:red')
plt.xlabel("Days")
for i in np.array(self.beta_days):
plt.axvline(i, linestyle="dashed", color="grey")
elif xaxis == "dates":
import matplotlib.dates as mdates
plt.plot(self.sim.sims[0].dates, self.sim.sims[0].I_d, label="Simulation")
plt.scatter(self.dates_data, self.I_d_data, label="Data",color='tab:red')
plt.xlabel("Dates")
for i in np.array(self.beta_days):
plt.axvline(
self.sim.sims[0].dates[int(i)], linestyle="dashed", color="grey"
)
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%d/%m/%Y'))
plt.gca().xaxis.set_major_locator(mdates.DayLocator(interval=10))
plt.gcf().autofmt_xdate()
plt.xticks(rotation=45)
plt.ylabel("New cases")
plt.title("Optimization results")
[docs] def plot_step(self,step=False):
"""Plots optimization history
"""
if not step and type(step)==bool:
step = len(self.beta_values)
step = min(step,len(self.beta_values))
plt.scatter(self.t_data, self.I_d_data, label="Data",color='grey',s=20)
if step > 0:
beta_values = self.beta_values[:step]
beta_days = [-np.infty] if step == 0 else self.beta_days[:step]
beta = cv19functions.piecewise(values=beta_values,limits=beta_days)
sim = CV19SIM(self.cfg,I_d = self.I_d_data[0],beta=beta,mu=self.mu,inputdata=self.inputdata,**self.kwargs)
sim.solve()
plt.plot(sim.sims[0].t,sim.sims[0].I_d,label='Step: '+str(step))
for i in beta_days:
plt.axvline(i,linestyle='dashed',color='grey')
plt.xlabel("Days", size=15)
plt.ylabel("New Cases", size=15)
plt.title('Optimization evolution')
plt.legend(loc=0)
plt.show()
return
[docs] def plot_history(self,steps=False):
"""Plots optimization history
"""
if not steps and type(steps)==bool:
steps = len(self.beta_values)-1
steps = min(steps,len(self.beta_values)-1)
plt.scatter(self.t_data, self.I_d_data, label="Data",color='grey',s=20)
beta_values = [self.beta_values[0]]
beta_days = [-np.infty]
for i in range(steps):
beta = cv19functions.piecewise(values=beta_values,limits=beta_days)
sim = CV19SIM(self.cfg,I_d = self.I_d_data[0],beta=beta,mu=self.mu,inputdata=self.inputdata,**self.kwargs)
sim.solve()
plt.plot(sim.sims[0].t,sim.sims[0].I_d,linestyle='dashed',label='Iter: '+str(i))
beta_values.append(self.beta_values[i+1])
if i==0:
beta_days = [self.beta_days[i]]
plt.axvline(self.beta_days[i],linestyle='dashed',color='grey')
else:
beta_days.append(self.beta_days[i])
plt.axvline(self.beta_days[i],linestyle='dashed',color='grey')
beta = cv19functions.piecewise(values=beta_values,limits=beta_days)
sim = CV19SIM(self.cfg,I_d = self.I_d_data[0],beta=beta,mu=self.mu,inputdata=self.inputdata,**self.kwargs)
sim.solve()
plt.plot(sim.sims[0].t,sim.sims[0].I_d,label='Final')
plt.xlabel("Days", size=15)
plt.ylabel("New Cases", size=15)
plt.title('Optimization evolution')
plt.legend(loc=0)
plt.show()
return
# Tendency Change
[docs]def tendency_change(I_d_data, t_data=None, cfg=None, sim = None, l_err_tol=100, errorfunction=LAE, startingday = 0,**kwargs):
if not sim:
sim = CV19SIM(config=cfg,I_d = I_d_data[0],**kwargs)
sim.solve()
# Add options for calculating local error
l_err = errorfunction(sim.sims[0].I_d,I_d_data,t_data) # Local Absolute error
aux = np.where(np.array(l_err)>l_err_tol)[0] # first index over error tolerance
changeindex = aux[0] if len(aux)>0 else len(I_d_data)-1
idx = np.where(t_data>startingday)[0]
idx = idx[0] if len(idx)>0 else len(I_d_data)-1
return max(changeindex,idx)
# Data Management
[docs]def data_management(self, I_d_data, t_data, dates_data, tstate, initdate, cv19gmdata):
# Data array
if not type(I_d_data) == type(None):
self.I_d_data = I_d_data
self.t_data = t_data
self.dates_data = dates_data
self.data = None
# State code + inital date: downloads data from remote server
elif not type(tstate) == type(None):
self.data = importdata(tstate=tstate, initdate=initdate)
# Import data for seir - change this for adding more models
print("Importing data from remote server")
self.data.import_data_seir()
self.I_d_data, self.t_data, self.dates_data = (
self.data.I_d_r,
self.data.I_d_r_tr,
self.data.I_d_r_dates,
)
# cv19gmdata object
elif not type(cv19gmdata) == type(None):
self.data = cv19gmdata
self.I_d_data, self.t_data, self.dates_data = (
self.data.I_d_r,
self.data.I_d_r_tr,
self.data.I_d_r_dates,
)
# Missing data to fit
else:
# Raise exception
raise Exception("Missing data to fit")
[docs]def beta_fit(bounds,cfg,I_d_data,t_data,beta_values=[], beta_days = [-np.infty],actualidx=-1,debug=True,global_errfunct=RMSE,**kwargs):
starttime = time.time()
# Find right beta for the divergence point
bounds = [[bounds[0]],[bounds[1]]]
opti = BETA_PIECEWISE_FIT(cfg = cfg, bounds = bounds, I_d_data = I_d_data[:actualidx],
t_data = t_data[:actualidx], beta_values=beta_values,beta_days=beta_days, **kwargs) # mu and inputdata are given through the kwargs
# Choose algorithm
algo = pg.algorithm(pg.nlopt(solver="bobyqa"))
algo.set_verbosity(10)
# Find parameters
if debug:
print("Solving for beta")
pop = pg.population(opti, size=20)
pop = algo.evolve(pop)
beta_values.append(np.around(pop.champion_x[0],4))
# Calculate Global error
beta = cv19functions.piecewise(values=beta_values,limits=beta_days)
sim = CV19SIM(cfg,I_d = I_d_data[0],beta=beta,**kwargs)
sim.solve()
global_errfunct = cv19errorbuild(global_errfunct)
global_error = global_errfunct(sim.sims[0].I_d, I_d_data, t_data=t_data)
endtime = time.time()
if debug:
print('Optimization process Finished')
print('Elapsed time:'+str(np.around(endtime - starttime,2))+' s')
print('beta: '+str(beta_values))
print('days: '+str(beta_days))
print('Global Error: '+str(global_error))
return beta_values, beta_days, beta, global_error