survivalist.util
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import numpy as np
import pandas as pd
from pandas.api.types import CategoricalDtype
from sklearn.utils import check_array, check_consistent_length

__all__ = ["check_array_survival", "check_y_survival", "safe_concat", "Surv"]


class Surv:
    """Helper class to construct structured array of event indicator and observed time."""

    @staticmethod
    def from_arrays(event, time, name_event=None, name_time=None):
        """Create structured array.

        Parameters
        ----------
        event : array-like
            Event indicator. A boolean array or array with values 0/1.
        time : array-like
            Observed time. Converted to a float array.
        name_event : str|None
            Name of event, optional, default: 'event'
        name_time : str|None
            Name of observed time, optional, default: 'time'

        Returns
        -------
        y : np.array
            Structured array with two fields: the boolean event indicator
            first, the float observed time second.

        Raises
        ------
        ValueError
            If both fields would carry the same name, or if a non-boolean
            `event` does not consist of exactly the two values 0 and 1.
            The lengths of `time` and `event` are additionally checked with
            :func:`sklearn.utils.check_consistent_length`.
        """
        name_event = name_event or "event"
        name_time = name_time or "time"
        if name_time == name_event:
            raise ValueError("name_time must be different from name_event")

        time = np.asanyarray(time, dtype=float)
        y = np.empty(time.shape[0], dtype=[(name_event, bool), (name_time, float)])
        y[name_time] = time

        event = np.asanyarray(event)
        check_consistent_length(time, event)

        if np.issubdtype(event.dtype, np.bool_):
            y[name_event] = event
        else:
            # Non-boolean indicators are only accepted when their distinct
            # values are exactly 0 and 1.
            events = np.unique(event)
            events.sort()
            if len(events) != 2:
                raise ValueError("event indicator must be binary")

            if np.all(events == np.array([0, 1], dtype=events.dtype)):
                y[name_event] = event.astype(bool)
            else:
                raise ValueError("non-boolean event indicator must contain 0 and 1 only")

        return y

    @staticmethod
    def from_dataframe(event, time, data):
        """Create structured array from data frame.

        Parameters
        ----------
        event : object
            Identifier of column containing event indicator.
        time : object
            Identifier of column containing time.
        data : pandas.DataFrame
            Dataset.

        Returns
        -------
        y : np.array
            Structured array with two fields, named ``str(event)`` and
            ``str(time)``.

        Raises
        ------
        TypeError
            If `data` is not a :class:`pandas.DataFrame`.
        """
        if not isinstance(data, pd.DataFrame):
            raise TypeError(f"expected pandas.DataFrame, but got {type(data)!r}")

        return Surv.from_arrays(
            data.loc[:, event].values,
            data.loc[:, time].values,
            name_event=str(event),
            name_time=str(time),
        )


def check_y_survival(y_or_event, *args, allow_all_censored=False, allow_time_zero=True):
    """Check that array correctly represents an outcome for survival analysis.

    Parameters
    ----------
    y_or_event : structured array with two fields, or boolean array
        If a structured array, it must contain the binary event indicator
        as first field, and time of event or time of censoring as
        second field. Otherwise, it is assumed that a boolean array
        representing the event indicator is passed.

    *args : list of array-likes
        Any number of array-like objects representing time information.
        Elements that are `None` are passed along in the return value.

    allow_all_censored : bool, optional, default: False
        Whether to allow all events to be censored.

    allow_time_zero : bool, optional, default: True
        Whether to allow event times to be zero.

    Returns
    -------
    event : array, shape=[n_samples,], dtype=bool
        Binary event indicator.

    time : array, shape=[n_samples,]
        Time of event or censoring. One entry is returned per time array
        (a single one when a structured array was passed); `None` inputs
        are returned as `None`.

    Raises
    ------
    ValueError
        If a lone `y_or_event` is not a two-field structured array, if the
        event indicator is not boolean, if all samples are censored while
        `allow_all_censored` is false, or if a time array is not numeric or
        contains negative (or, with ``allow_time_zero=False``, non-positive)
        values.
    """
    if len(args) == 0:
        y = y_or_event

        if not isinstance(y, np.ndarray) or y.dtype.fields is None or len(y.dtype.fields) != 2:
            raise ValueError(
                "y must be a structured array with the first field"
                " being a binary class event indicator and the second field"
                " the time of the event/censoring"
            )

        event_field, time_field = y.dtype.names
        y_event = y[event_field]
        time_args = (y[time_field],)
    else:
        y_event = np.asanyarray(y_or_event)
        time_args = args

    event = check_array(y_event, ensure_2d=False)
    if not np.issubdtype(event.dtype, np.bool_):
        raise ValueError(f"elements of event indicator must be boolean, but found {event.dtype}")

    if not (allow_all_censored or np.any(event)):
        raise ValueError("all samples are censored")

    return_val = [event]
    for i, yt in enumerate(time_args):
        if yt is None:
            return_val.append(yt)
            continue

        yt = check_array(yt, ensure_2d=False)
        if not np.issubdtype(yt.dtype, np.number):
            # The event indicator is argument 1, so time arrays start at 2.
            raise ValueError(f"time must be numeric, but found {yt.dtype} for argument {i + 2}")

        if allow_time_zero:
            cond = yt < 0
            msg = "observed time contains values smaller zero"
        else:
            cond = yt <= 0
            msg = "observed time contains values smaller or equal to zero"
        if np.any(cond):
            raise ValueError(msg)

        return_val.append(yt)

    return tuple(return_val)


def check_array_survival(X, y, **kwargs):
    """Check that all arrays have consistent first dimensions.

    Parameters
    ----------
    X : array-like
        Data matrix containing feature vectors.

    y : structured array with two fields
        A structured array containing the binary event indicator
        as first field, and time of event or time of censoring as
        second field.

    kwargs : dict
        Additional arguments passed to :func:`check_y_survival`.

    Returns
    -------
    event : array, shape=[n_samples,], dtype=bool
        Binary event indicator.

    time : array, shape=[n_samples,]
        Time of event or censoring.
    """
    event, time = check_y_survival(y, **kwargs)
    check_consistent_length(X, event, time)
    return event, time


def safe_concat(objs, *args, **kwargs):
    """Alternative to :func:`pandas.concat` that preserves categorical variables.

    Categorical columns of the inputs are converted to ``object`` dtype on
    private copies, the copies are concatenated, and the affected columns are
    converted back to categoricals using the categories and ordering recorded
    from the inputs. The objects passed in `objs` are never modified.

    Parameters
    ----------
    objs : iterable of Series or DataFrame objects
        Objects to concatenate. The iterable is materialized once, so
        one-shot iterables such as generators are supported. `None` entries
        are not inspected and are handed to :func:`pandas.concat` unchanged.
    *args
        Additional positional arguments forwarded to :func:`pandas.concat`.
    axis : {0, 1, ...}, default 0
        The axis to concatenate along.
    **kwargs
        Additional keyword arguments forwarded to :func:`pandas.concat`,
        for example `join`, `keys`, `levels`, `names`, `verify_integrity`,
        `ignore_index` or `copy`.

    Returns
    -------
    concatenated : type of objects

    Raises
    ------
    ValueError
        If a categorical column appears in more than one DataFrame and
        either ``axis == 1`` (duplicate column) or the categories differ.
    """
    axis = kwargs.pop("axis", 0)
    categories = {}
    prepared = []
    # Materialize first: the inputs are walked here and again by pandas.concat.
    for df in list(objs):
        if df is None:
            prepared.append(df)
            continue

        if isinstance(df, pd.Series):
            if isinstance(df.dtype, CategoricalDtype):
                categories[df.name] = {
                    "categories": df.cat.categories,
                    "ordered": df.cat.ordered,
                }
        else:
            to_object = {}
            for name, s in df.select_dtypes(include=["category"]).items():
                if name in categories:
                    if axis == 1:
                        raise ValueError(f"duplicate columns {name}")
                    if not categories[name]["categories"].equals(s.cat.categories):
                        raise ValueError(f"categories for column {name} do not match")
                else:
                    categories[name] = {
                        "categories": s.cat.categories,
                        "ordered": s.cat.ordered,
                    }
                to_object[name] = object
            if to_object:
                # astype returns a new frame, leaving the caller's data intact.
                df = df.astype(to_object)
        prepared.append(df)

    concatenated = pd.concat(prepared, *args, axis=axis, **kwargs)

    # Restore the categorical dtype with the recorded categories and ordering.
    for name, params in categories.items():
        concatenated[name] = pd.Categorical(concatenated[name], **params)

    return concatenated


class _PropertyAvailableIfDescriptor:
    """Implements a conditional property using the descriptor protocol based on the property decorator.

    The corresponding class in scikit-learn (`_AvailableIfDescriptor`) only supports callables.
    This class adopts the property decorator as described in the descriptor guide in the official
    Python documentation.

    See also
    --------
    https://docs.python.org/3/howto/descriptor.html
        Descriptor HowTo Guide

    :class:`sklearn.utils.available_if._AvailableIfDescriptor`
        The original class in scikit-learn.
    """

    def __init__(self, check, fget, doc=None):
        self.check = check
        self.fget = fget
        if doc is None and fget is not None:
            doc = fget.__doc__
        self.__doc__ = doc
        self._name = ""

    def __set_name__(self, owner, name):
        # Remember the attribute name for error messages.
        self._name = name

    def __get__(self, obj, objtype=None):
        # Access through the class returns the descriptor itself.
        if obj is None:
            return self

        if not self.check(obj):
            # Build the message only on failure to avoid calling repr(obj)
            # on every successful attribute access.
            raise AttributeError(f"This {obj!r} has no attribute {self._name!r}")

        if self.fget is None:
            raise AttributeError(f"property '{self._name}' has no getter")
        return self.fget(obj)


def property_available_if(check):
    """A property attribute that is available only if check returns a truthy value.

    Only supports getting an attribute value, setting or deleting an attribute value are not supported.

    Parameters
    ----------
    check : callable
        When passed the object of the decorated method, this should return
        `True` if the property attribute is available, and either return `False`
        or raise an `AttributeError` if not available.

    Returns
    -------
    callable
        Callable makes the decorated property available if `check` returns
        `True`, otherwise the decorated property is unavailable.
    """
    return lambda fn: _PropertyAvailableIfDescriptor(check=check, fget=fn)
def check_array_survival(X, y, **kwargs):
    """Validate a survival outcome and check it matches the feature matrix in length.

    Parameters
    ----------
    X : array-like
        Data matrix containing feature vectors.

    y : structured array with two fields
        Binary event indicator as first field, time of event or time of
        censoring as second field.

    kwargs : dict
        Extra keyword arguments handed to :func:`check_y_survival`.

    Returns
    -------
    event : array, shape=[n_samples,], dtype=bool
        Binary event indicator.

    time : array, shape=[n_samples,]
        Time of event or censoring.
    """
    validated = check_y_survival(y, **kwargs)
    event_indicator, observed_time = validated
    # All three inputs must agree in their number of samples.
    check_consistent_length(X, event_indicator, observed_time)
    return event_indicator, observed_time
Check that all arrays have consistent first dimensions.
Parameters
X : array-like Data matrix containing feature vectors.
y : structured array with two fields A structured array containing the binary event indicator as first field, and time of event or time of censoring as second field.
kwargs : dict
Additional arguments passed to check_y_survival().
Returns
event : array, shape=[n_samples,], dtype=bool Binary event indicator.
time : array, shape=[n_samples,], dtype=float Time of event or censoring.
def check_y_survival(y_or_event, *args, allow_all_censored=False, allow_time_zero=True):
    """Validate the event indicator and time arrays of a survival outcome.

    Parameters
    ----------
    y_or_event : structured array with two fields, or boolean array
        With no further positional arguments this must be a structured
        array holding the binary event indicator as first field and the
        time of event or censoring as second field. Otherwise it is taken
        to be the boolean event indicator itself.

    *args : list of array-likes
        Any number of array-like objects representing time information.
        Elements that are `None` are passed along in the return value.

    allow_all_censored : bool, optional, default: False
        Whether to allow all events to be censored.

    allow_time_zero : bool, optional, default: True
        Whether to allow event times to be zero.

    Returns
    -------
    event : array, shape=[n_samples,], dtype=bool
        Binary event indicator.

    time : array, shape=[n_samples,]
        Time of event or censoring; one entry per time array, with `None`
        inputs returned as `None`.

    Raises
    ------
    ValueError
        If the structured array, the event indicator, or a time array
        fails validation.
    """
    if args:
        raw_event = np.asanyarray(y_or_event)
        time_arrays = args
    else:
        structured = y_or_event
        is_two_field_record = (
            isinstance(structured, np.ndarray)
            and structured.dtype.fields is not None
            and len(structured.dtype.fields) == 2
        )
        if not is_two_field_record:
            raise ValueError(
                "y must be a structured array with the first field"
                " being a binary class event indicator and the second field"
                " the time of the event/censoring"
            )

        name_of_event, name_of_time = structured.dtype.names
        raw_event = structured[name_of_event]
        time_arrays = (structured[name_of_time],)

    event = check_array(raw_event, ensure_2d=False)
    if not np.issubdtype(event.dtype, np.bool_):
        raise ValueError(f"elements of event indicator must be boolean, but found {event.dtype}")

    if not allow_all_censored and not np.any(event):
        raise ValueError("all samples are censored")

    validated = [event]
    for position, times in enumerate(time_arrays):
        if times is None:
            # None placeholders are passed through untouched.
            validated.append(None)
            continue

        times = check_array(times, ensure_2d=False)
        if not np.issubdtype(times.dtype, np.number):
            # The event indicator is argument 1, so time arrays start at 2.
            raise ValueError(f"time must be numeric, but found {times.dtype} for argument {position + 2}")

        if allow_time_zero:
            out_of_range = times < 0
            complaint = "observed time contains values smaller zero"
        else:
            out_of_range = times <= 0
            complaint = "observed time contains values smaller or equal to zero"
        if np.any(out_of_range):
            raise ValueError(complaint)

        validated.append(times)

    return tuple(validated)
Check that array correctly represents an outcome for survival analysis.
Parameters
y_or_event : structured array with two fields, or boolean array If a structured array, it must contain the binary event indicator as first field, and time of event or time of censoring as second field. Otherwise, it is assumed that a boolean array representing the event indicator is passed.
*args : list of array-likes
Any number of array-like objects representing time information.
Elements that are None are passed along in the return value.
allow_all_censored : bool, optional, default: False Whether to allow all events to be censored.
allow_time_zero : bool, optional, default: True Whether to allow event times to be zero.
Returns
event : array, shape=[n_samples,], dtype=bool Binary event indicator.
time : array, shape=[n_samples,], dtype=float Time of event or censoring.
def safe_concat(objs, *args, **kwargs):
    """Alternative to :func:`pandas.concat` that preserves categorical variables.

    Categorical columns of the inputs are converted to ``object`` dtype on
    private copies, the copies are concatenated, and the affected columns are
    converted back to categoricals using the categories and ordering recorded
    from the inputs. The objects passed in `objs` are never modified.

    Parameters
    ----------
    objs : iterable of Series or DataFrame objects
        Objects to concatenate. The iterable is materialized once, so
        one-shot iterables such as generators are supported. `None` entries
        are not inspected and are handed to :func:`pandas.concat` unchanged.
    *args
        Additional positional arguments forwarded to :func:`pandas.concat`.
    axis : {0, 1, ...}, default 0
        The axis to concatenate along.
    **kwargs
        Additional keyword arguments forwarded to :func:`pandas.concat`,
        for example `join`, `keys`, `levels`, `names`, `verify_integrity`,
        `ignore_index` or `copy`.

    Returns
    -------
    concatenated : type of objects

    Raises
    ------
    ValueError
        If a categorical column appears in more than one DataFrame and
        either ``axis == 1`` (duplicate column) or the categories differ.
    """
    axis = kwargs.pop("axis", 0)
    categories = {}
    prepared = []
    # Materialize first: the inputs are walked here and again by pandas.concat.
    for df in list(objs):
        if df is None:
            prepared.append(df)
            continue

        if isinstance(df, pd.Series):
            if isinstance(df.dtype, CategoricalDtype):
                categories[df.name] = {
                    "categories": df.cat.categories,
                    "ordered": df.cat.ordered,
                }
        else:
            to_object = {}
            for name, s in df.select_dtypes(include=["category"]).items():
                if name in categories:
                    if axis == 1:
                        raise ValueError(f"duplicate columns {name}")
                    if not categories[name]["categories"].equals(s.cat.categories):
                        raise ValueError(f"categories for column {name} do not match")
                else:
                    categories[name] = {
                        "categories": s.cat.categories,
                        "ordered": s.cat.ordered,
                    }
                to_object[name] = object
            if to_object:
                # astype returns a new frame, leaving the caller's data intact.
                df = df.astype(to_object)
        prepared.append(df)

    concatenated = pd.concat(prepared, *args, axis=axis, **kwargs)

    # Restore the categorical dtype with the recorded categories and ordering.
    for name, params in categories.items():
        concatenated[name] = pd.Categorical(concatenated[name], **params)

    return concatenated
Alternative to pandas.concat() that preserves categorical variables.
Parameters
objs : a sequence or mapping of Series, DataFrame, or Panel objects
If a dict is passed, the sorted keys will be used as the keys
argument, unless it is passed, in which case the values will be
selected (see below). Any None objects will be dropped silently unless
they are all None in which case a ValueError will be raised
axis : {0, 1, ...}, default 0
The axis to concatenate along
join : {'inner', 'outer'}, default 'outer'
How to handle indexes on other axis(es)
join_axes : list of Index objects
Specific indexes to use for the other n - 1 axes instead of performing
inner/outer set logic
verify_integrity : boolean, default False
Check whether the new concatenated axis contains duplicates. This can
be very expensive relative to the actual data concatenation
keys : sequence, default None
If multiple levels passed, should contain tuples. Construct
hierarchical index using the passed keys as the outermost level
levels : list of sequences, default None
Specific levels (unique values) to use for constructing a
MultiIndex. Otherwise they will be inferred from the keys
names : list, default None
Names for the levels in the resulting hierarchical index
ignore_index : boolean, default False
If True, do not use the index values along the concatenation axis. The
resulting axis will be labeled 0, ..., n - 1. This is useful if you are
concatenating objects where the concatenation axis does not have
meaningful indexing information. Note that the index values on the other
axes are still respected in the join.
copy : boolean, default True
If False, do not copy data unnecessarily
Notes
The keys, levels, and names arguments are all optional
Returns
concatenated : type of objects
class Surv:
    """Factory helpers that build a structured array of event indicator and observed time."""

    @staticmethod
    def from_arrays(event, time, name_event=None, name_time=None):
        """Build the structured outcome array from two separate arrays.

        Parameters
        ----------
        event : array-like
            Event indicator. A boolean array or array with values 0/1.
        time : array-like
            Observed time. Converted to a float array.
        name_event : str|None
            Name of event, optional, default: 'event'
        name_time : str|None
            Name of observed time, optional, default: 'time'

        Returns
        -------
        y : np.array
            Structured array with two fields: the boolean event indicator
            first, the float observed time second.

        Raises
        ------
        ValueError
            If both fields would carry the same name, or if a non-boolean
            `event` does not consist of exactly the two values 0 and 1.
        """
        event_field = name_event if name_event else "event"
        time_field = name_time if name_time else "time"
        if time_field == event_field:
            raise ValueError("name_time must be different from name_event")

        observed = np.asanyarray(time, dtype=float)
        record_dtype = [(event_field, bool), (time_field, float)]
        y = np.empty(observed.shape[0], dtype=record_dtype)
        y[time_field] = observed

        indicator = np.asanyarray(event)
        check_consistent_length(observed, indicator)

        # Boolean indicators are stored as they are.
        if np.issubdtype(indicator.dtype, np.bool_):
            y[event_field] = indicator
            return y

        # Anything else must consist of exactly the two values 0 and 1.
        distinct = np.unique(indicator)
        distinct.sort()
        if len(distinct) != 2:
            raise ValueError("event indicator must be binary")

        zero_one = np.array([0, 1], dtype=distinct.dtype)
        if not np.all(distinct == zero_one):
            raise ValueError("non-boolean event indicator must contain 0 and 1 only")

        y[event_field] = indicator.astype(bool)
        return y

    @staticmethod
    def from_dataframe(event, time, data):
        """Build the structured outcome array from two columns of a data frame.

        Parameters
        ----------
        event : object
            Identifier of column containing event indicator.
        time : object
            Identifier of column containing time.
        data : pandas.DataFrame
            Dataset.

        Returns
        -------
        y : np.array
            Structured array with two fields, named ``str(event)`` and
            ``str(time)``.

        Raises
        ------
        TypeError
            If `data` is not a :class:`pandas.DataFrame`.
        """
        if not isinstance(data, pd.DataFrame):
            raise TypeError(f"expected pandas.DataFrame, but got {type(data)!r}")

        event_values = data.loc[:, event].values
        time_values = data.loc[:, time].values
        return Surv.from_arrays(
            event_values,
            time_values,
            name_event=str(event),
            name_time=str(time),
        )
Helper class to construct structured array of event indicator and observed time.