#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import numpy as np
from scipy import signal
#import matplotlib.pyplot as plt
from scipy.special import expit
#import json
import pandas as pd
import ast
"""
# ------------------------------------------------- #
# #
# Time dependent function constructors #
# #
# ------------------------------------------------- #
ToDo:
* Merge build and build_metapopulation
* Simplify build function
"""
def _set_constructor(func, label):
    """Attach ``label`` as ``func.constructor`` when the object allows it."""
    try:
        func.constructor = label
    except (AttributeError, TypeError):
        # Some callables (e.g. builtins) do not accept new attributes. The
        # label is descriptive metadata only, so it is skipped in that case.
        pass
    return func


def _constant_function(value):
    """Return a function of ``t`` that always returns ``value``."""
    def out(t):
        return value
    out.constructor = str(value)
    return out


def build(input):
    """Build a function to be used as a dynamical parameter.

    Args:
        input (str, dict, callable, tuple, bool, list or number):
            * str: parsed with ``ast.literal_eval``. A number gives a constant
              function; a dict is treated as a function specification (see
              below); anything else raises ``TypeError``.
            * dict: function specification. The ``'function'`` entry names a
              constructor available in this module's namespace (dotted names
              such as ``'np.vectorize'`` are resolved attribute by attribute);
              every other entry is passed to it as a keyword argument.
            * callable: returned as is, tagged as a user defined function.
            * tuple: its elements are built and added through ``build_add``.
            * bool: returned unchanged (used for building iterable simulations).
            * list: converted to a numpy array and wrapped in a constant
              function.
            * anything else: wrapped in a constant function.

    Returns:
        function(t): function for dynamic parameters. When the object accepts
        attributes, a ``constructor`` attribute describes how it was built.

    Raises:
        TypeError: a string input does not evaluate to a number or a dict.
        KeyError: a specification has no ``'function'`` entry.
        NameError: the named constructor is not defined in this module.
    """
    if isinstance(input, str):
        spec = ast.literal_eval(input)
        if isinstance(spec, (int, float, complex)):
            return _constant_function(spec)
        if not isinstance(spec, dict):
            raise TypeError(
                "String input must evaluate to a number or a dict, got {}".format(
                    type(spec).__name__))
    elif isinstance(input, dict):
        # Work on a copy: the 'function' entry is removed below.
        spec = dict(input)
    elif callable(input):
        return _set_constructor(input, 'User defined Function')
    elif isinstance(input, tuple):
        return build_add(*input)
    elif isinstance(input, bool):
        # For building iterable simulations
        return input
    else:
        value = np.array(input) if isinstance(input, list) else input
        return _constant_function(value)

    factory_name = spec.pop('function')
    if callable(factory_name):
        factory = factory_name
    else:
        # NOTE(review): the specification can select any callable reachable
        # from this module's namespace, so it must come from a trusted source.
        root, *attrs = str(factory_name).strip().split('.')
        try:
            factory = globals()[root]
        except KeyError:
            raise NameError("name {!r} is not defined".format(root)) from None
        for attr in attrs:
            factory = getattr(factory, attr)

    # The keyword values are passed as real objects instead of being pasted
    # into source text, so strings, inf, numpy arrays and callables all work.
    out = factory(**spec)
    return _set_constructor(out, str(input))
# Build + function addition.
def build_add(*input):
    """Build every argument and return the function that adds them all.

    Each argument is passed through ``build`` and the resulting functions are
    handed to ``func_add``.
    """
    built = [build(item) for item in input]
    return func_add(*built)
# Function addition
def func_add(*functions):
    """Create the function that results from adding the functions received.

    Args:
        *functions: addends. Each one is passed through ``build`` and the
            result is called with a single argument ``t``.

    Returns:
        function(t): sum of every addend evaluated at ``t`` (0 when no addend
        is given).
    """
    addends = [build(fn) for fn in functions]

    def f(t):
        total = 0
        for addend in addends:
            # Out-of-place addition on purpose: "+=" would add in place once
            # the running total is a numpy array, which fails when a later
            # addend needs a wider dtype (e.g. int array followed by floats).
            total = total + addend(t)
        return total

    return f
def events(values, days, default=0, *functions):
    """Event creator function.

    Create a time dependent function that returns the values (or functions)
    given in ``values`` for the intervals specified in ``days``, and
    ``default`` elsewhere. Interval edges are smoothed with sigmoids.

    Args:
        values (list): values (anything accepted by ``build``) for the
            different intervals.
        days (list): list of ``[start, end]`` pairs; ``days[i]`` is the
            interval in which ``values[i]`` applies.
        default (float, optional): value taken outside every interval.
            Defaults to 0.
        *functions: extra functions added with no interval definition.

    Returns:
        function(t): function created.
    """
    gw = 20  # sigmoid gain used to smooth the interval edges

    def window(t, start, end):
        # Smooth indicator: close to 1 inside [start, end], close to 0 outside.
        return expit(gw * (t - start)) - expit(gw * (t - end))

    terms = list(functions)

    fvalues = [build(value) for value in values]
    for idx in range(len(fvalues)):
        def value_term(t, j=idx):  # j=idx binds the current index
            return fvalues[j](t) * window(t, days[j][0], days[j][1])
        terms.append(value_term)

    # Default value: it has to be removed wherever an interval is defined.
    # Overlapping or touching intervals are merged first so that the default
    # is subtracted only once over each covered span.
    spans = np.array(days)
    merged = []
    if spans.size:  # with no interval the default applies everywhere
        spans = spans[np.argsort(spans[:, 0])]
        start, end = spans[0, 0], spans[0, 1]
        for row in spans[1:]:
            if end < row[0]:
                # Gap before the next interval: close the current span.
                merged.append((start, end))
                start, end = row[0], row[1]
            elif end < row[1]:
                # Overlap: extend the current span.
                end = row[1]
        merged.append((start, end))

    for start, end in merged:
        def default_hole(t, a=start, b=end):
            return -default * window(t, a, b)
        terms.append(default_hole)
    terms.append(lambda t: default)

    # Create final function adding them all
    return func_add(*terms)
def piecewise(values, limits=(-np.inf,)):
    """Piecewise creator function.

    Create a smooth time dependent function that returns the values (or
    functions) given in ``values`` for the intervals delimited by ``limits``.
    A sigmoid is used to get a smooth, differentiable change between values,
    so the value at a limit is the mean of the values on both sides.

        f(t) = { v0 for t in ]-inf, limits[0][
                 v1 for t in ]limits[0], limits[1][
                 ...
                 vn for t in ]limits[n-1], inf[ }
        f(limits[0]) = (v0 + v1) / 2

    Args:
        values (list): values (anything accepted by ``build``) for the
            different intervals. A non-list input is passed straight to
            ``build``.
        limits (sequence): days where the function takes the next value;
            expected to satisfy ``len(limits) == len(values) - 1`` (not
            checked here). Defaults to a single limit at minus infinity.

    Returns:
        function(t): function created.
    """
    # This makes it work for constant values
    if not isinstance(values, list):
        return build(values)

    gw = 20  # sigmoid gain

    # Array with all the values/functions
    fvalues = [build(value) for value in values]

    # Initial value
    terms = [lambda t: fvalues[0](t) * expit(gw * (limits[0] - t))]

    for idx in range(len(limits) - 1):
        def middle(t, j=idx):  # j=idx binds the current index
            return fvalues[j + 1](t) * (
                expit(gw * (t - limits[j])) - expit(gw * (t - limits[j + 1])))
        terms.append(middle)

    # Final value
    terms.append(lambda t: fvalues[-1](t) * expit(gw * (t - limits[-1])))

    # Create final function adding them all
    return func_add(*terms)
# Periodic square function
def square(min_val=0, max_val=1, period=14, t_init=0, t_end=1000, default=0, initphase=0, duty=0.5):
    """Periodic square function active between ``t_init`` and ``t_end``.

    Inside the window the function oscillates between ``min_val`` and
    ``max_val`` following ``scipy.signal.square`` with the given ``duty``;
    outside it returns ``default``. The window edges are smoothed with
    sigmoids.

    Args:
        min_val, max_val: lower and upper values of the wave.
        period: period of the wave.
        t_init, t_end: limits of the active window.
        default: value returned outside the window.
        initphase: truthy to start with the max value, falsy to start with
            the min value.
        duty: duty cycle passed to ``scipy.signal.square``.

    Returns:
        function(t): function created.
    """
    # Truthy initphase aligns the wave with t_init (starts with max value);
    # otherwise the phase is shifted by 1.1*pi (starts with min value).
    phi = 2*np.pi*t_init/period if initphase else np.pi*(2*t_init/period - 1.1)

    def wave(t):
        return (max_val-min_val)/2*signal.square(2*np.pi / period * t - phi, duty) + (max_val+min_val)/2

    def windowed(t):
        inside = expit(10*(t-t_init)) - expit(10*(t-t_end))
        return inside*wave(t) + (1-inside)*default

    return windowed
# Sine function
def sine(min_val=0, max_val=1, period=14, t_init=0, t_end=1000, default=0, initphase=0):
    """Sinusoidal function active between ``t_init`` and ``t_end``.

    Inside the window the function oscillates between ``min_val`` and
    ``max_val`` following a cosine; outside it returns ``default``. The
    window edges are smoothed with sigmoids.

    Args:
        min_val, max_val: lower and upper values of the wave.
        period: period of the wave.
        t_init, t_end: limits of the active window.
        default: value returned outside the window.
        initphase: truthy to start with the max value, falsy to start with
            the min value.

    Returns:
        function(t): function created.
    """
    # Truthy initphase aligns the cosine with t_init (starts with max value);
    # otherwise it is shifted by half a period (starts with min value).
    phi = 2*np.pi*t_init/period if initphase else np.pi*(2*t_init/period - 1)

    def wave(t):
        return (max_val-min_val)/2*np.cos(2*np.pi / period * t - phi) + (max_val+min_val)/2

    def windowed(t):
        inside = expit(10*(t-t_init)) - expit(10*(t-t_end))
        return inside*wave(t) + (1-inside)*default

    return windowed
# Sawtooth function
def sawtooth(min_val=0, max_val=1, period=14, t_init=0, t_end=1000, default=0, initphase=0, width=1):
    """Sawtooth function active between ``t_init`` and ``t_end``.

    Inside the window the function oscillates between ``min_val`` and
    ``max_val`` following ``scipy.signal.sawtooth`` with the given ``width``;
    outside it returns ``default``. The window edges are smoothed with
    sigmoids.

    Args:
        min_val, max_val: lower and upper values of the wave.
        period: period of the wave.
        t_init, t_end: limits of the active window.
        default: value returned outside the window.
        initphase: selects the phase offset (see the comment in the body).
        width: width parameter passed to ``scipy.signal.sawtooth``.

    Returns:
        function(t): function created.
    """
    # Truthy initphase aligns the wave with t_init; otherwise the phase gets
    # an extra offset of -pi*0.5/period.
    # NOTE(review): the original comments say "start with max value" for both
    # branches - confirm the intended starting value.
    phi = 2*np.pi*t_init/period if initphase else np.pi*(2*t_init/period - 0.5/period)

    def wave(t):
        return (max_val-min_val)/2*signal.sawtooth(2*np.pi / period * t - phi, width) + (max_val+min_val)/2

    def windowed(t):
        inside = expit(10*(t-t_init)) - expit(10*(t-t_end))
        return inside*wave(t) + (1-inside)*default

    return windowed
# Value transition functions
def linear_transition(t_init, t_end, initvalue=0, endvalue=1):
    """Linear transition.

    Creates a function which performs a linear transition from initvalue to
    endvalue between t_init and t_end. Before t_init it returns initvalue and
    after t_end it returns endvalue; the joints are smoothed with sigmoids.

    Args:
        t_init (int): Transition beginning
        t_end (int): Transition end
        initvalue (int, optional): Initial value. Defaults to 0.
        endvalue (int, optional): End value. Defaults to 1.

    Returns:
        function: function that performs the linear transition
    """
    # Straight line through (t_init, initvalue) and (t_end, endvalue).
    coeffs = np.polyfit([t_init, t_end], [initvalue, endvalue], 1)
    ramp = np.poly1d(coeffs)

    def out(t):
        before = initvalue*expit(10*(t_init-t))
        during = ramp(t)*(expit(10*(t-t_init)) - expit(10*(t-t_end)))
        after = endvalue*expit(10*(t-t_end))
        return before + during + after

    return out
def quadratic_transition(t_init, t_end, initvalue=0, endvalue=1, concavity=0):
    """Quadratic transition.

    Creates a function which performs a quadratic transition from initvalue
    to endvalue between t_init and t_end. Before t_init it returns initvalue
    and after t_end it returns endvalue; the joints are smoothed with
    sigmoids.

    Args:
        t_init (int): Transition beginning
        t_end (int): Transition end
        initvalue (int, optional): Initial value. Defaults to 0.
        endvalue (int, optional): End value. Defaults to 1.
        concavity (int, optional): Function's concavity. 0: convex, 1: concave.
            Defaults to convex.

    Returns:
        function: function that performs the quadratic transition
    """
    increasing = endvalue >= initvalue
    decreasing = initvalue > endvalue

    # A third point fixes the parabola: one of the end points is mirrored
    # across the other one, keeping the value of the point it is mirrored
    # across.
    if (not concavity and increasing) or (concavity and decreasing):
        # convex increase or concave decrease
        t_aux = 2*t_init - t_end
        v_aux = endvalue
    elif (concavity and increasing) or (not concavity and decreasing):
        # concave increase or convex decrease
        t_aux = 2*t_end - t_init
        v_aux = initvalue

    coeffs = np.polyfit([t_init, t_end, t_aux], [initvalue, endvalue, v_aux], 2)
    parabola = np.poly1d(coeffs)

    def out(t):
        before = initvalue*expit(10*(t_init-t))
        during = parabola(t)*(expit(10*(t-t_init)) - expit(10*(t-t_end)))
        after = endvalue*expit(10*(t-t_end))
        return before + during + after

    return out
def sigmoidal_transition(t_init, t_end, initvalue=0, endvalue=1, gw=8):
    """Sigmoidal transition.

    Creates a function which performs a sigmoidal transition from initvalue
    to endvalue, centred half way between t_init and t_end.

    Args:
        t_init (int): Transition beginning
        t_end (int): Transition end
        initvalue (int, optional): Initial value. Defaults to 0.
        endvalue (int, optional): End value. Defaults to 1.
        gw (float, optional): Gain weight. Calibrates the "strength" of the
            sigmoid change. Defaults to 8.

    Returns:
        function: function that performs the sigmoidal transition
    """
    midpoint = (t_init+t_end)/2

    def out(t):
        # The gain is divided by the transition length at call time.
        progress = expit((t-midpoint)*gw/(t_end-t_init))
        return initvalue + (endvalue-initvalue)*progress

    return out
# Value transition functions
# TODO(review): t_init / t_end should be sent from events
def transition(t_init, t_end, ftype='linear', initvalue=0, endvalue=1, concavity=0, gw=8):
    """Transition.

    Creates a function which performs a transition from initvalue to endvalue
    between t_init and t_end, delegating to ``linear_transition``,
    ``quadratic_transition`` or ``sigmoidal_transition`` according to
    ``ftype``.

    Args:
        t_init (int): Transition beginning
        t_end (int): Transition end
        ftype (str or int, optional): Transition function type. Defaults to
            'linear'. 0 or "linear", 1 or "quadratic", 2 or "sigmoidal"
        initvalue (int, optional): Initial value. Defaults to 0.
        endvalue (int, optional): End value. Defaults to 1.
        concavity (int, optional): Quadratic function's concavity. 0: convex,
            1: concave. Defaults to convex. Only used by the quadratic type.
        gw (float, optional): Gain weight. Calibrates the "strength" of the
            sigmoid change. Defaults to 8. Only used by the sigmoidal type.

    Returns:
        function: function that performs the transition

    Raises:
        ValueError: ``ftype`` is not one of the supported types.
    """
    if ftype == 'linear' or ftype == 0:
        return linear_transition(t_init, t_end, initvalue, endvalue)
    if ftype == 'quadratic' or ftype == 1:
        return quadratic_transition(t_init, t_end, initvalue, endvalue, concavity)
    if ftype == 'sigmoidal' or ftype == 2:
        return sigmoidal_transition(t_init, t_end, initvalue, endvalue, gw)
    # Failing here is clearer than returning None and crashing later, when
    # the missing function is first called.
    raise ValueError(
        "Wrong function type input: {!r}. Expected 0/'linear', "
        "1/'quadratic' or 2/'sigmoidal'".format(ftype))
# Saturation function
def saturation(upperlimit):
    """Build a saturation indicator for the given upper limit.

    Args:
        upperlimit (function or cv19function builder arg): Upper limit
            function; it is passed through ``build``.

    Returns:
        saturationfunction (function): function ``(t, *args)`` that adds the
        arguments, compares the sum at time ``t`` with the upper limit at
        time ``t`` and returns a smooth (sigmoid) indicator:
            close to 0 when the sum is smaller than the upper limit
            close to 1 when it is bigger
            0.5 when both are equal
    """
    gain = 20  # gain weight of the sigmoid
    limit_fn = build(upperlimit)

    def aux(t, *functions):
        """Smooth indicator of the arguments' sum exceeding the upper limit.

        Args:
            t (float): time value
            *functions: multiple arguments which are added (through
                ``build_add``) and then compared with the saturation value
                at time t

        Returns:
            float: sigmoid of the scaled difference between the sum and the
            upper limit, both evaluated at t.
        """
        total = build_add(*functions)
        excess = total(t) - limit_fn(t)
        return expit(gain*excess)

    return aux
def data_function(data, future):
    """Create a function that returns the data during its length.

    Args:
        data (list | np.array | pd.Series | pd.DataFrame): Data. For a
            DataFrame only its first row is used.
        future: anything accepted by ``build``; the resulting function is
            used for times beyond the data length.

    Returns:
        function(t): ``data[int(t)]`` while ``t < len(data)``, otherwise the
        value of the function built from ``future``.
    """
    if isinstance(data, pd.DataFrame):
        data = list(data.iloc[0])
    elif isinstance(data, pd.Series):
        data = list(data)

    fallback = build(future)

    def aux(t):
        # NOTE(review): negative t is not handled specially, so int(t) may
        # index the data from its end - confirm that t >= 0 is guaranteed.
        return data[int(t)] if t < len(data) else fallback(t)

    return aux
def polyfit(values, time=None, degree=4, endvalue_index=-5):
    """Fit data with a polynomial and project future values with it.

    The returned function follows the fitted polynomial during the data and
    moves smoothly (sigmoid centred at the last time value) towards a fixed
    end value afterwards.

    Args:
        values (list | np.array): values to fit.
        time (list | np.array, optional): time array of the data to fit. If
            no time array is given (None or empty), daily data is assumed.
        degree (int, optional): polynomial degree. Defaults to 4.
        endvalue_index (int, optional): start index of the tail of ``values``
            whose mean is used as the end value. Defaults to -5.

    Returns:
        function(t): function created.
    """
    # Explicit None/empty check: the truth value of a numpy array with more
    # than one element is ambiguous, so "not time" would raise for arrays.
    if time is None or len(time) == 0:
        time = np.array(range(len(values)))

    datamodel = np.poly1d(np.polyfit(time, values, degree))

    # transition time from data to fixed value
    tchange = time[-1]
    endvalue = np.mean(values[endvalue_index:])

    def f_out(t):
        weight = expit(t - tchange)
        return datamodel(t)*(1 - weight) + weight*endvalue

    return f_out
def interpolate_data(data):
    """Interpolate the data in order to have a daily array of data.

    Work in progress: no interpolation is performed yet and the input is
    returned unchanged.

    Args:
        data: data to interpolate.

    Returns:
        The same ``data`` object received.
    """
    return data
# I think this function is not necessary
def events_append(values, days, new_val, new_days=None, default=0):
    """Build an ``events`` function with one more value appended.

    TODO: unfinished helper.

    Args:
        values (list): values of the existing events.
        days (list): intervals of the existing events.
        new_val: value to append to ``values``.
        new_days (optional): interval to append to ``days``. When it is
            falsy, ``days`` is used unchanged. Defaults to None.
        default (int, optional): default value passed to ``events``.
            Defaults to 0.

    Returns:
        function(t): result of ``events`` for the extended lists.
    """
    # New lists are created; the caller's lists are not modified.
    extended_values = values + [new_val]
    extended_days = days + [new_days] if new_days else days
    return events(values=extended_values, days=extended_days, default=default)