Source code for tick.survival.convolutional_sccs

import inspect
import numpy as np
from abc import ABC
from operator import itemgetter
from collections import namedtuple
from sklearn.model_selection import StratifiedKFold
from tick.base import Base
from tick.prox import ProxZero, ProxMulti, ProxTV, ProxEquality, \
    ProxGroupL1
from tick.solver import SVRG
from tick.survival import SimuSCCS, ModelSCCS
from tick.preprocessing import LongitudinalSamplesFilter, \
    LongitudinalFeaturesLagger
from scipy.stats import uniform
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
from tick.preprocessing.utils import check_longitudinal_features_consistency, \
    check_censoring_consistency, safe_array

# Case classes
Confidence_intervals = namedtuple(
    'Confidence_intervals',
    ['refit_coeffs', 'lower_bound', 'upper_bound', 'confidence_level'])
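# Illustration of the structure (hypothetical values): for a single feature
# with one lag, a 95% bootstrap interval would be stored as e.g.
#   Confidence_intervals(refit_coeffs=[np.array([.1, .2])],
#                        lower_bound=[np.array([.05, .1])],
#                        upper_bound=[np.array([.15, .3])],
#                        confidence_level=.95)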


# TODO later: exploit new options of SVRG (parallel fit, variance_reduction...)
# TODO later: add SAGA solver
class ConvSCCS(ABC, Base):
    """ConvSCCS learner, which estimates lagged feature effects using TV and
    Group L1 penalties. These penalties constrain the coefficient groups
    modelling the lagged effects to ensure their regularity and sparsity.

    Parameters
    ----------
    n_lags : `numpy.ndarray`, shape=(n_features,), dtype="uint64"
        Number of lags per feature. The model will regress labels on the
        last observed values of the features over their corresponding
        `n_lags` time intervals. `n_lags` values must be between 0 and
        `n_intervals` - 1.

    penalized_features : `numpy.ndarray`, shape=(n_features,), dtype="bool", default=None
        Booleans indicating whether the features should be penalised or
        not. If set to None, penalize all features.

    C_tv : `float`, default=None
        Level of TV penalization. This value should be `None` or greater
        than 0.

    C_group_l1 : `float`, default=None
        Level of group Lasso penalization. This value should be `None` or
        greater than 0.

    step : `float`, default=None
        Step-size parameter, the most important parameter of the solver.
        If set to None, it will be automatically tuned as
        ``step = 1 / model.get_lip_max()``.

    tol : `float`, default=1e-5
        The tolerance of the solver (iterations stop when the stopping
        criterion is below it).

    max_iter : `int`, default=100
        Maximum number of iterations of the solver, namely maximum number
        of epochs.

    verbose : `bool`, default=False
        If `True`, solver verboses history, otherwise nothing is displayed.

    print_every : `int`, default=10
        Print history information every time the iteration number is a
        multiple of ``print_every``. Used only if ``verbose`` is `True`.

    record_every : `int`, default=10
        Save history information every time the iteration number is a
        multiple of ``record_every``.

    random_state : `int`, default=None
        If not None, the seed of the random sampling.

    Attributes
    ----------
    n_cases : `int` (read-only)
        Number of samples with at least one outcome.

    n_intervals : `int` (read-only)
        Number of time intervals.

    n_features : `int` (read-only)
        Number of features.

    n_coeffs : `int` (read-only)
        Total number of coefficients of the model.

    coeffs : `list` (read-only)
        List containing 1-dimensional `np.ndarray` (`dtype=float`) holding
        the coefficients of the model. Each numpy array contains the
        `(n_lags + 1)` coefficients associated with a feature. Each
        coefficient of such arrays can be interpreted as the log relative
        intensity associated with this feature, `k` periods after exposure
        start, where `k` is the index of the coefficient in the array.

    intensities : `list` (read-only)
        List containing 1-dimensional `np.ndarray` (`dtype=float`) holding
        the intensities estimated by the model. Each numpy array contains
        the relative intensities of a feature. Elements of these arrays can
        be interpreted as the relative intensity associated with a feature,
        `k` periods after exposure start, where `k` is the index of the
        coefficient in the array.

    confidence_intervals : `Confidence_intervals` (read-only)
        Coefficients refitted on the model and associated confidence
        intervals computed using parametric bootstrap. Refitted
        coefficients are projected on the support of the coefficients
        estimated by the penalised model. Refitted coefficients and their
        confidence intervals follow the same structure as `coeffs`.

    References
    ----------
    Morel, M., Bacry, E., Gaïffas, S., Guilloux, A., & Leroy, F.
    (Submitted, 2018, January). ConvSCCS: convolutional self-controlled
    case series model for lagged adverse event detection.
    """
    _const_attr = [
        # constructed attributes
        '_preprocessor_obj',
        '_model_obj',
        '_solver_obj',
        # user defined parameters
        '_n_lags',
        '_penalized_features',
        '_C_tv',
        '_C_group_l1',
        '_random_state',
        # computed attributes
        'n_cases',
        'n_intervals',
        'n_features',
        'n_coeffs',
        '_coeffs',
        '_features_offset',
        '_fitted',
        '_step_size',
        # refit _coeffs, median, and CI data
        'confidence_intervals',
        '_solver_info'
    ]

    _attrinfos = {key: {'writable': False} for key in _const_attr}

    def __init__(self, n_lags: np.array, penalized_features: np.array = None,
                 C_tv=None, C_group_l1=None, step: float = None,
                 tol: float = 1e-5, max_iter: int = 100,
                 verbose: bool = False, print_every: int = 10,
                 record_every: int = 10, random_state: int = None):
        Base.__init__(self)
        # Init objects to be computed later
        self.n_cases = None
        self.n_intervals = None
        self.n_features = None
        self.n_coeffs = None
        self._coeffs = None
        self.confidence_intervals = Confidence_intervals(
            list(), list(), list(), None)
        self._fitted = None
        self._step_size = None
        # Init user defined parameters
        self._features_offset = None
        self._n_lags = None
        self.n_lags = n_lags
        self._penalized_features = None
        self.penalized_features = penalized_features
        self._C_tv = None
        self._C_group_l1 = None
        self.C_tv = C_tv
        self.C_group_l1 = C_group_l1

        self._step_size = step
        self.tol = tol
        self.max_iter = max_iter
        self.verbose = verbose
        self.print_every = print_every
        self.record_every = record_every
        random_state = int(
            np.random.randint(0, 1000, 1)[0]
            if random_state is None else random_state)
        self._random_state = None
        self.random_state = random_state

        # Construct objects
        self._preprocessor_obj = self._construct_preprocessor_obj()
        self._model_obj = None
        self._solver_info = (step, max_iter, tol, print_every, record_every,
                             verbose, self.random_state)
        self._solver_obj = self._construct_solver_obj(*self._solver_info)

    # Interface
    def fit(self, features: list, labels: list, censoring: np.array,
            confidence_intervals: bool = False,
            n_samples_bootstrap: int = 200, confidence_level: float = .95):
        """Fit the model according to the given training data.

        Parameters
        ----------
        features : `list` of `numpy.ndarray` or `list` of
            `scipy.sparse.csr_matrix`, list of length n_cases, each element
            of the list of shape=(n_intervals, n_features)
            The list of features matrices.

        labels : `list` of `numpy.ndarray`,
            list of length n_cases, each element of the list of
            shape=(n_intervals,)
            The labels vector.

        censoring : `numpy.ndarray`, shape=(n_cases,), dtype="uint64"
            The censoring data. This array should contain integers in
            [1, n_intervals]. If the value i is equal to n_intervals, then
            there is no censoring for sample i. If censoring = c <
            n_intervals, then the observation of sample i is stopped at
            interval c, that is, the row c - 1 of the corresponding matrix.
            The last n_intervals - c rows are then set to 0.

        confidence_intervals : `bool`, default=False
            Activate parametric bootstrap confidence intervals computation.

        n_samples_bootstrap : `int`, default=200
            Number of parametric bootstrap iterations.

        confidence_level : `float`, default=.95
            Confidence level of the bootstrapped confidence intervals.

        Returns
        -------
        coeffs : `list`
            The estimated coefficients, one array per feature.

        confidence_intervals : `Confidence_intervals`
            The bootstrap confidence intervals, empty unless
            ``confidence_intervals`` is set to `True`.
        """
        p_features, p_labels, p_censoring = self._prefit(
            features, labels, censoring)
        self._fit(project=False)
        self._postfit(p_features, p_labels, p_censoring, False,
                      confidence_intervals, n_samples_bootstrap,
                      confidence_level)

        return self.coeffs, self.confidence_intervals

    def score(self, features=None, labels=None, censoring=None):
        """Returns the negative log-likelihood of the model, using the
        current fitted coefficients on the passed data.
        If no data is passed, the negative log-likelihood is computed using
        the data used for training.

        Parameters
        ----------
        features : `None` or `list` of `numpy.ndarray` or `list` of
            `scipy.sparse.csr_matrix`, list of length n_cases, each element
            of the list of shape=(n_intervals, n_features)
            The list of features matrices.

        labels : `None` or `list` of `numpy.ndarray`,
            list of length n_cases, each element of the list of
            shape=(n_intervals,)
            The labels vector.

        censoring : `None` or `numpy.ndarray`, shape=(n_cases,), dtype="uint64"
            The censoring data. This array should contain integers in
            [1, n_intervals]. If the value i is equal to n_intervals, then
            there is no censoring for sample i. If censoring = c <
            n_intervals, then the observation of sample i is stopped at
            interval c, that is, the row c - 1 of the corresponding matrix.
            The last n_intervals - c rows are then set to 0.

        Returns
        -------
        output : `float`
            The value of the negative log-likelihood.
        """
        return self._score(features, labels, censoring, preprocess=True)

    def fit_kfold_cv(self, features, labels, censoring,
                     C_tv_range: tuple = (), C_group_l1_range: tuple = (),
                     logscale=True, n_cv_iter: int = 30, n_folds: int = 3,
                     shuffle: bool = True,
                     confidence_intervals: bool = False,
                     n_samples_bootstrap: int = 100,
                     confidence_level: float = .95):
        """Perform cross validation to find the optimal hyperparameters
        given training data. The cross validation uses stratified K-folds
        and random search, the hyperparameters being sampled uniformly in
        the range (logscale or linspace) specified by the user.

        Parameters
        ----------
        features : `list` of `numpy.ndarray` or `list` of
            `scipy.sparse.csr_matrix`, list of length n_cases, each element
            of the list of shape=(n_intervals, n_features)
            The list of features matrices.

        labels : `list` of `numpy.ndarray`,
            list of length n_cases, each element of the list of
            shape=(n_intervals,)
            The labels vector.

        censoring : `numpy.ndarray`, shape=(n_cases,), dtype="uint64"
            The censoring data. This array should contain integers in
            [1, n_intervals]. If the value i is equal to n_intervals, then
            there is no censoring for sample i. If censoring = c <
            n_intervals, then the observation of sample i is stopped at
            interval c, that is, the row c - 1 of the corresponding matrix.
            The last n_intervals - c rows are then set to 0.

        C_tv_range : `tuple`, shape=(2,), dtype="int"
            Range in which the TV penalization level is sampled during the
            random search. If ``logscale=True``, range values are
            understood as powers of ten, i.e. ``(0, 5)`` results in samples
            drawn in ``[10**0, 10**5]``.

        C_group_l1_range : `tuple`, shape=(2,), dtype="int"
            Range in which the group L1 penalization level is sampled
            during the random search. If ``logscale=True``, range values
            are understood as powers of ten, i.e. ``(0, 5)`` results in
            samples drawn in ``[10**0, 10**5]``.

        logscale : `bool`
            If `True`, hyperparameters are sampled on logscale.

        n_cv_iter : `int`
            Number of hyperparameter samples to draw when performing the
            random search.

        n_folds : `int`
            Number of folds used to perform the cross validation.

        shuffle : `bool`
            If `True`, the data is shuffled before performing train /
            validation / test splits.

        confidence_intervals : `bool`, default=False
            Activate parametric bootstrap confidence intervals computation.

        n_samples_bootstrap : `int`, default=100
            Number of parametric bootstrap iterations.

        confidence_level : `float`, default=.95
            Confidence level of the bootstrapped confidence intervals.

        Returns
        -------
        coeffs : `list`
            The coefficients of the best model, one array per feature.

        cv_tracker : `CrossValidationTracker`
            Object recording the cross validation history and the best
            model.
        """
        # setup the model and preprocess the data
        p_features, p_labels, p_censoring = self._prefit(
            features, labels, censoring)
        # split the data with stratified KFold
        # (`random_state` is only relevant when shuffling)
        kf = StratifiedKFold(n_splits=n_folds, shuffle=shuffle,
                             random_state=self.random_state
                             if shuffle else None)
        labels_interval = np.nonzero(p_labels)[1]

        # Training loop
        model_global_parameters = {
            "n_intervals": self.n_intervals,
            "n_lags": self.n_lags,
            "n_features": self.n_features,
        }
        cv_tracker = CrossValidationTracker(model_global_parameters)
        C_tv_generator, C_group_l1_generator = \
            self._construct_generator_obj(C_tv_range, C_group_l1_range,
                                          logscale)
        # TODO later: parallelize CV
        i = 0
        while i < n_cv_iter:
            self._set('_coeffs', np.zeros(self.n_coeffs))
            self.C_tv = C_tv_generator.rvs(1)[0]
            self.C_group_l1 = C_group_l1_generator.rvs(1)[0]

            train_scores = []
            test_scores = []
            for train_index, test_index in kf.split(p_features,
                                                    labels_interval):
                train = itemgetter(*train_index.tolist())
                test = itemgetter(*test_index.tolist())
                X_train, X_test = list(train(p_features)), \
                    list(test(p_features))
                y_train, y_test = list(train(p_labels)), \
                    list(test(p_labels))
                censoring_train, censoring_test = p_censoring[train_index], \
                    p_censoring[test_index]

                self._model_obj.fit(X_train, y_train, censoring_train)
                self._fit(project=False)

                train_scores.append(self._score())
                test_scores.append(
                    self._score(X_test, y_test, censoring_test))

            cv_tracker.log_cv_iteration(self.C_tv, self.C_group_l1,
                                        np.array(train_scores),
                                        np.array(test_scores))
            i += 1

        # refit the best model on all the data
        best_parameters = cv_tracker.find_best_params()
        self.C_tv = best_parameters["C_tv"]
        self.C_group_l1 = best_parameters["C_group_l1"]
        self._set('_coeffs', np.zeros(self.n_coeffs))
        self._model_obj.fit(p_features, p_labels, p_censoring)
        coeffs, bootstrap_ci = self._postfit(
            p_features, p_labels, p_censoring, True, confidence_intervals,
            n_samples_bootstrap, confidence_level)

        cv_tracker.log_best_model(self.C_tv, self.C_group_l1,
                                  self._coeffs.tolist(), self.score(),
                                  self.confidence_intervals)

        return self.coeffs, cv_tracker

    def plot_intensities(self, figsize=(10, 6), sharex=False, sharey=False):
        """Plot intensities estimated by the penalized model. The
        intensities subfigures are plotted on two columns.

        Parameters
        ----------
        figsize : `tuple`, default=(10, 6)
            Size of the figure.

        sharex : `bool`, default=False
            Constrain the x axes to have the same range.

        sharey : `bool`, default=False
            Constrain the y axes to have the same range.

        Returns
        -------
        fig : `matplotlib.figure.Figure`
            Figure to be plotted.

        axarr : `numpy.ndarray`, `dtype=object`
            `matplotlib.axes._subplots.AxesSubplot` objects associated to
            each intensity subplot.
""" n_rows = int(np.ceil(self.n_features / 2)) remove_last_plot = self.n_features % 2 != 0 fig, axarr = plt.subplots(n_rows, 2, sharex=sharex, sharey=sharey, figsize=figsize) for i, c in enumerate(self.coeffs): self._plot_intensity(axarr[i // 2][i % 2], c, None, None) plt.suptitle('Estimated (penalized) relative risks') axarr[0][1].legend(loc='upper right') [ax[0].set_ylabel('Relative incidence') for ax in axarr] [ax.set_xlabel('Time after exposure start') for ax in axarr[-1]] if remove_last_plot: fig.delaxes(axarr[-1][-1]) return fig, axarr def plot_confidence_intervals(self, figsize=(10, 6), sharex=False, sharey=False): """Plot intensities estimated by the penalized model. The intensities subfigures are plotted on two columns. Parameters ---------- figsize : `tuple`, default=(10, 6) Size of the figure sharex : `bool`, default=False Constrain the x axes to have the same range. sharey : `bool`, default=False Constrain the y axes to have the same range. Returns ------- fig : `matplotlib.figure.Figure` Figure to be plotted axarr : `numpy.ndarray`, `dtype=object` `matplotlib.axes._subplots.AxesSubplot` objects associated to each intensity subplot """ n_rows = int(np.ceil(self.n_features / 2)) remove_last_plot = (self.n_features % 2 != 0) fig, axarr = plt.subplots(n_rows, 2, sharex=sharex, sharey=sharey, figsize=figsize) ci = self.confidence_intervals coeffs = ci['refit_coeffs'] lb = ci['lower_bound'] ub = ci['upper_bound'] for i, c in enumerate(coeffs): self._plot_intensity(axarr[i // 2][i % 2], c, lb[i], ub[i]) plt.suptitle('Estimated relative risks with 95% confidence bands') axarr[0][1].legend(loc='best') [ax[0].set_ylabel('Relative incidence') for ax in axarr] [ax.set_xlabel('Time after exposure start') for ax in axarr[-1]] if remove_last_plot: fig.delaxes(axarr[-1][-1]) return fig, axarr @staticmethod def _plot_intensity(ax, coeffs, upper_bound, lower_bound): n_coeffs = len(coeffs) if n_coeffs > 1: x = np.arange(n_coeffs) ax.step(x, np.exp(coeffs), label="Estimated RI") if upper_bound is not None and lower_bound is not None: ax.fill_between(x, np.exp(lower_bound), np.exp(upper_bound), alpha=.5, color='orange', step='pre', label="95% boostrap CI") elif n_coeffs == 1: if upper_bound is not None and lower_bound is not None: ax.errorbar(0, coeffs, yerr=(np.exp(lower_bound), np.exp(upper_bound)), fmt='o', ecolor='orange') else: ax.scatter([0], np.exp(coeffs), label="Estimated RI") return ax # Internals def _prefit(self, features, labels, censoring): n_intervals, n_features = features[0].shape if any(self.n_lags > n_intervals - 1): raise ValueError('`n_lags` should be < `n_intervals` - 1, where ' '`n_intervals` is the number of rows of the ' 'feature matrices.') n_cases = len(features) censoring = check_censoring_consistency(censoring, n_cases) features = check_longitudinal_features_consistency( features, (n_intervals, n_features), "float64") labels = check_longitudinal_features_consistency( labels, (n_intervals,), "int32") self._set('n_features', n_features) self._set('n_intervals', n_intervals) features, labels, censoring = self._preprocess_data( features, labels, censoring) n_coeffs = int(np.sum(self._n_lags) + self.n_features) self._set('n_coeffs', n_coeffs) self._set('_coeffs', np.zeros(n_coeffs)) self._set('n_cases', len(features)) # Step computation self._set("_model_obj", self._construct_model_obj()) self._model_obj.fit(features, labels, censoring) if self.step is None: self.step = 1 / self._model_obj.get_lip_max() return features, labels, censoring def _fit(self, project): prox_obj = 
self._construct_prox_obj(self._coeffs, project) solver_obj = self._solver_obj model_obj = self._model_obj # Now, we can pass the model and prox objects to the solver solver_obj.set_model(model_obj).set_prox(prox_obj) coeffs_start = self._coeffs # Launch the solver _coeffs = solver_obj.solve(coeffs_start, step=self.step) self._set("_coeffs", _coeffs) self._set("_fitted", True) return _coeffs def _postfit(self, p_features, p_labels, p_censoring, refit, bootstrap, n_samples_bootstrap, confidence_level): # WARNING: _refit uses already preprocessed p_features, p_labels # and p_censoring if not self._fitted: raise RuntimeError('You must fit the model first') if refit: # refit _coeffs on all the data (after Cross Validation for example) self._model_obj.fit(p_features, p_labels, p_censoring) coeffs = self._fit(project=False) self._set('_coeffs', coeffs) if bootstrap: self._model_obj.fit(p_features, p_labels, p_censoring) _refit_coeffs = self._fit(project=True) confidence_intervals = self._bootstrap( p_features, p_labels, p_censoring, _refit_coeffs, n_samples_bootstrap, confidence_level) self._set('confidence_intervals', confidence_intervals._asdict()) return self.coeffs, self.confidence_intervals # Utilities # def _preprocess_data(self, features, labels, censoring): for preprocessor in self._preprocessor_obj: features, labels, censoring = preprocessor.fit_transform( features, labels, censoring) return features, labels, censoring def _bootstrap(self, p_features, p_labels, p_censoring, coeffs, rep, confidence): # WARNING: _bootstrap inputs are already preprocessed p_features, # p_labels and p_censoring # Coeffs here are assumed to be an array (same object than self._coeffs) if confidence <= 0 or confidence >= 1: raise ValueError("`confidence_level` should be in (0, 1)") confidence = 1 - confidence if not self._fitted: raise RuntimeError('You must fit the model first') bootstrap_coeffs = [] sim = SimuSCCS(self.n_cases, self.n_intervals, self.n_features, self.n_lags, coeffs=self._format_coeffs(coeffs)) # TODO later: parallelize bootstrap (everything should be pickable...) 
        for k in range(rep):
            y = sim._simulate_multinomial_outcomes(p_features, coeffs)
            self._model_obj.fit(p_features, y, p_censoring)
            bootstrap_coeffs.append(self._fit(True))

        bootstrap_coeffs = np.exp(np.array(bootstrap_coeffs))
        bootstrap_coeffs.sort(axis=0)
        lower_bound = np.log(bootstrap_coeffs[int(
            np.floor(rep * alpha / 2))])
        upper_bound = np.log(bootstrap_coeffs[int(
            np.floor(rep * (1 - alpha / 2)))])
        return Confidence_intervals(
            self._format_coeffs(coeffs), self._format_coeffs(lower_bound),
            self._format_coeffs(upper_bound), confidence)

    def _score(self, features=None, labels=None, censoring=None,
               preprocess=False):
        if not self._fitted:
            raise RuntimeError('You must fit the model first')

        all_none = all(e is None for e in [features, labels, censoring])
        if all_none:
            loss = self._model_obj.loss(self._coeffs)
        else:
            if features is None:
                raise ValueError('Passed ``features`` is None')
            elif labels is None:
                raise ValueError('Passed ``labels`` is None')
            elif censoring is None:
                raise ValueError('Passed ``censoring`` is None')
            else:
                # Avoid calling preprocessing during CV
                if preprocess:
                    features, labels, censoring = self._preprocess_data(
                        features, labels, censoring)
                model = self._construct_model_obj().fit(
                    features, labels, censoring)
                loss = model.loss(self._coeffs)
        return loss

    def _format_coeffs(self, coeffs):
        value = list()
        for i, l in enumerate(self.n_lags):
            start = int(self._features_offset[i])
            end = int(start + l + 1)
            value.append(coeffs[start:end])
        return value

    # Factories #
    def _construct_preprocessor_obj(self):
        # TODO later: fix parallel preprocessing
        preprocessors = list()
        preprocessors.append(LongitudinalSamplesFilter(n_jobs=1))
        if len(self.n_lags) > 0:
            preprocessors.append(
                LongitudinalFeaturesLagger(self.n_lags, n_jobs=1))
        return preprocessors

    def _construct_model_obj(self):
        return ModelSCCS(self.n_intervals, self.n_lags)

    def _construct_prox_obj(self, coeffs=None, project=False):
        n_penalized_features = len(self.penalized_features) \
            if self.penalized_features is not None else 0

        if project:
            # project future _coeffs on the support of given _coeffs
            if all(self.n_lags == 0):
                proxs = [ProxZero()]
            elif coeffs is not None:
                prox_ranges = self._detect_support(coeffs)
                proxs = [ProxEquality(0, range=r) for r in prox_ranges]
            else:
                raise ValueError("Coeffs are None. "
                                 "Equality penalty cannot infer the "
                                 "coefficients support.")
        elif n_penalized_features > 0 and (self._C_tv is not None or
                                           self._C_group_l1 is not None):
            # TV and GroupLasso penalties: one coefficient block per
            # feature flagged for penalization
            blocks_start = []
            blocks_size = []
            proxs = []
            for i, penalized in enumerate(self.penalized_features):
                if not penalized:
                    continue
                start = int(self._features_offset[i])
                end = int(start + self._n_lags[i] + 1)
                blocks_start.append(start)
                blocks_size.append(end - start)
                if self._C_tv is not None:
                    proxs.append(ProxTV(1 / self._C_tv, range=(start, end)))

            if self._C_group_l1 is not None:
                proxs.append(
                    ProxGroupL1(1 / self._C_group_l1, blocks_start,
                                blocks_size))
        else:
            # Default prox: does nothing
            proxs = [ProxZero()]

        prox_obj = ProxMulti(tuple(proxs))
        return prox_obj

    def _detect_support(self, coeffs):
        """Return the ranges over which consecutive coefficients are equal,
        in case at least two coefficients are equal.

        This method is used to compute the ranges for ProxEquality, to
        enforce a support corresponding to the support of `_coeffs`.

        example:
        _coeffs = np.array([1., 2., 2., 1., 1.])
        self._detect_support(_coeffs)
        >>> [(1, 3), (3, 5)]
        """
        kernel = np.array([1, -1])
        groups = []
        # scan the coefficient block of each penalized feature
        for i, penalized in enumerate(self.penalized_features):
            if not penalized:
                continue
            idx = int(self._features_offset[i])
            n_lags = int(self._n_lags[i])
            if n_lags > 0:
                acc = 1
                for change in np.convolve(coeffs[idx:idx + n_lags + 1],
                                          kernel, 'valid'):
                    if change:
                        if acc > 1:
                            groups.append((idx, idx + acc))
                        idx += acc
                        acc = 1
                    else:
                        acc += 1
                # Last coeff always counts as a change
                if acc > 1:
                    groups.append((idx, idx + acc))
        return groups

    @staticmethod
    def _construct_solver_obj(step, max_iter, tol, print_every,
                              record_every, verbose, seed):
        # TODO: we might want to use SAGA also later... (might be faster)
        # seed cannot be None in SVRG
        solver_obj = SVRG(step=step, max_iter=max_iter, tol=tol,
                          print_every=print_every,
                          record_every=record_every, verbose=verbose,
                          seed=seed)
        return solver_obj

    @staticmethod
    def _construct_solver_obj_with_class(
            step, max_iter, tol, print_every, record_every, verbose, seed,
            clazz=SVRG):
        """Allow creation of a solver by class type: removes values from
        the constructor parameter list that do not exist on the constructor
        of the class to be instantiated.
        """
        # inspect must be the first assignment
        _, _, _, kvs = inspect.getargvalues(inspect.currentframe())
        constructor_map = kvs.copy()
        args = inspect.getfullargspec(clazz.__init__)[0]
        for k, v in kvs.items():
            if k not in args:
                del constructor_map[k]
        return clazz(**constructor_map)

    def _construct_generator_obj(self, C_tv_range, C_group_l1_range,
                                 logspace=True):
        generators = []
        if len(C_tv_range) == 2:
            if logspace:
                generators.append(Log10UniformGenerator(*C_tv_range))
            else:
                # scipy's `uniform` expects (loc, scale), i.e. it draws in
                # [loc, loc + scale]
                generators.append(
                    uniform(C_tv_range[0], C_tv_range[1] - C_tv_range[0]))
        else:
            generators.append(null_generator)

        if len(C_group_l1_range) == 2:
            if logspace:
                generators.append(Log10UniformGenerator(*C_group_l1_range))
            else:
                generators.append(
                    uniform(C_group_l1_range[0],
                            C_group_l1_range[1] - C_group_l1_range[0]))
        else:
            generators.append(null_generator)

        return generators

    # Properties #
    @property
    def step(self):
        return self._step_size

    @step.setter
    def step(self, value):
        if value > 0:
            self._set('_step_size', value)
            self._solver_obj.step = value
        else:
            raise ValueError("step should be greater than 0.")

    @property
    def C_tv(self):
        return self._C_tv

    @C_tv.setter
    def C_tv(self, value):
        if value is None or (isinstance(value, float) and value > 0):
            self._set('_C_tv', value)
        else:
            raise ValueError('C_tv should be a float greater than zero.')

    @property
    def C_group_l1(self):
        return self._C_group_l1

    @C_group_l1.setter
    def C_group_l1(self, value):
        if value is None or (isinstance(value, float) and value > 0):
            self._set('_C_group_l1', value)
        else:
            raise ValueError('C_group_l1 should be a float greater '
                             'than zero.')

    @property
    def random_state(self):
        return self._random_state

    @random_state.setter
    def random_state(self, value):
        np.random.seed(value)
        self._set('_random_state', value)

    @property
    def n_lags(self):
        return self._n_lags

    @n_lags.setter
    def n_lags(self, value):
        offsets = [0]
        for l in value:
            if l < 0:
                raise ValueError('n_lags elements should be greater than '
                                 'or equal to 0.')
            offsets.append(offsets[-1] + l + 1)
        value = safe_array(value, dtype=np.uint64)
        self._set('_n_lags', value)
        self._set('_features_offset', offsets)
        self._construct_preprocessor_obj()

    @property
    def penalized_features(self):
        return self._penalized_features

    @penalized_features.setter
    def penalized_features(self, value):
        if value is None:
            value = [True] * len(self.n_lags)
        self._set('_penalized_features', value)
        self._construct_preprocessor_obj()

    @property
    def coeffs(self):
        return self._format_coeffs(self._coeffs)

    @coeffs.setter
    def coeffs(self, value):
        raise ValueError('coeffs cannot be set')

    @property
    def intensities(self):
        return [np.exp(c) for c in self.coeffs]

    @intensities.setter
    def intensities(self, value):
        raise ValueError('intensities cannot be set')
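
# A minimal usage sketch (illustrative addition; `features`, `labels` and
# `censoring` are placeholders following the shapes documented in
# `ConvSCCS.fit` above):
#
#     n_lags = np.repeat(2, n_features).astype('uint64')
#     learner = ConvSCCS(n_lags=n_lags, random_state=42)
#     coeffs, ci = learner.fit(features, labels, censoring,
#                              confidence_intervals=True,
#                              n_samples_bootstrap=200)
#     fig, axarr = learner.plot_confidence_intervals()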
# TODO later: put the code below somewhere else?
class CrossValidationTracker:
    def __init__(self, model_params: dict):
        self.model_params = model_params
        self.kfold_train_scores = list()
        self.kfold_mean_train_scores = list()
        self.kfold_sd_train_scores = list()
        self.kfold_test_scores = list()
        self.kfold_mean_test_scores = list()
        self.kfold_sd_test_scores = list()
        # TODO later: make this class usable for any parameters
        # self.parameter_names = parameter_names
        # self.best_model = {
        #     '_coeffs': list(),
        #     'confidence_interval': {},
        #     **{name: list() for name in parameter_names}
        # }
        # for field in parameter_names:
        #     setattr(field + '_history', list())
        self.C_tv_history = list()
        self.C_group_l1_history = list()

    def log_cv_iteration(self, C_tv, C_group_l1, kfold_train_scores,
                         kfold_test_scores):
        self.kfold_train_scores.append(list(kfold_train_scores))
        self.kfold_mean_train_scores.append(kfold_train_scores.mean())
        self.kfold_sd_train_scores.append(kfold_train_scores.std())
        self.kfold_test_scores.append(list(kfold_test_scores))
        self.kfold_mean_test_scores.append(kfold_test_scores.mean())
        self.kfold_sd_test_scores.append(kfold_test_scores.std())
        self.C_tv_history.append(C_tv)
        self.C_group_l1_history.append(C_group_l1)

    def find_best_params(self):
        # Find the parameters minimizing the mean test score
        best_idx = int(np.argmin(self.kfold_mean_test_scores))
        best_C_tv = self.C_tv_history[best_idx]
        best_C_group_l1 = self.C_group_l1_history[best_idx]
        return {'C_tv': best_C_tv, 'C_group_l1': best_C_group_l1}

    def log_best_model(self, C_tv, C_group_l1, coeffs, score,
                       confidence_interval_dict):
        self.best_model = {
            'C_tv': C_tv,
            'C_group_l1': C_group_l1,
            '_coeffs': list(coeffs),
            'score': score,
            'confidence_intervals': confidence_interval_dict
        }

    def todict(self):
        return {
            'global_model_parameters': list(self.model_params),
            'C_tv_history': list(self.C_tv_history),
            'C_group_l1_history': list(self.C_group_l1_history),
            'kfold_train_scores': list(self.kfold_train_scores),
            'kfold_mean_train_scores': list(self.kfold_mean_train_scores),
            'kfold_sd_train_scores': list(self.kfold_sd_train_scores),
            'kfold_test_scores': list(self.kfold_test_scores),
            'kfold_mean_test_scores': list(self.kfold_mean_test_scores),
            'kfold_sd_test_scores': list(self.kfold_sd_test_scores),
            'best_model': self.best_model
        }

    def plot_cv_report(self, elevation=25, azimuth=35):
        # Use the contour plot when both penalty levels vary; fall back to
        # 1-D learning curves when only one of them was searched.
        if len(self.C_group_l1_history) > 0 and len(self.C_tv_history) > 0:
            return self.plot_learning_curves_contour(elevation, azimuth)
        elif len(self.C_tv_history) > 0:
            return self.plot_learning_curves('TV')
        elif len(self.C_group_l1_history) > 0:
            return self.plot_learning_curves('Group L1')
        else:
            raise ValueError("Logged Group L1 and TV penalisation levels "
                             "history are empty.")

    def plot_learning_curves(self, hyperparameter):
        if hyperparameter == "TV":
            C = self.C_tv_history
        elif hyperparameter == "Group L1":
            C = self.C_group_l1_history
        else:
            raise ValueError("hyperparameter value should be either `TV` "
                             "or `Group L1`")
        x = np.log10(C)
        order = np.argsort(x)
        m = np.array(self.kfold_mean_train_scores)[order]
        sd = np.array(self.kfold_sd_train_scores)[order]
        fig = plt.figure()
        ax = plt.gca()
        p1 = ax.plot(x[order], m)
        p2 = ax.fill_between(x[order], m - sd, m + sd, alpha=.3)
        min_point_train = np.min(m - sd)
        m = np.array(self.kfold_mean_test_scores)[order]
        sd = np.array(self.kfold_sd_test_scores)[order]
        p3 = ax.plot(x[order], m)
        p4 = ax.fill_between(x[order], m - sd, m + sd, alpha=.3)
        min_point_test = np.min(m - sd)
        min_point = min(min_point_train, min_point_test)
        p5 = plt.scatter(np.log10(C), min_point * np.ones_like(C))
        ax.legend([(p1[0], p2), (p3[0], p4), p5],
                  ['train score', 'test score', 'tested hyperparameters'],
                  loc='lower right')
        ax.set_title('Learning curves')
        ax.set_xlabel('C %s (log scale)' % hyperparameter)
        ax.set_ylabel('Loss')
        return fig, ax

    def plot_learning_curves_contour(self, elevation=25, azimuth=35):
        """`elev` stores the elevation angle in the z plane.
        `azim` stores the azimuth angle in the x, y plane."""
        sc_train = self.kfold_mean_train_scores
        sc_test = self.kfold_mean_test_scores

        X_tile = np.log10(self.C_tv_history)
        Y_tile = np.log10(self.C_group_l1_history)

        fig, axarr = plt.subplots(1, 3, figsize=(12, 4), sharey=True,
                                  sharex=True)
        ax = axarr[-1]
        ax.scatter(X_tile, Y_tile)
        ax.set_title("Random search tested hyperparameters")
        ax.set_xlabel('Strength TV')
        ax.set_ylabel('Strength Group L1')

        names = ['train', 'test']
        cmaps = [cm.Blues, cm.Greens]
        for i, cv in enumerate([sc_train, sc_test]):
            Z = np.array(cv)
            ax = axarr[i]
            cax = ax.tricontourf(X_tile, Y_tile, Z, 50, cmap=cmaps[i])
            ax.set_title(r'Loss (%s)' % names[i])
            ax.set_xlabel("TV level (log)")
            idx = np.where(Z == Z.min())
            x, y = (X_tile[idx][0], Y_tile[idx][0])
            ax.scatter(x, y, color="red", marker="x")
            ax.text(x, y, r'%.2f' % Z.min(), color="red", fontsize=12)
        axarr[0].set_ylabel("Group L1 level (log)")
        plt.tight_layout()

        fig2 = plt.figure(figsize=(8, 6.5))
        ax = Axes3D(fig2)
        colors = ['blue', 'green']
        names = ['train', 'test']
        proxies = []
        proxy_names = []
        for i, cv in enumerate([sc_train, sc_test]):
            Z = np.array(cv)
            ax.plot_trisurf(X_tile, Y_tile, Z, alpha=0.3, color=colors[i])
            proxies.append(plt.Rectangle((0, 0), 1, 1, fc=colors[i],
                                         alpha=.3))
            proxy_names.append("%s score" % names[i])
        Z = np.array(sc_test)
        best_params = self.find_best_params()
        x = np.log10(best_params['C_tv'])
        y = np.log10(best_params['C_group_l1'])
        idx = np.where(X_tile == x)  # should equal np.where(Y_tile == y)
        z = Z[idx]
        p1 = ax.scatter(x, y, z, c='red')
        proxies.append(p1)
        proxy_names.append('CV best score')
        ax.set_xlabel("TV level (log)")
        ax.set_ylabel("Group L1 level (log)")
        ax.set_title("Learning surfaces")
        ax.set_zlabel("loss")
        ax.view_init(elevation, azimuth)
        plt.legend(proxies, proxy_names, loc='best')
        return fig, axarr, fig2, ax


# Generators for random search
# generator which generates nothing
DumbGenerator = namedtuple('DumbGenerator', ['rvs'])

null_generator = DumbGenerator(rvs=lambda x: [None])


class Log10UniformGenerator:
    """Generate uniformly distributed points in the log10 space."""

    def __init__(self, min_val, max_val):
        self.min_val = min_val
        self.max_val = max_val
        self.gen = uniform(0, 1)

    def rvs(self, n):
        return 10 ** (self.min_val +
                      (self.max_val - self.min_val) * self.gen.rvs(n))
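
if __name__ == "__main__":
    # Minimal sanity check for the random-search generators (illustrative
    # addition, not part of the tick library): `Log10UniformGenerator`
    # draws must lie in [10 ** min_val, 10 ** max_val], while the null
    # generator always yields None.
    gen = Log10UniformGenerator(0, 3)
    samples = gen.rvs(5)
    assert all(1. <= s <= 1000. for s in samples)
    assert null_generator.rvs(1) == [None]
    print(samples)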