# License: BSD 3 clause
import numpy as np
from tick.base.simulation import SimuWithFeatures
from tick.preprocessing.features_binarizer import FeaturesBinarizer
# TODO: something better to tune the censoring level than this censoring factor
class SimuCoxReg(SimuWithFeatures):
    """Simulation of a Cox regression for proportional hazards

    Parameters
    ----------
    coeffs : `numpy.ndarray`, shape=(n_coeffs,)
        The array of coefficients of the model

    features : `numpy.ndarray`, shape=(n_samples, n_features), default=`None`
        The features matrix to use. If None, it is simulated

    n_samples : `int`, default=200
        Number of samples

    times_distribution : `str`, default="weibull"
        The distribution of times. Only ``"weibull"``
        is implemented for now

    scale : `float`, default=1.0
        Scaling parameter to use in the distribution of times

    shape : `float`, default=1.0
        Shape parameter to use in the distribution of times

    censoring_factor : `float`, default=2.0
        Level of censoring. Increasing censoring_factor leads
        to less censored times and conversely.

    features_type : `str`, default="cov_toeplitz"
        The type of features matrix to simulate

        * If ``"cov_toeplitz"`` : a Gaussian distribution with
          Toeplitz correlation matrix

        * If ``"cov_uniform"`` : a Gaussian distribution with
          correlation matrix given by 0.5 * (U + U.T), where U is
          uniform on [0, 1] and diagonal filled with ones.

    cov_corr : `float`, default=0.5
        Correlation to use in the Toeplitz correlation matrix

    features_scaling : `str`, default="none"
        The way the features matrix is scaled after simulation

        * If ``"standard"`` : the columns are centered and
          normalized

        * If ``"min-max"`` : remove the minimum and divide by
          max-min

        * If ``"norm"`` : the columns are normalized but not centered

        * If ``"none"`` : nothing is done to the features

    seed : `int`, default=None
        The seed of the random number generator. If `None` it is not
        seeded

    verbose : `bool`, default=True
        If True, print things

    Attributes
    ----------
    features : `numpy.ndarray`, shape=(n_samples, n_features)
        The simulated (or given) features matrix

    times : `numpy.ndarray`, shape=(n_samples,)
        Simulated times

    censoring : `numpy.ndarray`, shape=(n_samples,)
        Simulated censoring indicator, where ``censoring[i] == 1``
        indicates that the time of the i-th individual is a failure
        time, and where ``censoring[i] == 0`` means that the time of
        the i-th individual is a censoring time

    time_start : `str`
        Start date of the simulation

    time_elapsed : `int`
        Duration of the simulation, in seconds

    time_end : `str`
        End date of the simulation

    dtype : `{'float64', 'float32'}`, default='float64'
        Type of the generated arrays.
        Used in the case features is None

    Notes
    -----
    There is no intercept in this model
    """

    # These attributes are read-only from the outside; they are assigned
    # internally through ``self._set`` only.
    _attrinfos = {
        "times": {
            "writable": False
        },
        "censoring": {
            "writable": False
        },
        "_times_distribution": {
            "writable": False
        },
        "_scale": {
            "writable": False
        },
        "_shape": {
            "writable": False
        }
    }

    def __init__(self, coeffs: np.ndarray,
                 features: np.ndarray = None, n_samples: int = 200,
                 times_distribution: str = "weibull",
                 shape: float = 1., scale: float = 1.,
                 censoring_factor: float = 2.,
                 features_type: str = "cov_toeplitz",
                 cov_corr: float = 0.5, features_scaling: str = "none",
                 seed: int = None, verbose: bool = True, dtype="float64"):
        n_features = coeffs.shape[0]
        # intercept=None in this model
        SimuWithFeatures.__init__(self, None, features, n_samples,
                                  n_features, features_type, cov_corr,
                                  features_scaling, seed, verbose,
                                  dtype=dtype)
        self.coeffs = coeffs
        self.shape = shape
        self.scale = scale
        self.censoring_factor = censoring_factor
        self.times_distribution = times_distribution
        self.features = None
        self.times = None
        self.censoring = None

    def simulate(self):
        """Launch simulation of the data

        Returns
        -------
        features : `numpy.ndarray`, shape=(n_samples, n_features)
            The simulated (or given) features matrix

        times : `numpy.ndarray`, shape=(n_samples,)
            Simulated times

        censoring : `numpy.ndarray`, shape=(n_samples,)
            Simulated censoring indicator, where ``censoring[i] == 1``
            indicates that the time of the i-th individual is a failure
            time, and where ``censoring[i] == 0`` means that the time of
            the i-th individual is a censoring time
        """
        return SimuWithFeatures.simulate(self)

    @property
    def times_distribution(self):
        return self._times_distribution

    @times_distribution.setter
    def times_distribution(self, val):
        if val != "weibull":
            raise ValueError("``times_distribution`` was not "
                             "understood, try using 'weibull' instead")
        self._set("_times_distribution", val)

    @property
    def shape(self):
        return self._shape

    @shape.setter
    def shape(self, val):
        if val <= 0:
            raise ValueError("``shape`` must be strictly positive")
        self._set("_shape", val)

    @property
    def scale(self):
        return self._scale

    @scale.setter
    def scale(self, val):
        if val <= 0:
            raise ValueError("``scale`` must be strictly positive")
        self._set("_scale", val)

    def _simulate(self):
        # The features matrix already exists, and is created by the
        # super class
        features = self.features
        n_samples, n_features = features.shape
        # Linear predictor of the Cox model
        u = features.dot(self.coeffs)
        # Simulation of true times: inverse-transform sampling, where the
        # unit exponential is rescaled by exp(-u) (proportional hazards)
        E = np.random.exponential(scale=1., size=n_samples)
        E *= np.exp(-u)
        scale = self.scale
        shape = self.shape
        if self.times_distribution == "weibull":
            T = 1. / scale * E ** (1. / shape)
        else:
            # There is no point in this test, but let's do it like that
            # since we're likely to implement other distributions
            T = 1. / scale * E ** (1. / shape)
        m = T.mean()
        # Simulation of the censoring: exponential with mean proportional
        # to the mean of the true times, tuned by censoring_factor
        c = self.censoring_factor
        C = np.random.exponential(scale=c * m, size=n_samples)
        # Observed time is the minimum of the true and censoring times
        self._set("times", np.minimum(T, C).astype(self.dtype))
        # Censoring indicator: 1 if it is a time of failure, 0 if it's
        # censoring. It is stored as an unsigned integer (np.ushort) and
        # not bool as we might need to construct a memory access on it
        # later
        censoring = (T <= C).astype(np.ushort)
        self._set("censoring", censoring)
        return self.features, self.times, self.censoring

    def _as_dict(self):
        # Drop the (potentially large) simulated arrays from the dict
        # representation
        dd = SimuWithFeatures._as_dict(self)
        dd.pop("features", None)
        dd.pop("times", None)
        dd.pop("censoring", None)
        return dd
class SimuCoxRegWithCutPoints(SimuWithFeatures):
    """Simulation of a Cox regression for proportional hazards with cut-points
    effects in the features

    Parameters
    ----------
    features : `numpy.ndarray`, shape=(n_samples, n_features), default=`None`
        The features matrix to use. If None, it is simulated

    n_samples : `int`, default=200
        Number of samples

    n_features : `int`, default=5
        Number of features

    times_distribution : `str`, default="weibull"
        The distribution of times. Only ``"weibull"``
        is implemented for now

    scale : `float`, default=1.0
        Scaling parameter to use in the distribution of times

    shape : `float`, default=1.0
        Shape parameter to use in the distribution of times

    censoring_factor : `float`, default=2.0
        Level of censoring. Increasing censoring_factor leads
        to less censored times and conversely.

    features_type : `str`, default="cov_toeplitz"
        The type of features matrix to simulate

        * If ``"cov_toeplitz"`` : a Gaussian distribution with
          Toeplitz correlation matrix

        * If ``"cov_uniform"`` : a Gaussian distribution with
          correlation matrix given by 0.5 * (U + U.T), where U is
          uniform on [0, 1] and diagonal filled with ones.

    cov_corr : `float`, default=0.5
        Correlation to use in the Toeplitz correlation matrix

    features_scaling : `str`, default="none"
        The way the features matrix is scaled after simulation

        * If ``"standard"`` : the columns are centered and
          normalized

        * If ``"min-max"`` : remove the minimum and divide by
          max-min

        * If ``"norm"`` : the columns are normalized but not centered

        * If ``"none"`` : nothing is done to the features

    seed : `int`, default=None
        The seed of the random number generator. If `None` it is not
        seeded

    verbose : `bool`, default=True
        If True, print things

    n_cut_points : `int`, default="none"
        Number of cut-points generated per feature. If `None` it is sampled from
        a geometric distribution of parameter n_cut_points_factor.

    n_cut_points_factor : `float`, default=0.7
        Parameter of the geometric distribution used to generate the number of
        cut-points when n_cut_points is `None`. Increasing n_cut_points_factor
        leads to less cut-points per feature on average.

    sparsity : `float`, default=0
        Percentage of block sparsity induced in the coefficient vector. Must be
        in [0, 1].

    Attributes
    ----------
    features : `numpy.ndarray`, shape=(n_samples, n_features)
        The simulated (or given) features matrix

    times : `numpy.ndarray`, shape=(n_samples,)
        Simulated times

    censoring : `numpy.ndarray`, shape=(n_samples,)
        Simulated censoring indicator, where ``censoring[i] == 1``
        indicates that the time of the i-th individual is a failure
        time, and where ``censoring[i] == 0`` means that the time of
        the i-th individual is a censoring time

    Notes
    -----
    There is no intercept in this model
    """

    # These attributes are read-only from the outside; they are assigned
    # internally through ``self._set`` only.
    _attrinfos = {
        "times": {
            "writable": False
        },
        "censoring": {
            "writable": False
        },
        "_times_distribution": {
            "writable": False
        },
        "_scale": {
            "writable": False
        },
        "_shape": {
            "writable": False
        },
        "_sparsity": {
            "writable": False
        }
    }

    def __init__(self, features: np.ndarray = None, n_samples: int = 200,
                 n_features: int = 5, n_cut_points: int = None,
                 n_cut_points_factor: float = .7,
                 times_distribution: str = "weibull",
                 shape: float = 1., scale: float = 1.,
                 censoring_factor: float = 2.,
                 features_type: str = "cov_toeplitz",
                 cov_corr: float = 0.5, features_scaling: str = "none",
                 seed: int = None, verbose: bool = True, sparsity=0):
        # intercept=None in this model
        SimuWithFeatures.__init__(self, None, features, n_samples,
                                  n_features, features_type, cov_corr,
                                  features_scaling, seed, verbose)
        self.shape = shape
        self.scale = scale
        self.censoring_factor = censoring_factor
        self.times_distribution = times_distribution
        self.n_cut_points = n_cut_points
        self.n_cut_points_factor = n_cut_points_factor
        self.sparsity = sparsity
        self.features = None
        self.times = None
        self.censoring = None

    def simulate(self):
        """Launch simulation of the data

        Returns
        -------
        features : `numpy.ndarray`, shape=(n_samples, n_features)
            The simulated (or given) features matrix

        times : `numpy.ndarray`, shape=(n_samples,)
            Simulated times

        censoring : `numpy.ndarray`, shape=(n_samples,)
            Simulated censoring indicator, where ``censoring[i] == 1``
            indicates that the time of the i-th individual is a failure
            time, and where ``censoring[i] == 0`` means that the time of
            the i-th individual is a censoring time
        """
        return SimuWithFeatures.simulate(self)

    @property
    def times_distribution(self):
        return self._times_distribution

    @times_distribution.setter
    def times_distribution(self, val):
        if val != "weibull":
            raise ValueError("``times_distribution`` was not "
                             "understood, try using 'weibull' instead")
        self._set("_times_distribution", val)

    @property
    def shape(self):
        return self._shape

    @shape.setter
    def shape(self, val):
        if val <= 0:
            raise ValueError("``shape`` must be strictly positive")
        self._set("_shape", val)

    @property
    def scale(self):
        return self._scale

    @scale.setter
    def scale(self, val):
        if val <= 0:
            raise ValueError("``scale`` must be strictly positive")
        self._set("_scale", val)

    @property
    def sparsity(self):
        return self._sparsity

    @sparsity.setter
    def sparsity(self, val):
        # The check is inclusive on both ends, so the message says [0, 1]
        # (the previous message wrongly said the open interval (0, 1))
        if not 0 <= val <= 1:
            raise ValueError("``sparsity`` must be in [0, 1]")
        self._set("_sparsity", val)

    def _simulate(self):
        # The features matrix already exists, and is created by the
        # super class
        features = self.features
        n_samples, n_features = features.shape
        # Simulation of cut-points
        n_cut_points = self.n_cut_points
        n_cut_points_factor = self.n_cut_points_factor
        sparsity = self.sparsity
        # Number of features whose coefficient block is forced to zero
        s = round(n_features * sparsity)
        # sparsity index set
        S = np.random.choice(n_features, s, replace=False)
        if n_cut_points is None:
            n_cut_points = np.random.geometric(n_cut_points_factor, n_features)
        else:
            n_cut_points = np.repeat(n_cut_points, n_features)
        cut_points = {}
        coeffs_binarized = np.array([])
        for j in range(n_features):
            feature_j = features[:, j]
            # Candidate cut-points are deciles of the feature (10%..90%)
            quantile_cuts = np.linspace(10, 90, 10)
            # NOTE(review): ``interpolation=`` is deprecated in favor of
            # ``method=`` since numpy 1.22 — kept for compatibility with
            # the numpy version this project pins; confirm before changing
            candidates = np.percentile(feature_j, quantile_cuts,
                                       interpolation="nearest")
            cut_points_j = np.random.choice(candidates, n_cut_points[j],
                                            replace=False)
            cut_points_j = np.sort(cut_points_j)
            # Pad with infinities so the bins cover the whole real line
            cut_points_j = np.insert(cut_points_j, 0, -np.inf)
            cut_points_j = np.append(cut_points_j, np.inf)
            cut_points[str(j)] = cut_points_j
            # generate beta star
            if j in S:
                # Sparse block: this feature has no effect
                coeffs_block = np.zeros(n_cut_points[j] + 1)
            else:
                coeffs_block = np.random.normal(1, .5, n_cut_points[j] + 1)
                # make sure 2 consecutive coeffs are different enough:
                # take absolute values and alternate the signs
                coeffs_block = np.abs(coeffs_block)
                coeffs_block[::2] *= -1
            # sum-to-zero constraint in each block
            coeffs_block = coeffs_block - coeffs_block.mean()
            coeffs_binarized = np.append(coeffs_binarized, coeffs_block)
        binarizer = FeaturesBinarizer(method='given',
                                      bins_boundaries=cut_points)
        binarized_features = binarizer.fit_transform(features)
        # Linear predictor of the Cox model on the binarized features
        u = binarized_features.dot(coeffs_binarized)
        # Simulation of true times: inverse-transform sampling, where the
        # unit exponential is rescaled by exp(-u) (proportional hazards)
        E = np.random.exponential(scale=1., size=n_samples)
        E *= np.exp(-u)
        scale = self.scale
        shape = self.shape
        if self.times_distribution == "weibull":
            T = 1. / scale * E ** (1. / shape)
        else:
            # There is no point in this test, but let's do it like that
            # since we're likely to implement other distributions
            T = 1. / scale * E ** (1. / shape)
        m = T.mean()
        # Simulation of the censoring: exponential with mean proportional
        # to the mean of the true times, tuned by censoring_factor
        c = self.censoring_factor
        C = np.random.exponential(scale=c * m, size=n_samples)
        # Observed time is the minimum of the true and censoring times
        self._set("times", np.minimum(T, C).astype(self.dtype))
        # Censoring indicator: 1 if it is a time of failure, 0 if censoring.
        censoring = (T <= C).astype(np.ushort)
        self._set("censoring", censoring)
        return self.features, self.times, self.censoring, cut_points, \
               coeffs_binarized, S

    def _as_dict(self):
        # Drop the (potentially large) simulated arrays from the dict
        # representation
        dd = SimuWithFeatures._as_dict(self)
        dd.pop("features", None)
        dd.pop("times", None)
        dd.pop("censoring", None)
        return dd