import re
import warnings
import tempfile
import numpy as np
import pandas as pd
from scipy import integrate, optimize
try:
import igraph
except ImportError:
igraph = None
try:
import sympy
except ImportError:
sympy = None
try:
import network2tikz
except ImportError:
network2tikz = None
try:
from pdf2image import convert_from_path as pdf2image
except ImportError:
pdf2image = None
from .common import is_notebook
# Special tokens redered with a LaTeX macro
_latex_symbols = ['alpha',
'beta',
'gamma',
'Gamma',
'delta',
'Delta',
'epsilon',
'varepsilon',
'zeta',
'eta',
'theta',
'vartheta',
'Theta',
'iota',
'kappa',
'lambda',
'Lambda',
'mu',
'nu',
'xi',
'Xi',
'pi',
'Pi',
'rho',
'varrho',
'sigma',
'Sigma',
'tau',
'upsilon',
'Upsilon',
'phi',
'varphi',
'Phi',
'chi',
'psi',
'Psi',
'omega',
'Omega']
_wolfram_symbols = {"alpha": "\[Alpha]",
"beta": r"\[Beta]",
"gamma": r"\[Gamma]",
"delta": r"\[Delta]",
"epsilon": r"\[Epsilon]",
"zeta": r"\[Zeta]",
"eta": r"\[Eta]",
"theta": r"\[Theta]",
"iota": r"\[Iota]",
"kappa": r"\[Kappa]",
"lambda": r"\[Lambda]",
"mu": r"\[Mu]",
"nu": r"\[Nu]",
"xi": r"\[Xi]",
"omicron": r"\[Omicron]",
"rho": r"\[Rho]",
"sigma": r"\[Sigma]",
"tau": r"\[Tau]",
"upsilon": r"\[Upsilon]",
"phi": r"\[Phi]",
"chi": r"\[Chi]",
"psi": r"\[Psi]",
"omega": r"\[Omega]",
"Alpha": "\[CapitalAlpha]",
"Beta": r"\[CapitalBeta]",
"Gamma": r"\[CapitalGamma]",
"Delta": r"\[CapitalDelta]",
"Epsilon": r"\[CapitalEpsilon]",
"Zeta": r"\[CapitalZeta]",
"Eta": r"\[CapitalEta]",
"Theta": r"\[CapitalTheta]",
"Iota": r"\[CapitalIota]",
"Kappa": r"\[CapitalKappa]",
"Lambda": r"\[CapitalLambda]",
"Mu": r"\[CapitalMu]",
"Nu": r"\[CapitalNu]",
"Xi": r"\[CapitalXi]",
"Omicron": r"\[CapitalOmicron]",
"Rho": r"\[CapitalRho]",
"Sigma": r"\[CapitalSigma]",
"Tau": r"\[CapitalTau]",
"Upsilon": r"\[CapitalUpsilon]",
"Phi": r"\[CapitalPhi]",
"Chi": r"\[CapitalChi]",
"Psi": r"\[CapitalPsi]",
"Omega": r"\[CapitalOmega]"
}
def _monomial_from_str(s, states, coeffs):
"""Transform a rule from a Model into a rule of _NumericalModel"""
dividing = False
coeff = 1.0
states_exponents = np.zeros(len(states) - 1, np.int8)
coeffs_exponents = np.zeros(len(coeffs), np.int8)
# Ensure / is padded
s = " / ".join(s.split("/"))
for token in s.split():
if token == "/":
if dividing:
raise ValueError("Syntax error: Double division in %s" % s)
dividing = True
elif token in states:
i = states.index(token) - 1
if i == -1:
raise ValueError("Syntax error: Nihil state cannot be used as coefficient.")
if dividing:
states_exponents[i] -= 1
dividing = False
else:
states_exponents[i] += 1
elif token in coeffs:
i = coeffs.index(token)
if dividing:
coeffs_exponents[i] -= 1
dividing = False
else:
coeffs_exponents[i] += 1
else:
value = None
try:
value = float(token)
except ValueError:
pass
if value is None:
raise ValueError("Syntax error: Unknown token %s in %s" % (token, s))
else:
coeff *= value
return coeff, states_exponents, coeffs_exponents
def _window(a, window_size=4, step_size=2, copy=False):
"""
Get a sliding window of an array
Args:
a (np.array): The original array.
window_size (int): Window size.
step_size (int): Window step size.
copy (bool): Whether to return a copy of the array instead of a readonly view.
Returns:
np.array: The sliding window.
"""
sh = (a.size - window_size + 1, window_size)
st = a.strides * 2
view = np.lib.stride_tricks.as_strided(a, strides=st, shape=sh, writeable=False)[0::step_size]
if copy:
return view.copy()
else:
return view
def _sliding_time(t, window_size=4, step_size=2, criterion="mean"):
"""Convenience function to extract times to represent a sliding window"""
_t = _window(t, window_size=window_size, step_size=step_size)
if criterion == "first":
return _t[:, 0]
elif criterion == "last":
return _t[:, -1]
elif criterion == "median":
return _t[:, int(window_size / 2)]
elif criterion == "mean":
return np.mean(_t, axis=1)
else:
raise ValueError("Invalid criterion")
def _escape_state(s):
return s.replace("_", "-")
class _Model:
"""A compartment model"""
def __init__(self, states=None, parameters=None, rules=None, sum_state="N", nihil_state="!"):
"""
Args:
states (str or list of str): Names of the states. If str, one letter per state is assumed.
parameters (str or list of str): Names of the coefficients. If str, one letter per state is assumed.
rules (list of (str, str, object) tuples): Transition rules defined by origin state, destination state and
some subclass-dependent object.
sum_state (str): Name of a special state with the total population. Can be used in the coefficients.
nihil_state (str): Name of a special state used to described birth rules (when used as origin) and death
rules (when used as destination).
"""
self.states = list(states) if states is not None else []
self.parameters = list(parameters) if parameters is not None else []
self.rules = list(rules) if rules is not None else []
self.sum_state = sum_state
self.nihil_state = nihil_state
def _get_numerical_model(self, parameters):
"""Return a numerical model which calculates (t, y) -> dy"""
raise NotImplementedError
def get_sympy(self):
"""Try to obtain a symbolic version of the model using sympy"""
assert sympy is not None, "The sympy package is needed for symbolic manipulation"
return sympy.Matrix(
self._get_numerical_model(sympy.symbols(self.parameters))(sympy.symbols('t'), sympy.symbols(self.states)))
def get_fixed_points(self):
"""
Find the fixed points and their jacobians using sympy.
Returns:
list of (tuple of sympy.Expr, sympy.Matrix): Each fixed point as a tuple of sympy expressions (might be a
family of points) and their Jacobians.
"""
assert sympy is not None, "The sympy package is needed for symbolic manipulation"
symbolic = self.get_sympy()
fixed = sympy.solve([sympy.Eq(f, 0) for f in symbolic], *sympy.symbols(self.states))
j = sympy.Matrix(symbolic).jacobian(sympy.symbols(self.states))
jacobians = [j.subs(list(zip(sympy.symbols(self.states), f))) for f in fixed]
return list(zip(fixed, jacobians))
@classmethod
def _coef_to_latex(cls, coeff):
return str(coeff)
def _coef_to_wolfram(self, coeff):
return str(coeff)
@classmethod
def _coef_to_plot(cls, coeff):
return str(coeff)
def to_latex(self):
"""
Get a LaTeX representation of the differential equations of the model.
Returns:
str: LaTeX code suitable for use in an equation environment.
"""
dy = {state: "" for state in self.states}
for origin, destination, coeff in self.rules:
coeff = self._coef_to_latex(coeff)
if origin == self.nihil_state: # special birth rule
dy[destination] += " + %s %s" % (coeff, self._coef_to_latex(self.sum_state))
elif destination == self.nihil_state:
dy[origin] += " - %s %s" % (coeff, self._coef_to_latex(origin))
else:
dy[destination] += " + %s %s" % (coeff, self._coef_to_latex(origin))
dy[origin] += " - %s %s" % (coeff, self._coef_to_latex(origin))
# Replace leading " +" and " -"
for state in self.states:
dy[state] = re.sub(r"^ \+ ", "", dy[state])
dy[state] = re.sub(r"^ - ", "-", dy[state])
array_code = "\\\\\n".join(
"\\dot{%s} &=& %s" % (self._coef_to_latex(state), dy[state]) for state in self.states)
return "\\begin{array}{lcl} %s \n\\end{array}" % array_code
def to_wolfram(self):
"""
Get a Wolfram representation of the differential equations of the model.
Returns:
str: Wolfram code.
"""
dy = {state: "" for state in self.states}
for origin, destination, coeff in self.rules:
coeff = self._coef_to_wolfram(coeff)
if origin == self.nihil_state: # special birth rule
dy[destination] += " + %s %s" % (coeff, self._coef_to_wolfram(self.sum_state))
elif destination == self.nihil_state:
dy[origin] += " - %s %s" % (coeff, self._coef_to_wolfram(origin))
else:
dy[destination] += " + %s %s" % (coeff, self._coef_to_wolfram(origin))
dy[origin] += " - %s %s" % (coeff, self._coef_to_wolfram(origin))
# Replace leading " +" and " -"
for state in self.states:
dy[state] = re.sub(r"^ \+ ", "", dy[state])
dy[state] = re.sub(r"^ - ", "-", dy[state])
system = ",\n".join(
"D[%s, t] == %s" % (self._coef_to_wolfram(state), dy[state]) for state in self.states)
return '<|"system" -> {%s},\n"states" -> {%s},\n"parameters" -> {%s}|>' % (
system, ",".join(self._coef_to_wolfram(s) for s in self.states),
",".join(self._coef_to_wolfram(p) for p in self.parameters))
def solve(self, initial, parameters, t, **kwargs):
"""
Solve the model numerically.
The solution is found using scipy.integrate.solve_ivp, which uses by default an Explicit Runge-Kutta method of order 5(4).
Args:
initial (list of float): Initial population for each of the states.
parameters (list of float): Values for the parameters.
t (list of float): Mesh of time values for which the solution is found.
kwargs: Additional arguments passed to solve_ivp.
Returns:
numpy.ndarray: Values of each component (first coordinate) at the t mesh (second).
"""
return integrate.solve_ivp(self._get_numerical_model(parameters), (t[0], t[-1]), initial, t_eval=t,
**kwargs).y
def solve_time(self, initial, parameters, t):
"""
Solve the model numerically for parameters given as functions of time.
The solution is found using scipy.integrate.odeint, which uses lsoda from the FORTRAN library odepack.
Args:
initial (list of float): Initial population for each of the states.
parameters (list of callable): Functions of time defining the parameters.
t (list of float): Mesh of time values for which the solution is found.
Returns:
"""
raise NotImplementedError
def best_fit(self, data, t, initial_pars, target="dy", component_weights=None, ls_kwargs=None):
"""
Get a best fit of the model to the provided data
The first point in the data is considered to be defining the initial conditions
Args:
data (list of list of float): 2D array whose first index is the component (e.g., S, I, R....) and its
second index is the time.
t (list of float): Time mesh. Must be consistent with data. Explicitly provide None to use an integer mesh
starting from 0.
initial_pars (list of float): Initial values of the parameters.
target (str): Metric to reproduce. Available options are:
- "y": The curves themselves.
- "dy": The changes of the curves.
component_weights (list of float): A set of weights for each component of the model
ls_kwargs (dict): Additional kwargs to pass to the least squares solver. Cf. scipy.optimize.least_squares.
Returns:
scipy.optimize.OptimizeResult: Object describing the result of the fitting. To obtain the parameters access
its x attribute.
"""
if component_weights is None:
component_weights = np.ones((len(data),))
else:
component_weights = np.asarray(component_weights)
assert len(component_weights) == len(data)
if ls_kwargs is None:
ls_kwargs = {}
# Time to first index, component to second
data = np.asarray(data).T
if t is None:
t = np.asarray(list(range(len(data[0]))))
else:
t = np.asarray(t)
if target == "dy":
# Fit to increments
time_increments = np.reshape(t[1:] - t[:-1], (-1, 1))
real_increments = (data[1:] - data[:-1]) / time_increments
def get_residuals(parameters):
m = self._get_numerical_model(parameters)
predicted_increments = [m(time, datum) for time, datum in zip(t, data)][:-1]
return np.reshape(real_increments - predicted_increments, (-1,)) * np.tile(component_weights,
len(data) - 1)
elif target == "y":
# Fit to curves
initials = data[0]
def get_residuals(parameters):
predicted_values = self.solve(initials, parameters, t)
return np.reshape((data.T - predicted_values), (-1,)) * np.repeat(component_weights, len(data))
else:
raise ValueError("Invalid method")
return optimize.least_squares(get_residuals, initial_pars, **ls_kwargs)
def _best_sliding_fit(self, data, t, initial_pars, window_size, step_size, target="dy", component_weights=None,
ls_kwargs=None):
"""Iterator yielding the values for _best_sliding_fit"""
data = np.asarray(data)
t = np.asarray(t)
for data_subset in list(
zip(*[_window(x, window_size, step_size) for x in data], _window(t, window_size, step_size))):
yield self.best_fit(data_subset[:-1], data_subset[-1], initial_pars, target=target,
component_weights=component_weights, ls_kwargs=ls_kwargs).x
def best_sliding_fit(self, data, t, initial_pars, window_size, step_size, target="dy", component_weights=None,
ls_kwargs=None, time_criterion="mean"):
"""
Get a best fit of the model to the provided data
The first point in the data is considered to be defining the initial conditions
Args:
data (list of list of float): 2D array whose first index is the component (e.g., S, I, R....) and its
second index is the time.
t (list of float): Time mesh. Must be consistent with data. Explicitly provide None to use an integer mesh
starting from 0.
initial_pars (list of float): Initial values of the parameters.
window_size (int): Window size (number of time values in each window).
step_size (int): Window step size (number of time values between windows).
target (str): Metric to reproduce. Available options are:
- "y": The curves themselves.
- "dy": The changes of the curves.
component_weights (list of float): A set of weights for each component of the model
ls_kwargs (dict): Additional kwargs to pass to the least squares solver. Cf. scipy.optimize.least_squares.
time_criterion (str): Criterion used to assign a time value for each of the windows. Available values are:
- "last": The last time in the window.
- "first": The first time in the window.
- "median": The median time of the points in the window.
- "mean": The mean time of the points in the window (default).
Returns:
pd.DataFrame: A dataframe with the fits in each of the windows.
"""
if t is None:
t = np.asarray(list(range(len(data[0]))))
values = np.asarray(list(self._best_sliding_fit(data, t, initial_pars, window_size, step_size, target=target,
component_weights=component_weights, ls_kwargs=ls_kwargs)))
t2 = _sliding_time(t, window_size=window_size, step_size=step_size, criterion=time_criterion)
return pd.DataFrame(values, index=t2, columns=self.parameters)
def plot_igraph(self, **kwargs):
"""
Get a plot of a graph representing the model using igraph.
Args:
**kwargs: Additional arguments to pass to igraph.plot. Nodes are ordered as the instance states, with the
special nihil state in the first position, but it is only added if births or deaths are found
in the rules.
Returns:
igraph.Plot: The plot of the graph.
"""
assert igraph is not None, "igraph not available"
g = igraph.Graph(directed=True)
use_nihil = any([self.nihil_state in x[:2] for x in self.rules])
g.add_vertices(([self.nihil_state] if use_nihil else []) + self.states)
g.add_edges([r[:2] for r in self.rules])
g.es["label"] = [self._coef_to_plot(r[2]) for r in self.rules]
if "vertex_label" not in kwargs:
kwargs["vertex_label"] = ([""] if use_nihil else []) + self.states
if "vertex_color" not in kwargs:
kwargs["vertex_color"] = (["red"] if use_nihil else []) + ["blue"] * len(self.states)
return igraph.plot(g, **kwargs)
def plot_tikz(self, filename=None, **kwargs):
"""
Get a plot of a graph representing the model using tikz.
Args:
filename: How to save the generated tikz. Posible patterns follow:
- None: If a jupyter environment is detected, returns a Pillow image if libraries are available.
Otherwise, open a pdf in an external editor.
- "*.tex": Generate a TeX file with the code.
- "*.pdf": Generate a rendered pdf file.
**kwargs: Additional arguments to pass to network2tikz.plot. Cf. https://pypi.org/project/network2tikz/.
Returns:
PIL.Image.Image or None: An image with a preview of the graph if filename is None and running as a notebook.
"""
assert network2tikz is not None, "network2tikz not available"
nodes = [_escape_state(s) for s in self.states]
edges = [(_escape_state(r[0]), _escape_state(r[1])) for r in self.rules]
style = {}
style['node_label'] = ["$%s$" % _latex_normalize(s) for s in self.states]
style['edge_label'] = ["$%s$" % self._coef_to_latex(r[2]) for r in self.rules]
style['node_color'] = ["blue"] * len(self.states)
style['edge_directed'] = True
if any([self.nihil_state in x[:2] for x in self.rules]):
# prepend nihil_state
nodes = [self.nihil_state] + nodes
style['node_color'] = ["red"] + style['node_color']
style['node_label'] = [""] + style['node_label']
style = {**style, **kwargs}
# network2tikz shows a pdf in an external editor if filename is None. Change if running in a notebook
if filename is None and is_notebook():
if pdf2image is None:
warnings.warn("Image will be shown externally since pdf2image is not available")
# Default behaviour below
else:
with tempfile.NamedTemporaryFile(suffix=".pdf") as f:
network2tikz.plot((nodes, edges), f.name, **style)
return pdf2image(f.name)[0]
# In any case
return network2tikz.plot((nodes, edges), filename, **style)
class _NumericalModel:
"""Mathematical form of a compartment."""
def __init__(self, n_states, parameters, rules, agg_states=None):
"""
Args:
n_states (int): Number of states (not including sum or nihil states)
parameters (list of float): Values for the parameters.
rules (list of tuples): Tuples of the form (origin, destination, (coeff, degree_states, degree_parameters)),
those being:
- origin: Index of the origin state.
- destination: Index of the destination state.
- coeff: Real number multiplying the coefficient.
- degree_states: Tuple with the degree of the states in the monomial
(0 being the sum state and -1 the nihil state)
- degree_parameters: Tuple with the degrees of the parameters in the monomial.
"""
self.n_states = n_states
self.agg_states = agg_states if agg_states is not None else []
parameters = np.asarray(parameters)
# Store the fixed value obtained from the numerical coefficient and the parameters
self._rules = [(origin, destination, (coeff * np.prod(parameters ** degree_parameters), degree_states)) for
origin, destination, (coeff, degree_states, degree_parameters) in rules]
def __call__(self, t, y, *args):
dy = np.zeros(self.n_states).tolist() # Cast to python list to avoid casting errors when using sympy
if not self.agg_states:
# [N, Q_1, Q_2, ...]
y = np.concatenate([[np.sum(y)], y])
else:
aggregated = np.sum(self.agg_states * np.asarray(y), axis=1)
# [N, Q_1, Q_2, ..., aggregated_1, aggregated_2]
y = np.concatenate([[np.sum(y)], y, aggregated])
for origin, destination, (coeff, degree_states) in self._rules:
# Note this implementation relies on 0.0**0==1.0
if origin == -1:
dy[destination - 1] += coeff * np.prod(y ** degree_states) * y[0] # Proportional to sum state
elif destination == -1:
dy[origin - 1] -= coeff * np.prod(y ** degree_states) * y[origin]
else:
dy[origin - 1] -= coeff * np.prod(y ** degree_states) * y[origin]
dy[destination - 1] += coeff * np.prod(y ** degree_states) * y[origin]
return np.asarray(dy)
class _NumericalTimeModel:
"""Mathematical form of a compartment model where the parameters are functions of the time"""
def __init__(self, n_states, parameters, rules):
"""
Args:
n_states (int): Number of states (not including sum or nihil states)
parameters (list of callable): The parameters as functions of time.
rules (list of tuples): Tuples of the form (origin, destination, (coeff, degree_states, degree_parameters)),
those being:
- origin: Index of the origin state.
- destination: Index of the destination state.
- coeff: Real number multiplying the coefficient.
- degree_states: Tuple with the degree of the states in the monomial
(0 being the sum state and -1 the nihil state)
- degree_parameters: Tuple with the degrees of the parameters in the monomial.
"""
self._parameters = parameters
self.n_states = n_states
self.rules = rules
def __call__(self, t, y, *args):
parameters = [par(t) for par in self._parameters]
return _NumericalModel(self.n_states, parameters, self.rules)(t, y, *args)
def _latex_normalize(token):
"""Get a normalized LaTeX version of a token"""
if token in _latex_symbols:
return "\\" + token
if "_" in token:
# a_b_c... -> a{b,c...}
prefix, index = token.split("_", 1)
token = "%s_{%s}" % (prefix, index.replace("_", ","))
return token
def _wolfram_normalize(token, time_dependents=None):
"""Get a normalized Wolfram version of a token"""
suffix = "" if time_dependents is None or token not in time_dependents else "[t]"
if "_" in token:
# a_b_c... -> a{b,c...}
prefix, index = token.split("_", 1)
if prefix in _wolfram_symbols:
prefix = _wolfram_symbols[prefix]
else:
prefix = prefix.lower() # Avoid conflict with built-in symbols
token = "Subscript[%s, %s]" % (prefix, index.replace("_", ","))
else:
if token in _wolfram_symbols:
token = _wolfram_symbols[token]
else:
token = token.lower() # Avoid conflict with built-in symbols
return token + suffix
[docs]class Model(_Model):
"""A compartment model with transition rules defined by strings"""
def __init__(self, states=None, parameters=None, rules=None, sum_state="N", nihil_state="!"):
"""
Args:
states (str or list of str): Names of the states. If str, one letter per state is assumed.
parameters (str or list of str): Names of the coefficients. If str, one letter per state is assumed.
rules (list of (str, str, str) tuples): Transition rules defined by origin state, destination state and
multiplicative coefficient. The proportionality on the population of the
origin state is automatically assumed (unless the rule is describing
births¸ where proportionality to the total population is assumed).
The coefficient might be a product of real numbers, states, and
parameters. Division is allowed, parenthesis are not. Use multiple rules to
define additions or subtractions.
sum_state (str): Name of a special state with the total population. Can be used in the coefficients.
A suffixed form of the value provided (e.g., N_1) can also be used in the coefficients,
this will be filled with the sum of states with the same suffix.
nihil_state (str): Name of a special state used to described birth rules (when used as origin) and death
rules (when used as destination).
"""
super().__init__(states, parameters, rules, sum_state, nihil_state)
# A rule-based filtering could be done here
suffixes = {x.split("_")[-1] for x in self.states if "_" in x}
agg_states = {"N_" + suffix: [1 if s.endswith("_" + suffix) else 0 for s in self.states] for suffix in
suffixes}
if agg_states:
self.agg_states, self.agg_rules = zip(*agg_states.items())
self.agg_states = list(self.agg_states)
else:
self.agg_states, self.agg_rules = [], []
all_states = [nihil_state, sum_state] + self.states + self.agg_states
self._rules = [(all_states.index(origin) - 1,
all_states.index(destination) - 1,
_monomial_from_str(s, all_states, self.parameters))
for origin, destination, s in self.rules]
@classmethod
def _coef_to_latex(cls, coeff):
# Ensure / is padded
coeff = " / ".join(coeff.split("/"))
return " ".join(_latex_normalize(token) for token in coeff.split())
def _coef_to_wolfram(self, coeff):
# Ensure / is padded
coeff = " / ".join(coeff.split("/"))
return " ".join(_wolfram_normalize(token, self.states) for token in coeff.split())
def _get_numerical_model(self, parameters):
return _NumericalModel(len(self.states), parameters, self._rules, self.agg_rules)
[docs] def solve_time(self, initial, parameters, t, **kwargs):
"""
Solve the model numerically for parameters given as functions of time.
The solution is found using scipy.integrate.solve_ivp, which uses by default an Explicit Runge-Kutta method of order 5(4).
Args:
initial (list of float): Initial population for each of the states.
parameters (list of callable): Functions of time defining the parameters.
t (list of float): Mesh of time values for which the solution is found.
kwargs: Additional arguments passed to solve_ivp.
Returns:
numpy.ndarray: Values of each component (first coordinate) at the t mesh (second).
"""
return integrate.solve_ivp(_NumericalTimeModel(len(self.states), parameters, self._rules), (t[0], t[-1]),
initial, t_eval=t, **kwargs).y
class _FunctionNumericalModel:
"""Mathematical form of a compartment model with transition rules defined by callables"""
def __init__(self, states, parameters, rules, sum_state="N"):
"""
Args:
states (list of str): Names of the states.
parameters (dict of str to float): Mapping from parameters to their values.
rules (list of tuples): Tuples of the form (origin, destination, f),
those being:
- origin: Index of the origin state.
- destination: Index of the destination state.
- f: Callable receiving values as kwargs and returning a coefficient
sum_state (str): Name of a special state with the total population.
"""
self.states = states
self.n_states = len(states)
self.parameters = parameters
self.rules = rules
self.sum_state = sum_state
def __call__(self, t, y, *args):
dy = np.zeros(self.n_states).tolist()
y = np.concatenate([[np.sum(y)], y])
states = [self.sum_state] + self.states
kwargs = {**dict(zip(states, y)), **self.parameters}
for origin, destination, f in self.rules:
value = f(**kwargs)
if origin == -1:
dy[destination - 1] += value * y[0] # Birth proportionality
elif destination == -1:
dy[origin - 1] -= value * y[origin]
else:
dy[origin - 1] -= value * y[origin]
dy[destination - 1] += value * y[origin]
return np.asarray(dy)
[docs]class FunctionModel(_Model):
"""A compartment model with transition rules defined by callables"""
def __init__(self, states=None, parameters=None, rules=None, sum_state="N", nihil_state="!"):
"""
Args:
states (str or list of str): Names of the states. If str, one letter per state is assumed.
parameters (str or list of str): Names of the coefficients. If str, one letter per state is assumed.
rules (list of (str, str, callable) tuples): Transition rules defined by origin state, destination state and
a callable.
sum_state (str): Name of a special state with the total population. Can be used in the coefficients.
nihil_state (str): Name of a special state used to described birth rules (when used as origin) and death
rules (when used as destination).
"""
super().__init__(states, parameters, rules, sum_state, nihil_state)
all_states = [nihil_state, sum_state] + self.states
self._rules = [(all_states.index(origin) - 1,
all_states.index(destination) - 1,
f)
for origin, destination, f in self.rules]
def _get_numerical_model(self, parameters):
return _FunctionNumericalModel(self.states, dict(zip(self.parameters, parameters)), self._rules,
sum_state=self.sum_state)
@classmethod
def _coef_to_latex(cls, coeff):
return coeff.__name__
@classmethod
def _coef_to_plot(cls, coeff):
return coeff.__name__
[docs]def add_natural(model, birth_param="nu", death_param="mu", birth_state=None):
"""
Add some "natural" birth and death dynamics to a model
Births are described by an absolute rate, while deaths are relative to the population.
Args:
model (Model): Base model which will be extended.
birth_param (str): Name of the birth rate parameter.
death_param (str): Name of the death rate parameter.
birth_state (str): State were births take place. Defaults to the first state of model.
Returns:
Model: A new model with the extended dynamics.
"""
# Birth default to nu /ˈnjuː/ = new
rules = model.rules[:]
states = model.states[:]
parameters = model.parameters[:]
if birth_state is None:
birth_state = states[0]
else:
assert birth_state in states, "Birth state not found in model"
assert birth_param not in parameters, "Birth parameter already exists"
assert death_param not in parameters, "Death parameter already exists"
# Birth
rules.append((model.nihil_state, birth_state, birth_param))
# Death
for state in states:
rules.append((state, model.nihil_state, death_param))
return Model(states, parameters + [birth_param, death_param], rules, model.sum_state, model.nihil_state)