from itertools import product
import numpy as np
import scipy
from scipy.linalg import qr, sqrtm, norm
from tick.base import Base
from tick.hawkes.inference.base import LearnerHawkesNoParam
from tick.hawkes.inference.build.hawkes_inference import (HawkesCumulant as
_HawkesCumulant)
# Tensorflow is not a project requirement but is needed for this class
try:
import tensorflow as tf
except ImportError:
tf = None
class HawkesCumulantMatching(LearnerHawkesNoParam):
"""This class performs non-parametric estimation of multi-dimensional
Hawkes processes based on cumulant matching.
It does not make any assumptions on the kernel shape and recovers
the kernel norms only.
This class relies on `Tensorflow`_ to perform the matching of the
cumulants. If `Tensorflow`_ is not installed, it will not work.
Parameters
----------
integration_support : `float`
Controls the maximal lag (positive or negative) upon which we
integrate the cumulant densities (covariance and skewness),
this amounts to neglecting border effects. In practice, this is a
good approximation if the support of the kernels is smaller than the
integration support and if the spectral norm of the kernel norms
is sufficiently distant from the critical value, namely 1.
It is denoted by :math:`H` in the paper.
C : `float`, default=1e3
Level of penalization
penalty : {'l1', 'l2', 'elasticnet', 'none'}, default='none'
The penalization to use. By default no penalization is used.
Penalty is only applied to adjacency matrix.
solver : {'momentum', 'adam', 'adagrad', 'rmsprop', 'adadelta', 'gd'}, default='adam'
Name of tensorflow solver that will be used.
step : `float`, default=1e-2
Initial step size used for learning. Also known as learning rate.
tol : `float`, default=1e-8
The tolerance of the solver (iterations stop when the stopping
criterion is below it). If not reached the solver does ``max_iter``
iterations
max_iter : `int`, default=1000
Maximum number of iterations of the solver
verbose : `bool`, default=False
If `True`, information is printed during optimization, otherwise the
solver prints nothing (but still records information in history)
print_every : `int`, default=100
Print history information when ``n_iter`` (iteration number) is
a multiple of ``print_every``
record_every : `int`, default=10
Record history information when ``n_iter`` (iteration number) is
a multiple of ``record_every``
elastic_net_ratio : `float`, default=0.95
Ratio of elastic net mixing parameter with 0 <= ratio <= 1.
* For ratio = 0 this is ridge (L2 squared) regularization.
* For ratio = 1 this is lasso (L1) regularization.
* For 0 < ratio < 1, the regularization is a linear combination
of L1 and L2.
Used in 'elasticnet' penalty
solver_kwargs : `dict`, default=`None`
Extra arguments that will be passed to tensorflow solver
Attributes
----------
n_nodes : `int`
Number of nodes / components in the Hawkes model
baseline : `np.array`, shape=(n_nodes,)
Inferred baseline of each component's intensity
adjacency : `np.ndarray`, shape=(n_nodes, n_nodes)
Inferred adjacency matrix
mean_intensity : `np.array`, shape=(n_nodes,)
Estimated mean intensities, named :math:`\\widehat{L}` in the paper
covariance : `np.array`, shape=(n_nodes, n_nodes)
Estimated integrated covariance, named :math:`\\widehat{C}`
in the paper
skewness : `np.array`, shape=(n_nodes, n_nodes)
Estimated integrated skewness (sliced), named :math:`\\widehat{K^c}`
in the paper
R : `np.array`, shape=(n_nodes, n_nodes)
Estimated weight matrix, linked to the integrals of the Hawkes kernels.
Used to derive adjacency and baseline
Other Parameters
----------------
cs_ratio : `float`, default=`None`
Covariance-skewness ratio. The higher it is, the more impact the
covariance has on the result, which leads to more symmetric
adjacency matrices. If None, a default value is computed based
on the norm of the estimated covariance and skewness cumulants.
References
----------
Achab, M., Bacry, E., Gaïffas, S., Mastromatteo, I., & Muzy, J. F.
(2017, July). Uncovering causality from multivariate Hawkes integrated
cumulants.
`In International Conference on Machine Learning (pp. 1-10)`_.
.. _In International Conference on Machine Learning (pp. 1-10): http://proceedings.mlr.press/v70/achab17a.html
.. _Tensorflow: https://www.tensorflow.org
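Examples
--------
A minimal usage sketch, assuming `events` holds one or more Hawkes
realizations as lists of per-node timestamp arrays (see `fit` for the
expected structure):
>>> learner = HawkesCumulantMatching(integration_support=100.)
>>> learner.fit(events)  # doctest: +SKIP
>>> learner.adjacency  # doctest: +SKIP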
"""
_attrinfos = {
'_cumulant_computer': {
'writable': False
},
'_solver': {
'writable': False
},
'_elastic_net_ratio': {
'writable': False
},
'_tf_feed_dict': {},
'_tf_graph': {},
'_events_of_cumulants': {
'writable': False
}
}
def __init__(self, integration_support, C=1e3, penalty='none',
solver='adam', step=1e-2, tol=1e-8, max_iter=1000,
verbose=False, print_every=100, record_every=10,
solver_kwargs=None, cs_ratio=None, elastic_net_ratio=0.95):
if tf is None:
raise ImportError('`tensorflow` >= 1.4.0 must be available to use '
'HawkesCumulantMatching')
self._tf_graph = tf.Graph()
LearnerHawkesNoParam.__init__(
self, tol=tol, verbose=verbose, max_iter=max_iter,
print_every=print_every, record_every=record_every)
self._elastic_net_ratio = None
self.C = C
self.penalty = penalty
self.elastic_net_ratio = elastic_net_ratio
self.step = step
self.cs_ratio = cs_ratio
self.solver_kwargs = solver_kwargs
if self.solver_kwargs is None:
self.solver_kwargs = {}
self._cumulant_computer = _HawkesCumulantComputer(
integration_support=integration_support)
self._learner = self._cumulant_computer._learner
self._solver = solver
self._tf_feed_dict = None
self._events_of_cumulants = None
self.history.print_order = ["n_iter", "objective", "rel_obj"]
def compute_cumulants(self, force=False):
"""Compute estimated mean intensity, covariance and sliced skewness
Parameters
----------
force : `bool`
If `True` previously computed cumulants are not reused
"""
self._cumulant_computer.compute_cumulants(verbose=self.verbose,
force=force)
@property
def mean_intensity(self):
if not self._cumulant_computer.cumulants_ready:
self.compute_cumulants()
return self._cumulant_computer.L
@property
def covariance(self):
if not self._cumulant_computer.cumulants_ready:
self.compute_cumulants()
return self._cumulant_computer.C
@property
def skewness(self):
if not self._cumulant_computer.cumulants_ready:
self.compute_cumulants()
return self._cumulant_computer.K_c
def objective(self, adjacency=None, R=None):
"""Compute objective value for a given adjacency or variable R
Parameters
----------
adjacency : `np.ndarray`, shape=(n_nodes, n_nodes), default=None
Adjacency matrix at which we compute objective.
If `None`, objective will be computed at `R`
R : `np.ndarray`, shape=(n_nodes, n_nodes), default=None
R variable at which objective is computed. Superseded by
adjacency if adjacency is not `None`
Returns
-------
Value of objective function
"""
cost = self._tf_objective_graph()
L, C, K_c = self._tf_placeholders()
if adjacency is not None:
R = scipy.linalg.inv(np.eye(self.n_nodes) - adjacency)
with self._tf_graph.as_default():
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
sess.run(self._tf_model_coeffs.assign(R))
return sess.run(
cost, feed_dict={
L: self.mean_intensity,
C: self.covariance,
K_c: self.skewness
})
@property
def _tf_model_coeffs(self):
"""Tensorflow variable of interest, used to perform minimization of
objective function
"""
with self._tf_graph.as_default():
with tf.variable_scope("model", reuse=tf.AUTO_REUSE):
return tf.get_variable("R", [self.n_nodes, self.n_nodes],
dtype=tf.float64)
@property
def adjacency(self):
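"""Inferred adjacency matrix, recovered as G = I - R^{-1} since
R = (I - G)^{-1}
"""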
return np.eye(self.n_nodes) - scipy.linalg.inv(self.solution)
@property
def baseline(self):
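"""Inferred baseline, recovered as mu = R^{-1} L since the estimated
mean intensity satisfies L = R mu
"""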
return scipy.linalg.inv(self.solution).dot(self.mean_intensity)
def _tf_placeholders(self):
"""Tensorflow placeholders to manage cumulants data
"""
d = self.n_nodes
if self._tf_feed_dict is None:
with self._tf_graph.as_default():
L = tf.placeholder(tf.float64, d, name='L')
C = tf.placeholder(tf.float64, (d, d), name='C')
K_c = tf.placeholder(tf.float64, (d, d), name='K_c')
self._tf_feed_dict = L, C, K_c
return self._tf_feed_dict
def _tf_objective_graph(self):
"""Objective fonction written as a tensorflow graph
"""
d = self.n_nodes
if self.cs_ratio is None:
cs_ratio = self.approximate_optimal_cs_ratio()
else:
cs_ratio = self.cs_ratio
with self._tf_graph.as_default():
L, C, K_c = self._tf_placeholders()
R = self._tf_model_coeffs
I = tf.constant(np.eye(d), dtype=tf.float64)
# Construct model
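# Theoretical cumulants as functions of R (Achab et al., 2017):
# covariance C(R) = R diag(L) R^T, and the sliced skewness K^c(R)
# written in terms of R, L and C; both are matched to their estimates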
variable_covariance = \
tf.matmul(R, tf.matmul(tf.diag(L), R, transpose_b=True))
variable_skewness = \
tf.matmul(C, tf.square(R), transpose_b=True) \
+ 2.0 * tf.matmul(R, R * C, transpose_b=True) \
- 2.0 * tf.matmul(R, tf.matmul(
tf.diag(L), tf.square(R), transpose_b=True))
covariance_divergence = tf.reduce_mean(
tf.squared_difference(variable_covariance, C))
skewness_divergence = tf.reduce_mean(
tf.squared_difference(variable_skewness, K_c))
cost = (1 - cs_ratio) * skewness_divergence
cost += cs_ratio * covariance_divergence
# Add potential regularization
cost = tf.cast(cost, tf.float64)
if self.strength_lasso > 0:
reg_l1 = tf.contrib.layers.l1_regularizer(self.strength_lasso)
cost += reg_l1((I - tf.matrix_inverse(R)))
if self.strength_ridge > 0:
reg_l2 = tf.contrib.layers.l2_regularizer(self.strength_ridge)
cost += reg_l2((I - tf.matrix_inverse(R)))
return cost
def fit(self, events, end_times=None, adjacency_start=None, R_start=None):
"""Fit the model according to the given training data.
Parameters
----------
events : `list` of `list` of `np.ndarray`
List of Hawkes processes realizations.
Each realization of the Hawkes process is a list of n_nodes arrays,
one per component. Namely, `events[i][j]` contains a
one-dimensional `numpy.array` of the events' timestamps of
component j of realization i.
If only one realization is given, it will be wrapped into a list
end_times : `np.ndarray` or `float`, default = None
List of end times of all Hawkes processes that will be given to the
model. If None, it will be set to each realization's latest time.
If only one realization is provided, then a float can be given.
adjacency_start : `str` or `np.ndarray`, shape=(n_nodes, n_nodes), default=`None`
Initial guess for the adjacency matrix. Will be used as
starting point in optimization.
If `None` and `R_start` is also `None`, a default starting point
is estimated from the estimated cumulants.
If `"random"`, a starting point is estimated from estimated
cumulants with a bit of randomness
R_start : `np.ndarray`, shape=(n_nodes, n_nodes), default=None
R variable at which we start optimization. Superseded by
adjacency_start if adjacency_start is not `None`
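Examples
--------
A minimal sketch of the expected `events` structure, here two
realizations of a 2-node process (timestamps are illustrative):
>>> import numpy as np
>>> events = [
...     [np.array([1.0, 2.5]), np.array([0.7])],
...     [np.array([0.3, 4.2]), np.array([2.9, 3.3])],
... ]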
"""
LearnerHawkesNoParam.fit(self, events, end_times=end_times)
self.solve(adjacency_start=adjacency_start, R_start=R_start)
def _solve(self, adjacency_start=None, R_start=None):
"""Launch optimization algorithm
Parameters
----------
adjacency_start : `str` or `np.ndarray`, shape=(n_nodes, n_nodes), default=`None`
Initial guess for the adjacency matrix. Will be used as
starting point in optimization.
If `None`, a default starting point is estimated from the
estimated cumulants.
If `"random"`, as with `None`, a starting point is estimated from
estimated cumulants with a bit of randomness
R_start : `np.ndarray`, shape=(n_nodes, n_nodes), default=`None`
R variable at which optimization starts. Superseded by
adjacency_start if adjacency_start is not `None`
Notes
-----
The optimization runs for at most ``max_iter`` epochs with initial
step size ``step``, using the TensorFlow optimizer selected by
``solver``. As the loss is not convex, it cannot be optimized with
`tick.optim.solver` solvers.
"""
self.compute_cumulants()
if adjacency_start is None and R_start is not None:
start_point = R_start
elif adjacency_start is None or adjacency_start == 'random':
random = adjacency_start == 'random'
start_point = self.starting_point(random=random)
else:
start_point = scipy.linalg.inv(
np.eye(self.n_nodes) - adjacency_start)
cost = self._tf_objective_graph()
L, C, K_c = self._tf_placeholders()
# Launch the graph
with self._tf_graph.as_default():
with tf.variable_scope("solver", reuse=tf.AUTO_REUSE):
tf_solver = self.tf_solver(self.step, **self.solver_kwargs)
optimization = tf_solver.minimize(cost)
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
sess.run(self._tf_model_coeffs.assign(start_point))
# Training cycle
for n_iter in range(self.max_iter):
if self._should_record_iter(n_iter):
# We don't use self.objective here as it would be very
# slow
prev_obj = sess.run(
cost, feed_dict={
L: self.mean_intensity,
C: self.covariance,
K_c: self.skewness
})
sess.run(
optimization, feed_dict={
L: self.mean_intensity,
C: self.covariance,
K_c: self.skewness
})
if self._should_record_iter(n_iter):
obj = sess.run(
cost, feed_dict={
L: self.mean_intensity,
C: self.covariance,
K_c: self.skewness
})
rel_obj = abs(obj - prev_obj) / abs(prev_obj)
prev_obj = obj
converged = rel_obj < self.tol
force = converged or n_iter + 1 == self.max_iter
self._handle_history(n_iter + 1, objective=obj,
rel_obj=rel_obj, force=force)
if converged:
break
self._set('solution', sess.run(self._tf_model_coeffs))
def approximate_optimal_cs_ratio(self):
"""Heuristic to set covariance skewness ratio close to its
optimal value
"""
norm_sq_C = norm(self.covariance) ** 2
norm_sq_K_c = norm(self.skewness) ** 2
return norm_sq_K_c / (norm_sq_K_c + norm_sq_C)
def starting_point(self, random=False):
"""Heuristic to find a starting point candidate
Parameters
----------
random : `bool`
Use a random orthogonal matrix instead of identity
Returns
-------
starting_point : `np.ndarray`, shape=(n_nodes, n_nodes)
A starting point candidate
"""
sqrt_C = sqrtm(self.covariance)
sqrt_L = np.sqrt(self.mean_intensity)
if random:
random_matrix = np.random.rand(self.n_nodes, self.n_nodes)
M, _ = qr(random_matrix)
else:
M = np.eye(self.n_nodes)
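# With M orthogonal, R_0 = sqrt(C) M diag(1 / sqrt(L)) satisfies
# R_0 diag(L) R_0^T = C, i.e. the covariance is matched exactly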
initial = np.dot(np.dot(sqrt_C, M), np.diag(1. / sqrt_L))
return initial
def get_kernel_values(self, i, j, abscissa_array):
raise ValueError('Hawkes cumulant cannot estimate kernel values')
def get_kernel_norms(self):
"""Computes kernel norms. This makes our learner compliant with
`tick.plot.plot_hawkes_kernel_norms` API
Returns
-------
norms : `np.ndarray`, shape=(n_nodes, n_nodes)
2d array in which each entry i, j corresponds to the norm of
kernel i, j
"""
return self.adjacency
@property
def solver(self):
return self._solver
@solver.setter
def solver(self, val):
available_solvers = [
'momentum', 'adam', 'adagrad', 'rmsprop', 'adadelta', 'gd'
]
if val.lower() not in available_solvers:
raise ValueError('solver must be one of {}, received {}'.format(
available_solvers, val))
self._set('_solver', val)
@property
def tf_solver(self):
if self.solver.lower() == 'momentum':
return tf.train.MomentumOptimizer
elif self.solver.lower() == 'adam':
return tf.train.AdamOptimizer
elif self.solver.lower() == 'adagrad':
return tf.train.AdagradOptimizer
elif self.solver.lower() == 'rmsprop':
return tf.train.RMSPropOptimizer
elif self.solver.lower() == 'adadelta':
return tf.train.AdadeltaOptimizer
elif self.solver.lower() == 'gd':
return tf.train.GradientDescentOptimizer
@property
def elastic_net_ratio(self):
return self._elastic_net_ratio
@elastic_net_ratio.setter
def elastic_net_ratio(self, val):
if val < 0 or val > 1:
raise ValueError("`elastic_net_ratio` must be between 0 and 1, "
"got %s" % str(val))
else:
self._set("_elastic_net_ratio", val)
@property
def strength_lasso(self):
if self.penalty == 'elasticnet':
return self.elastic_net_ratio / self.C
elif self.penalty == 'l1':
return 1. / self.C
else:
return 0.
@property
def strength_ridge(self):
if self.penalty == 'elasticnet':
return (1 - self.elastic_net_ratio) / self.C
elif self.penalty == 'l2':
return 1. / self.C
return 0.
class _HawkesCumulantComputer(Base):
"""Private class to compute Hawkes cumulants
"""
_cpp_obj_name = '_learner'
_attrinfos = {
'integration_support': {
'cpp_setter': 'set_integration_support'
},
'_learner': {},
'L': {},
'C': {},
'K_c': {},
'_L_day': {},
'_J': {},
'_events_of_cumulants': {},
}
def __init__(self, integration_support=100.):
Base.__init__(self)
self.integration_support = integration_support
self._learner = _HawkesCumulant(self.integration_support)
self.L = None
self.C = None
self.K_c = None
self._L_day = None
self._J = None
self._events_of_cumulants = None
def compute_cumulants(self, verbose=False, force=False):
"""Compute estimated mean intensity, covariance and sliced skewness
Parameters
----------
verbose : `bool`
If `True`, a message will be printed when previously computed
cumulants are reused
force : `bool`
If `True` cumulants will always be recomputed
"""
if len(self.realizations) == 0:
raise RuntimeError('Cannot compute cumulants if no realization '
'has been provided')
# Check if cumulants have already been computed
if self.cumulants_ready and not force:
if verbose:
print('Use previously computed cumulants')
return
# Remember for which realizations cumulants have been computed
self._events_of_cumulants = self.realizations
self.compute_L()
self.compute_C_and_J()
self.K_c = self.compute_E_c()
self._learner.set_are_cumulants_ready(True)
@staticmethod
def _same_realizations(events_1, events_2):
if len(events_1) != len(events_2):
return False
for r in range(len(events_1)):
if len(events_1[r]) != len(events_2[r]):
return False
# Fast check that both arrays share the same pointer
for i in range(len(events_1[r])):
if events_1[r][i].__array_interface__ != \
events_2[r][i].__array_interface__:
return False
return True
def compute_L(self):
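# Empirical mean intensity: L_day[day][i] is the number of events of
# node i divided by the end time of that realization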
self._L_day = np.zeros((self.n_realizations, self.n_nodes))
for day, realization in enumerate(self.realizations):
for i in range(self.n_nodes):
process = realization[i]
self._L_day[day][i] = len(process) / self.end_times[day]
self.L = np.mean(self._L_day, axis=0)
def compute_C_and_J(self):
self.C = np.zeros((self.n_nodes, self.n_nodes))
self._J = np.zeros((self.n_realizations, self.n_nodes, self.n_nodes))
d = self.n_nodes
for day in range(len(self.realizations)):
C = np.zeros((d, d))
J = np.zeros((d, d))
for i, j in product(range(d), repeat=2):
res = self._learner.compute_A_and_I_ij(day, i, j,
self._L_day[day][j])
C[i, j] = res[0]
J[i, j] = res[1]
# we keep the symmetric part to remove edge effects
C[:] = 0.5 * (C + C.T)
J[:] = 0.5 * (J + J.T)
self.C += C / self.n_realizations
self._J[day] = J.copy()
def compute_E_c(self):
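# Sliced skewness estimate: K^c_{ij} = (2 E_{ijj} + E_{jji}) / 3,
# averaged over realizations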
E_c = np.zeros((self.n_nodes, self.n_nodes, 2))
d = self.n_nodes
for day in range(len(self.realizations)):
for i in range(d):
for j in range(d):
E_c[i, j, 0] += self._learner.compute_E_ijk(
day, i, j, j, self._L_day[day][i], self._L_day[day][j],
self._J[day][i, j])
E_c[i, j, 1] += self._learner.compute_E_ijk(
day, j, j, i, self._L_day[day][j], self._L_day[day][j],
self._J[day][j, j])
E_c /= self.n_realizations
K_c = np.zeros((self.n_nodes, self.n_nodes))
K_c += 2 * E_c[:, :, 0]
K_c += E_c[:, :, 1]
K_c /= 3.
return K_c
@property
def n_nodes(self):
return self._learner.get_n_nodes()
@property
def n_realizations(self):
return len(self._learner.get_n_jumps_per_realization())
@property
def realizations(self):
return self._learner.get_timestamps_list()
@property
def end_times(self):
return self._learner.get_end_times()
@property
def cumulants_ready(self):
events_didnt_change = self._events_of_cumulants is not None and \
_HawkesCumulantComputer._same_realizations(
self._events_of_cumulants, self.realizations)
return self._learner.get_are_cumulants_ready() and events_didnt_change