Source code for tick.hawkes.simulation.simu_hawkes

# License: BSD 3 clause

import warnings
from itertools import product

import numpy as np
from numpy.linalg import eig, inv

from tick.base import TimeFunction
from tick.hawkes.simulation.base import SimuPointProcess
from tick.hawkes.simulation.build.hawkes_simulation import Hawkes as _Hawkes
from .hawkes_kernels import HawkesKernel0


[docs]class SimuHawkes(SimuPointProcess): """Hawkes process simulation They are defined by the intensity: .. math:: \\forall i \\in [1 \\dots D], \\quad \\lambda_i(t) = \\mu_i(t) + \\sum_{j=1}^D \\int \\phi_{ij}(t - s) dN_j(s) where * :math:`D` is the number of nodes * :math:`\mu_i(t)` are the baseline intensities * :math:`\phi_{ij}` are the kernels * :math:`dN_j` are the processes differentiates Parameters ---------- kernels : `np.ndarray`, shape=(n_nodes, n_nodes) A 2-dimensional arrays of kernels, also noted :math:`\phi_{ij}` baseline : `np.ndarray` or `list` The baseline of all intensities, also noted :math:`\mu(t)`. It might be three different types: * `np.ndarray`, shape=(n_nodes,) : One baseline per node is given. Hence baseline is assumed to be constant, ie. :math:`\mu_i(t) = \mu_i` * `np.ndarray`, shape=(n_nodes, n_intervals) : `n_intervals` baselines are given per node. This assumes parameter `period_length` is also given. In this case baseline is piecewise constant on intervals of size `period_length / n_intervals` and periodic. * `list` of `tick.base.TimeFunction`, shape=(n_nodes,) : One function is given per node, ie. :math:`\mu_i(t)` is explicitely given. n_nodes : `int` The number of nodes of the Hawkes process. If kernels and baseline are None this will create a Hawkes process with n_nodes nodes of zero kernels and with zero baseline end_time : `float`, default=None Time until which this point process will be simulated period_length : `float`, default=None Period of baseline in piecewise constant case. max_jumps : `int`, default=None Simulation will stop if this number of jumps in reached seed : `int`, default = None The seed of the random sampling. If it is None then a random seed (different at each run) will be chosen. force_simulation : `bool`, default = False If force is not set to True, simulation won't be run if the matrix of the L1 norm of kernels has a spectral radius greater or equal to 1 as it would be unstable Attributes ---------- timestamps : `list` of `np.ndarray`, size=n_nodes A list of n_nodes timestamps arrays, each array containing the timestamps of all the jumps for this node simulation_time : `float` Time until which this point process has been simulated n_total_jumps : `int` Total number of jumps simulated tracked_intensity : `list[np.ndarray]`, size=n_nodes A record of the intensity with which this point process has been simulated. Note: you must call track_intensity before simulation to record it intensity_tracked_times : `np.ndarray` The times at which intensity has been recorded. Note: you must call track_intensity before simulation to record it intensity_track_step : `float` Step with which the intensity has been recorded """ _attrinfos = { "kernels": { "writable": False }, "_kernel_0": { "writable": False }, }
[docs] def __init__(self, kernels=None, baseline=None, n_nodes=None, end_time=None, period_length=None, max_jumps=None, seed=None, verbose=True, force_simulation=False): SimuPointProcess.__init__(self, end_time=end_time, max_jumps=max_jumps, seed=seed, verbose=verbose) self.force_simulation = force_simulation # We keep a reference on this kernel to avoid copies self._kernel_0 = HawkesKernel0() self.period_length = period_length if isinstance(kernels, list): kernels = np.array(kernels) if isinstance(baseline, list): baseline = np.asarray(baseline) if baseline is not None: if baseline.dtype not in [float, object]: baseline = baseline.astype(float) self.check_parameters_coherence(kernels, baseline, n_nodes) # Init _pp so we have access to self.n_nodes if n_nodes is None: if baseline is not None: n_nodes = baseline.shape[0] else: n_nodes = kernels.shape[0] if n_nodes <= 0: raise ValueError( "n_nodes must be positive but equals %i" % n_nodes) self._pp = _Hawkes(n_nodes, self._pp_init_seed) if kernels is not None: if kernels.shape != (self.n_nodes, self.n_nodes): raise ValueError("kernels shape should be %s instead of %s" % ( (self.n_nodes, self.n_nodes), kernels.shape)) self.kernels = kernels self._init_kernels() else: self._init_zero_kernels() if baseline is not None: if baseline.shape[0] != self.n_nodes: raise ValueError("baseline length should be {} instead of {}" .format(self.n_nodes, baseline.shape[0])) if len(baseline.shape) > 2: raise ValueError("baseline should have at most {} dimensions, " "it currently has".format( 2, len(baseline.shape))) self.baseline = baseline self._init_baseline() else: self._init_zero_baseline()
def _init_zero_kernels(self): self.kernels = np.empty((self.n_nodes, self.n_nodes), dtype=object) for i, j in product(range(self.n_nodes), range(self.n_nodes)): self.kernels[i, j] = self._kernel_0 def _init_zero_baseline(self): self.baseline = np.empty((self.n_nodes,), dtype=object) for i in range(self.n_nodes): self.set_baseline(i, 0) def _init_baseline(self): for i in range(self.n_nodes): self.set_baseline(i, self.baseline[i]) def _init_kernels(self): for i, j in product(range(self.n_nodes), range(self.n_nodes)): if self.kernels[i, j] == 0: kernel_ij = self._kernel_0 else: kernel_ij = self.kernels[i, j] self.set_kernel(i, j, kernel_ij) def check_parameters_coherence(self, kernels, baseline, n_nodes): set_kernels = kernels is not None set_baseline = baseline is not None set_n_nodes = n_nodes is not None if set_n_nodes and (set_kernels or set_baseline): raise ValueError("n_nodes will be automatically calculated if " "baseline or kernels is set") if not set_n_nodes and not set_kernels and not set_baseline: raise ValueError("n_nodes must be given if neither kernels, " "nor baseline are given") if set_kernels and set_baseline and len(kernels) != len(baseline): raise ValueError("kernels and baseline have different length. " "kernels has length %i, whereas baseline has " "length %i." % (len(kernels), len(baseline))) def set_kernel(self, i, j, kernel): if isinstance(kernel, (int, float)) and kernel == 0: self.kernels[i, j] = self._kernel_0 self._pp.set_kernel(i, j, self._kernel_0._kernel) else: self.kernels[i, j] = kernel self._pp.set_kernel(i, j, kernel._kernel) def set_baseline(self, i, baseline): self.baseline[i] = baseline error_msg = 'Baseline element must be either a float or an ' \ 'array or a TimeFunction' if isinstance(baseline, (float, int)): self._pp.set_baseline(i, baseline) elif isinstance(baseline, np.ndarray): if len(baseline.shape) > 1: ValueError('Baseline element might have at most 1 dimension') n_intervals = len(baseline) # We need to append one value as it will be used through # tick.base.TimeFunction with InterConstRight with does not take # into account the last value (except for t = period_length) t_values = np.linspace(0, self.period_length, n_intervals + 1) extended_baseline = np.hstack((baseline, baseline[-1])) self._pp.set_baseline(i, t_values, extended_baseline) elif isinstance(baseline, TimeFunction): self._pp.set_baseline(i, baseline._time_function) else: raise ValueError(error_msg)
[docs] def get_baseline_values(self, i, t_values): """Outputs value of baseline depending on time Parameters ---------- i : `int` Selected dimension t_values : `np.ndarray` Values baseline will be computed at Returns ------- output : `np.ndarray` Value of baseline `i` at `t_values` """ return self._pp.get_baseline(i, t_values)
def _simulate(self): """Launch simulation of the Hawkes process by thinning """ if self.baseline.dtype == float and np.linalg.norm(self.baseline) == 0: warnings.warn("Baselines have not been set, hence this hawkes " "process won't jump") if not self.force_simulation and self.spectral_radius() >= 1 \ and self.max_jumps is None: raise ValueError( "Simulation not launched as this Hawkes process " "is not stable (spectral radius of %.2g). You " "can use force_simulation parameter if you " "really want to simulate it" % self.spectral_radius()) SimuPointProcess._simulate(self)
[docs] def spectral_radius(self): """Compute the spectral radius of the matrix of l1 norm of Hawkes kernels. Notes ----- If the spectral radius is greater that 1, the hawkes process is not stable """ get_norm = np.vectorize(lambda kernel: kernel.get_norm()) norms = get_norm(self.kernels) # It might happens that eig returns a complex number but with a # negligible complex part, in this case we keep only the real part spectral_radius = max(eig(norms)[0]) spectral_radius = np.real_if_close(spectral_radius) return spectral_radius
[docs] def mean_intensity(self): """Compute the mean intensity vector """ get_norm = np.vectorize(lambda kernel: kernel.get_norm()) norms = get_norm(self.kernels) return inv(np.eye(self.n_nodes) - norms).dot(self.baseline)