Source code for tick.survival.survival

# License: BSD 3 clause


[docs]def kaplan_meier(timestamps, event_observed): """Computes the Kaplan-Meier survival function estimation given by: .. math:: S{(t)} = \\prod_{i:t_{(i)}<t}^n \\left(1 - \\frac{d_i}{n_i} \\right) where - :math:`d_i` are the number of deaths at :math:`t_{i}` - :math:`n_i` are the number of patients alive just before :math:`t_{i}` Parameters ---------- timestamps : `numpy.array` Timestamps for each observation event_observed : `numpy.array` Bool array denoting if the death event was observed or not Returns ------- output : `numpy.array` The computed Kaplan-Meier survival function estimation """ import numpy as np if isinstance(timestamps, list): timestamps = np.array(timestamps) if isinstance(event_observed, list): event_observed = np.array(event_observed) timestamps_observed = timestamps[event_observed == 1] unique_timestamps_observed = np.concatenate( (np.zeros(1), np.unique(timestamps_observed))) return np.cumprod( np.fromiter( (1.0 - np.sum(t == timestamps_observed) / np.sum(t <= timestamps) for t in unique_timestamps_observed), dtype='float', count=unique_timestamps_observed.size))
[docs]def nelson_aalen(timestamps, event_observed): """Computes the Nelson-Aalen cumulative hazard rate estimation given by: .. math:: \\Lambda{(t_{i})} = \\sum_{j=1}^i \\frac{d_j}{n_j} where - :math:`d_j` are the number of deaths at :math:`t_{j}` - :math:`n_j` are the number of patients alive just before :math:`t_{j}` Parameters ---------- timestamps : `numpy.array` Timestamps for each observation event_observed : `numpy.array` Bool array denoting if the death event was observed or not Returns ------- output : `numpy.array` The computed Nelson-Aalen cumulative hazard rate """ import numpy as np if isinstance(timestamps, list): timestamps = np.array(timestamps) if isinstance(event_observed, list): event_observed = np.array(event_observed) timestamps_observed = timestamps[event_observed == 1] unique_timestamps_observed = np.concatenate( (np.zeros(1), np.unique(timestamps_observed))) return np.cumsum( np.fromiter((np.sum(t == timestamps_observed) / np.sum(t <= timestamps) for t in unique_timestamps_observed), dtype='float', count=unique_timestamps_observed.size))