teller
1from .explainer import Comparator 2from .explainer import Explainer 3from .explainer import ConformalExplainer 4from .fdaddiexplainer import FDAdditiveExplainer 5from .integratedgradients import IntegratedGradientsExplainer 6 7from .predictioninterval import PredictionInterval 8 9__all__ = [ 10 "Comparator", 11 "Explainer", 12 "ConformalExplainer", 13 "PredictionInterval", 14 "FDAdditiveExplainer", 15 "IntegratedGradientsExplainer", 16]
21class Comparator(BaseEstimator): 22 """Class Comparator: Compare two models `obj1`, `obj2` ("estimators") based their predictions. 23 24 Attributes: 25 26 obj1: an object; 27 fitted object containing methods `fit` and `predict`. 28 29 obj2: an object; 30 fitted object containing methods `fit` and `predict`. 31 32 """ 33 34 # construct the object ----- 35 def __init__(self, obj1, obj2): 36 37 self.obj1 = obj1 38 self.obj2 = obj2 39 40 # summary of the object ----- 41 def summary(self): 42 """Summarise results of model comparison 43 44 Args: 45 46 None 47 48 """ 49 50 assert (self.obj1.residuals_ is not None) & ( 51 self.obj2.residuals_ is not None 52 ), "provided objects must be fitted first" 53 54 assert ( 55 self.obj1.scoring == self.obj2.scoring 56 ), "scoring metrics must match for both objects" 57 58 print("\n") 59 print(f"Scores ({self.obj1.scoring}): ") 60 print(f"Object1: {np.round(self.obj1.score_, 3)}") 61 print(f"Object2: {np.round(self.obj2.score_, 3)}") 62 63 print("\n") 64 print("R-squared: ") 65 print("Object1: ") 66 print( 67 f"Multiple: {np.round(self.obj1.r_squared_, 3)}, Adjusted: {np.round(self.obj1.adj_r_squared_, 3)}" 68 ) 69 print("Object2: ") 70 print( 71 f"Multiple: {np.round(self.obj2.r_squared_, 3)}, Adjusted: {np.round(self.obj2.adj_r_squared_, 3)}" 72 ) 73 74 print("\n") 75 print("Residuals: ") 76 print("Object1: ") 77 print(self.obj1.residuals_dist_.to_string(index=False)) 78 print("Object2: ") 79 print(self.obj2.residuals_dist_.to_string(index=False)) 80 81 print("\n") 82 print("Paired t-test (H0: mean(resids1) > mean(resids2) at 5%): ") 83 t_test_obj = t_test(self.obj1.residuals_, self.obj2.residuals_) 84 print(f"statistic: {np.round(t_test_obj['statistic'], 5)}") 85 print(f"p.value: {np.round(t_test_obj['p.value'], 5)}") 86 print( 87 f"conf. int: [{t_test_obj['f.int'][0]}, {np.round(t_test_obj['f.int'][1], 5)}]" 88 ) 89 print(f"mean of x: {np.round(t_test_obj['estimate']['mean of x'], 5)}") 90 print(f"mean of y: {np.round(t_test_obj['estimate']['mean of y'], 5)}") 91 print(f"alternative: {t_test_obj['alternative']}") 92 93 if self.obj1.ci_summary_ is not None: 94 95 df1_summary = self.obj1.ci_summary_[ 96 ["Estimate", "Std. Error", ""] 97 ].sort_index(axis=0) 98 df2_summary = self.obj2.ci_summary_[ 99 ["Estimate", "Std. Error", ""] 100 ].sort_index(axis=0) 101 102 df_summary = pd.DataFrame( 103 data=pd.concat([df1_summary, df2_summary], axis=1).values, 104 columns=[ 105 "Estimate1", 106 "Std. Error1", 107 "Signif.", 108 "Estimate2", 109 "Std. Error2", 110 "Signif.", 111 ], 112 index=df1_summary.index, 113 ) 114 115 print("\n") 116 print("Marginal effects: ") 117 print(df_summary) 118 119 print("\n") 120 print("Signif. codes: 0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘-’ 1")
23class Explainer(BaseEstimator): 24 """Class Explainer: effects of features on the response. 25 26 Attributes: 27 28 obj: an object; 29 fitted object containing methods `fit` and `predict` 30 31 n_jobs: an integer; 32 number of jobs for parallel computing 33 34 y_class: an integer; 35 class whose probability has to be explained (for classification only, default is 0) 36 37 normalize: a boolean; 38 whether the features must be normalized or not (changes the effects) 39 40 """ 41 42 def __init__(self, obj, n_jobs=None, y_class=0, normalize=False): 43 44 self.obj = obj 45 self.n_jobs = n_jobs 46 self.y_mean_ = None 47 self.effects_ = None 48 self.residuals_ = None 49 self.r_squared_ = None 50 self.adj_r_squared_ = None 51 self.effects_ = None 52 self.ci_ = None 53 self.ci_inters_ = {} 54 self.type_fit = None 55 self.y_class = y_class # classification only 56 self.normalize = normalize 57 self.type_ci = None 58 self.col_inters = None 59 60 def fit( 61 self, 62 X, 63 y, 64 X_names=None, 65 method="avg", 66 type_ci="jackknife", 67 scoring=None, 68 level=95, 69 col_inters=None, 70 ): 71 """Fit the explainer's attribute `obj` to training data (X, y). 72 73 Args: 74 75 X: array-like, shape = [n_samples, n_features]; 76 Training vectors, where n_samples is the number 77 of samples and n_features is the number of features. 78 79 y: array-like, shape = [n_samples, ]; Target values. 80 81 X_names: {array-like}, shape = [n_features, ]; 82 Column names (strings) for training vectors (default 83 is None, and not used if X is a data frame). 84 85 method: str; 86 Type of summary requested for effects. Either `avg` 87 (for average effects), `inters` (for interactions) 88 or `ci` (for effects including confidence intervals 89 around them). 90 91 type_ci: str; 92 Type of resampling for `method == 'ci'` (confidence 93 intervals around effects). Either `jackknife` 94 bootsrapping or `gaussian` (gaussian white noise with 95 standard deviation equal to `0.01` applied to the 96 features). 97 98 scoring: str; 99 measure of errors must be in ("explained_variance", 100 "neg_mean_absolute_error", "neg_mean_squared_error", 101 "neg_mean_squared_log_error", "neg_median_absolute_error", 102 "r2", "rmse") (default: "rmse"). 103 104 level: int; Level of confidence required for 105 `method == 'ci'` (in %). 106 107 col_inters: str; Name of column for computing interactions. 108 109 """ 110 assert method in ( 111 "avg", 112 "ci", 113 "inters", 114 ), "must have: `method` in ('avg', 'ci', 'inters')" 115 116 n, p = X.shape 117 118 if isinstance(X, pd.DataFrame): 119 self.X_names = X.columns 120 else: 121 assert ( 122 X_names is not None 123 ), "'X' is a numpy array, 'X_names' must be provided" 124 self.X_names = X_names 125 126 self.level = level 127 self.method = method 128 self.type_ci = type_ci 129 130 if is_factor(y): # classification --- 131 132 self.n_classes = len(np.unique(y)) 133 134 assert ( 135 self.y_class <= self.n_classes 136 ), "self.y_class must be <= number of classes" 137 138 assert hasattr( 139 self.obj, "predict_proba" 140 ), "`self.obj` must be a classifier and have a method `predict_proba`" 141 142 self.type_fit = "classification" 143 144 if scoring is None: 145 self.scoring = "accuracy" 146 147 self.score_ = score_classification( 148 self.obj, X, y, scoring=self.scoring 149 ) 150 151 def predict_proba(x): 152 return self.obj.predict_proba(x)[:, self.y_class] 153 154 y_hat = predict_proba(X) 155 self.residuals_ = y - y_hat 156 157 # heterogeneity of effects 158 if method == "avg": 159 160 self.grad_ = numerical_gradient( 161 predict_proba, 162 X, 163 normalize=self.normalize, 164 n_jobs=self.n_jobs, 165 ) 166 167 # confidence intervals 168 if method == "ci": 169 170 if type_ci == "jackknife": 171 172 self.ci_ = numerical_gradient_jackknife( 173 predict_proba, 174 X, 175 normalize=self.normalize, 176 n_jobs=self.n_jobs, 177 level=level, 178 ) 179 180 if type_ci == "gaussian": 181 182 self.ci_ = numerical_gradient_gaussian( 183 predict_proba, 184 X, 185 normalize=self.normalize, 186 n_jobs=self.n_jobs, 187 level=level, 188 ) 189 190 # interactions 191 if method == "inters": 192 193 raise NotImplementedError 194 195 assert col_inters is not None, "`col_inters` must be provided" 196 197 self.col_inters = col_inters 198 199 ix1 = np.where(np.asarray(X_names) == col_inters)[0][0] 200 201 pbar = Progbar(p) 202 203 if type_ci == "jackknife": 204 205 for ix2 in range(p): 206 207 self.ci_inters_.update( 208 { 209 X_names[ix2]: numerical_interactions_jackknife( 210 f=predict_proba, 211 X=X, 212 ix1=ix1, 213 ix2=ix2, 214 verbose=0, 215 ) 216 } 217 ) 218 219 pbar.update(ix2) 220 221 if type_ci == "gaussian": 222 223 for ix2 in range(p): 224 225 self.ci_inters_.update( 226 { 227 X_names[ix2]: numerical_interactions_gaussian( 228 f=predict_proba, 229 X=X, 230 ix1=ix1, 231 ix2=ix2, 232 verbose=0, 233 ) 234 } 235 ) 236 237 pbar.update(ix2) 238 239 pbar.update(p) 240 print("\n") 241 242 else: # is_factor(y) == False # regression --- 243 244 self.type_fit = "regression" 245 246 if scoring is None: 247 self.scoring = "rmse" 248 249 self.score_ = score_regression(self.obj, X, y, scoring=self.scoring) 250 251 y_hat = self.obj.predict(X) 252 self.residuals_ = y - y_hat 253 254 # heterogeneity of effects 255 if method == "avg": 256 257 self.grad_ = numerical_gradient( 258 self.obj.predict, 259 X, 260 normalize=self.normalize, 261 n_jobs=self.n_jobs, 262 ) 263 264 # confidence intervals 265 if method == "ci": 266 267 if type_ci == "jackknife": 268 269 self.ci_ = numerical_gradient_jackknife( 270 self.obj.predict, 271 X, 272 normalize=self.normalize, 273 n_jobs=self.n_jobs, 274 level=level, 275 ) 276 277 if type_ci == "gaussian": 278 279 self.ci_ = numerical_gradient_gaussian( 280 self.obj.predict, 281 X, 282 normalize=self.normalize, 283 n_jobs=self.n_jobs, 284 level=level, 285 ) 286 287 # interactions 288 if method == "inters": 289 290 raise NotImplementedError 291 292 assert col_inters is not None, "`col_inters` must be provided" 293 294 self.col_inters = col_inters 295 296 ix1 = np.where(np.asarray(X_names) == col_inters)[0][0] 297 298 if self.n_jobs is None: 299 300 pbar = Progbar(p) 301 302 if type_ci == "jackknife": 303 304 for ix2 in range(p): 305 306 self.ci_inters_.update( 307 { 308 X_names[ 309 ix2 310 ]: numerical_interactions_jackknife( 311 f=self.obj.predict, 312 X=X, 313 ix1=ix1, 314 ix2=ix2, 315 verbose=0, 316 ) 317 } 318 ) 319 320 if type_ci == "gaussian": 321 322 for ix2 in range(p): 323 324 self.ci_inters_.update( 325 { 326 X_names[ 327 ix2 328 ]: numerical_interactions_gaussian( 329 f=self.obj.predict, 330 X=X, 331 ix1=ix1, 332 ix2=ix2, 333 verbose=0, 334 ) 335 } 336 ) 337 338 pbar.update(ix2) 339 340 pbar.update(p) 341 print("\n") 342 343 # else self.n_jobs is not None 344 def foo_jackknife(ix): 345 return self.ci_inters_.update( 346 { 347 X_names[ix]: numerical_interactions_jackknife( 348 f=self.obj.predict, 349 X=X, 350 ix1=ix1, 351 ix2=ix, 352 verbose=0, 353 ) 354 } 355 ) 356 357 def foo_gaussian(ix): 358 return self.ci_inters_.update( 359 { 360 X_names[ix]: numerical_interactions_gaussian( 361 f=self.obj.predict, 362 X=X, 363 ix1=ix1, 364 ix2=ix, 365 verbose=0, 366 ) 367 } 368 ) 369 370 if type_ci == "jackknife": 371 res = Parallel(n_jobs=self.n_jobs)( 372 delayed(foo_jackknife)(ix2) for ix2 in tqdm(range(p)) 373 ) 374 375 if type_ci == "gaussian": 376 res = Parallel(n_jobs=self.n_jobs)( 377 delayed(foo_gaussian)(ix2) for ix2 in tqdm(range(p)) 378 ) 379 380 self.y_mean_ = np.mean(y) 381 ss_tot = np.sum((y - self.y_mean_) ** 2) 382 ss_reg = np.sum((y_hat - self.y_mean_) ** 2) 383 ss_res = np.sum((y - y_hat) ** 2) 384 385 self.r_squared_ = 1 - ss_res / ss_tot 386 self.adj_r_squared_ = 1 - (1 - self.r_squared_) * (n - 1) / ( 387 n - p - 1 388 ) 389 390 # classification and regression --- 391 392 if method == "avg": 393 394 res_df = pd.DataFrame(data=self.grad_, columns=X_names) 395 396 res_df_mean = res_df.mean() 397 res_df_std = res_df.std() 398 res_df_median = res_df.median() 399 res_df_min = res_df.min() 400 res_df_max = res_df.max() 401 data = pd.concat( 402 [ 403 res_df_mean, 404 res_df_std, 405 res_df_median, 406 res_df_min, 407 res_df_max, 408 ], 409 axis=1, 410 ) 411 412 df_effects = pd.DataFrame( 413 data=data.values, 414 columns=["mean", "std", "median", "min", "max"], 415 index=X_names, 416 ) 417 418 # heterogeneity of effects 419 self.effects_ = df_effects.sort_values(by=["mean"], ascending=False) 420 421 return self 422 423 def summary(self): 424 """Summarise results 425 426 a method in class Explainer 427 428 Args: 429 430 None 431 432 """ 433 434 assert ( 435 (self.ci_ is not None) 436 | (self.effects_ is not None) 437 | (self.ci_inters_ is not None) 438 ), "object not fitted, fit the object first" 439 440 if (self.ci_ is not None) & (self.method == "ci"): 441 442 df_mean = pd.Series(data=self.ci_[0], index=self.X_names) 443 df_se = pd.Series(data=self.ci_[1], index=self.X_names) 444 df_ubound = pd.Series(data=self.ci_[2], index=self.X_names) 445 df_lbound = pd.Series(data=self.ci_[3], index=self.X_names) 446 df_pvalue = pd.Series(data=self.ci_[4], index=self.X_names) 447 df_signif = pd.Series(data=self.ci_[5], index=self.X_names) 448 449 data = pd.concat( 450 [df_mean, df_se, df_lbound, df_ubound, df_pvalue, df_signif], 451 axis=1, 452 ) 453 454 self.ci_summary_ = pd.DataFrame( 455 data=data.values, 456 columns=[ 457 "Estimate", 458 "Std. Error", 459 str(self.level) + "% lbound", 460 str(self.level) + "% ubound", 461 "Pr(>|t|)", 462 "", 463 ], 464 index=self.X_names, 465 ).sort_values(by=["Estimate"], ascending=False) 466 467 print("\n") 468 print(f"Score ({self.scoring}): \n {np.round(self.score_, 3)}") 469 470 if self.type_fit == "regression": 471 472 print("\n") 473 print("Residuals: ") 474 self.residuals_dist_ = pd.DataFrame( 475 pd.Series( 476 data=np.quantile( 477 self.residuals_, q=[0, 0.25, 0.5, 0.75, 1] 478 ), 479 index=["Min", "1Q", "Median", "3Q", "Max"], 480 ) 481 ).transpose() 482 483 print(self.residuals_dist_.to_string(index=False)) 484 485 print("\n") 486 if self.type_ci == "jackknife": 487 print("Tests on marginal effects (Jackknife): ") 488 if self.type_ci == "gaussian": 489 print("Tests on marginal effects (Gaussian noise): ") 490 with pd.option_context( 491 "display.max_rows", None, "display.max_columns", None 492 ): 493 print(self.ci_summary_) 494 print("\n") 495 print( 496 "Signif. codes: 0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘-’ 1" 497 ) 498 499 if self.type_fit == "regression": 500 print("\n") 501 print( 502 f"Multiple R-squared: {np.round(self.r_squared_, 3)}, Adjusted R-squared: {np.round(self.adj_r_squared_, 3)}" 503 ) 504 505 if (self.effects_ is not None) & (self.method == "avg"): 506 print("\n") 507 print("Heterogeneity of marginal effects: ") 508 with pd.option_context( 509 "display.max_rows", None, "display.max_columns", None 510 ): 511 print(self.effects_) 512 print("\n") 513 514 if (self.ci_inters_ is not None) & (self.method == "inters"): 515 # raise NotImplementedError("This one's a bit tricky, I'm working on it") 516 print(f"self.col_inters: {self.col_inters}") 517 print("\n") 518 print("Interactions with " + self.col_inters + ": ") 519 with pd.option_context( 520 "display.max_rows", None, "display.max_columns", None 521 ): 522 print( 523 pd.DataFrame( 524 self.ci_inters_, 525 index=[ 526 "Estimate", 527 "Std. Error", 528 str(95) + "% lbound", 529 str(95) + "% ubound", 530 "Pr(>|t|)", 531 "", 532 ], 533 ).transpose() 534 ) 535 536 # return 537 538 def plot(self, what): 539 """Plot average effects, heterogeneity of effects, ... 540 541 Args: 542 543 what: a string; 544 if . 545 """ 546 assert self.effects_ is not None, "Call method 'fit' before plotting" 547 assert self.grad_ is not None, "Call method 'fit' before plotting" 548 549 effects = self.effects_[self.effects_["mean"] != 0] 550 551 # For method == "avg" 552 if self.method == "avg": 553 554 if what == "average_effects": 555 sns.set(style="darkgrid") 556 fi = pd.DataFrame() 557 fi["features"] = effects.index.values 558 fi["effect"] = effects["mean"].values 559 sns.barplot( 560 x="effect", 561 y="features", 562 data=fi.sort_values(by="effect", ascending=False), 563 ) 564 565 if what == "hetero_effects": 566 grads_df = pd.DataFrame(data=self.grad_, columns=self.X_names) 567 sorted_columns = list(effects.index.values) # by mean 568 sorted_columns.reverse() 569 grads_df = grads_df.reindex(sorted_columns, axis=1) 570 sns.set_theme(style="darkgrid") 571 grads_df.boxplot(vert=False) 572 573 # For method == "ci" 574 if self.method == "ci": 575 assert self.ci_ is not None, "Call method 'fit' before plotting" 576 raise NotImplementedError("No plot for method == 'ci' yet") 577 578 def get_individual_effects(self): 579 assert ( 580 self.grad_ is not None 581 ), "Call method 'fit' before calling this method" 582 if self.method == "avg": 583 return pd.DataFrame(data=self.grad_, columns=self.X_names)
Class Explainer: effects of features on the response.
Attributes:
obj: an object;
fitted object containing methods `fit` and `predict`
n_jobs: an integer;
number of jobs for parallel computing
y_class: an integer;
class whose probability has to be explained (for classification only, default is 0)
normalize: a boolean;
whether the features must be normalized or not (changes the effects)
60 def fit( 61 self, 62 X, 63 y, 64 X_names=None, 65 method="avg", 66 type_ci="jackknife", 67 scoring=None, 68 level=95, 69 col_inters=None, 70 ): 71 """Fit the explainer's attribute `obj` to training data (X, y). 72 73 Args: 74 75 X: array-like, shape = [n_samples, n_features]; 76 Training vectors, where n_samples is the number 77 of samples and n_features is the number of features. 78 79 y: array-like, shape = [n_samples, ]; Target values. 80 81 X_names: {array-like}, shape = [n_features, ]; 82 Column names (strings) for training vectors (default 83 is None, and not used if X is a data frame). 84 85 method: str; 86 Type of summary requested for effects. Either `avg` 87 (for average effects), `inters` (for interactions) 88 or `ci` (for effects including confidence intervals 89 around them). 90 91 type_ci: str; 92 Type of resampling for `method == 'ci'` (confidence 93 intervals around effects). Either `jackknife` 94 bootsrapping or `gaussian` (gaussian white noise with 95 standard deviation equal to `0.01` applied to the 96 features). 97 98 scoring: str; 99 measure of errors must be in ("explained_variance", 100 "neg_mean_absolute_error", "neg_mean_squared_error", 101 "neg_mean_squared_log_error", "neg_median_absolute_error", 102 "r2", "rmse") (default: "rmse"). 103 104 level: int; Level of confidence required for 105 `method == 'ci'` (in %). 106 107 col_inters: str; Name of column for computing interactions. 108 109 """ 110 assert method in ( 111 "avg", 112 "ci", 113 "inters", 114 ), "must have: `method` in ('avg', 'ci', 'inters')" 115 116 n, p = X.shape 117 118 if isinstance(X, pd.DataFrame): 119 self.X_names = X.columns 120 else: 121 assert ( 122 X_names is not None 123 ), "'X' is a numpy array, 'X_names' must be provided" 124 self.X_names = X_names 125 126 self.level = level 127 self.method = method 128 self.type_ci = type_ci 129 130 if is_factor(y): # classification --- 131 132 self.n_classes = len(np.unique(y)) 133 134 assert ( 135 self.y_class <= self.n_classes 136 ), "self.y_class must be <= number of classes" 137 138 assert hasattr( 139 self.obj, "predict_proba" 140 ), "`self.obj` must be a classifier and have a method `predict_proba`" 141 142 self.type_fit = "classification" 143 144 if scoring is None: 145 self.scoring = "accuracy" 146 147 self.score_ = score_classification( 148 self.obj, X, y, scoring=self.scoring 149 ) 150 151 def predict_proba(x): 152 return self.obj.predict_proba(x)[:, self.y_class] 153 154 y_hat = predict_proba(X) 155 self.residuals_ = y - y_hat 156 157 # heterogeneity of effects 158 if method == "avg": 159 160 self.grad_ = numerical_gradient( 161 predict_proba, 162 X, 163 normalize=self.normalize, 164 n_jobs=self.n_jobs, 165 ) 166 167 # confidence intervals 168 if method == "ci": 169 170 if type_ci == "jackknife": 171 172 self.ci_ = numerical_gradient_jackknife( 173 predict_proba, 174 X, 175 normalize=self.normalize, 176 n_jobs=self.n_jobs, 177 level=level, 178 ) 179 180 if type_ci == "gaussian": 181 182 self.ci_ = numerical_gradient_gaussian( 183 predict_proba, 184 X, 185 normalize=self.normalize, 186 n_jobs=self.n_jobs, 187 level=level, 188 ) 189 190 # interactions 191 if method == "inters": 192 193 raise NotImplementedError 194 195 assert col_inters is not None, "`col_inters` must be provided" 196 197 self.col_inters = col_inters 198 199 ix1 = np.where(np.asarray(X_names) == col_inters)[0][0] 200 201 pbar = Progbar(p) 202 203 if type_ci == "jackknife": 204 205 for ix2 in range(p): 206 207 self.ci_inters_.update( 208 { 209 X_names[ix2]: numerical_interactions_jackknife( 210 f=predict_proba, 211 X=X, 212 ix1=ix1, 213 ix2=ix2, 214 verbose=0, 215 ) 216 } 217 ) 218 219 pbar.update(ix2) 220 221 if type_ci == "gaussian": 222 223 for ix2 in range(p): 224 225 self.ci_inters_.update( 226 { 227 X_names[ix2]: numerical_interactions_gaussian( 228 f=predict_proba, 229 X=X, 230 ix1=ix1, 231 ix2=ix2, 232 verbose=0, 233 ) 234 } 235 ) 236 237 pbar.update(ix2) 238 239 pbar.update(p) 240 print("\n") 241 242 else: # is_factor(y) == False # regression --- 243 244 self.type_fit = "regression" 245 246 if scoring is None: 247 self.scoring = "rmse" 248 249 self.score_ = score_regression(self.obj, X, y, scoring=self.scoring) 250 251 y_hat = self.obj.predict(X) 252 self.residuals_ = y - y_hat 253 254 # heterogeneity of effects 255 if method == "avg": 256 257 self.grad_ = numerical_gradient( 258 self.obj.predict, 259 X, 260 normalize=self.normalize, 261 n_jobs=self.n_jobs, 262 ) 263 264 # confidence intervals 265 if method == "ci": 266 267 if type_ci == "jackknife": 268 269 self.ci_ = numerical_gradient_jackknife( 270 self.obj.predict, 271 X, 272 normalize=self.normalize, 273 n_jobs=self.n_jobs, 274 level=level, 275 ) 276 277 if type_ci == "gaussian": 278 279 self.ci_ = numerical_gradient_gaussian( 280 self.obj.predict, 281 X, 282 normalize=self.normalize, 283 n_jobs=self.n_jobs, 284 level=level, 285 ) 286 287 # interactions 288 if method == "inters": 289 290 raise NotImplementedError 291 292 assert col_inters is not None, "`col_inters` must be provided" 293 294 self.col_inters = col_inters 295 296 ix1 = np.where(np.asarray(X_names) == col_inters)[0][0] 297 298 if self.n_jobs is None: 299 300 pbar = Progbar(p) 301 302 if type_ci == "jackknife": 303 304 for ix2 in range(p): 305 306 self.ci_inters_.update( 307 { 308 X_names[ 309 ix2 310 ]: numerical_interactions_jackknife( 311 f=self.obj.predict, 312 X=X, 313 ix1=ix1, 314 ix2=ix2, 315 verbose=0, 316 ) 317 } 318 ) 319 320 if type_ci == "gaussian": 321 322 for ix2 in range(p): 323 324 self.ci_inters_.update( 325 { 326 X_names[ 327 ix2 328 ]: numerical_interactions_gaussian( 329 f=self.obj.predict, 330 X=X, 331 ix1=ix1, 332 ix2=ix2, 333 verbose=0, 334 ) 335 } 336 ) 337 338 pbar.update(ix2) 339 340 pbar.update(p) 341 print("\n") 342 343 # else self.n_jobs is not None 344 def foo_jackknife(ix): 345 return self.ci_inters_.update( 346 { 347 X_names[ix]: numerical_interactions_jackknife( 348 f=self.obj.predict, 349 X=X, 350 ix1=ix1, 351 ix2=ix, 352 verbose=0, 353 ) 354 } 355 ) 356 357 def foo_gaussian(ix): 358 return self.ci_inters_.update( 359 { 360 X_names[ix]: numerical_interactions_gaussian( 361 f=self.obj.predict, 362 X=X, 363 ix1=ix1, 364 ix2=ix, 365 verbose=0, 366 ) 367 } 368 ) 369 370 if type_ci == "jackknife": 371 res = Parallel(n_jobs=self.n_jobs)( 372 delayed(foo_jackknife)(ix2) for ix2 in tqdm(range(p)) 373 ) 374 375 if type_ci == "gaussian": 376 res = Parallel(n_jobs=self.n_jobs)( 377 delayed(foo_gaussian)(ix2) for ix2 in tqdm(range(p)) 378 ) 379 380 self.y_mean_ = np.mean(y) 381 ss_tot = np.sum((y - self.y_mean_) ** 2) 382 ss_reg = np.sum((y_hat - self.y_mean_) ** 2) 383 ss_res = np.sum((y - y_hat) ** 2) 384 385 self.r_squared_ = 1 - ss_res / ss_tot 386 self.adj_r_squared_ = 1 - (1 - self.r_squared_) * (n - 1) / ( 387 n - p - 1 388 ) 389 390 # classification and regression --- 391 392 if method == "avg": 393 394 res_df = pd.DataFrame(data=self.grad_, columns=X_names) 395 396 res_df_mean = res_df.mean() 397 res_df_std = res_df.std() 398 res_df_median = res_df.median() 399 res_df_min = res_df.min() 400 res_df_max = res_df.max() 401 data = pd.concat( 402 [ 403 res_df_mean, 404 res_df_std, 405 res_df_median, 406 res_df_min, 407 res_df_max, 408 ], 409 axis=1, 410 ) 411 412 df_effects = pd.DataFrame( 413 data=data.values, 414 columns=["mean", "std", "median", "min", "max"], 415 index=X_names, 416 ) 417 418 # heterogeneity of effects 419 self.effects_ = df_effects.sort_values(by=["mean"], ascending=False) 420 421 return self
Fit the explainer's attribute obj to training data (X, y).
Args:
X: array-like, shape = [n_samples, n_features];
Training vectors, where n_samples is the number
of samples and n_features is the number of features.
y: array-like, shape = [n_samples, ]; Target values.
X_names: {array-like}, shape = [n_features, ];
Column names (strings) for training vectors (default
is None, and not used if X is a data frame).
method: str;
Type of summary requested for effects. Either `avg`
(for average effects), `inters` (for interactions)
or `ci` (for effects including confidence intervals
around them).
type_ci: str;
Type of resampling for `method == 'ci'` (confidence
intervals around effects). Either `jackknife`
bootsrapping or `gaussian` (gaussian white noise with
standard deviation equal to `0.01` applied to the
features).
scoring: str;
measure of errors must be in ("explained_variance",
"neg_mean_absolute_error", "neg_mean_squared_error",
"neg_mean_squared_log_error", "neg_median_absolute_error",
"r2", "rmse") (default: "rmse").
level: int; Level of confidence required for
`method == 'ci'` (in %).
col_inters: str; Name of column for computing interactions.
23class ConformalExplainer(BaseEstimator): 24 """Class ConformalExplainer: effects of features on the response. 25 26 Attributes: 27 28 obj: an object; 29 fitted object containing methods `fit` and `predict` 30 31 n_jobs: an integer; 32 number of jobs for parallel computing 33 34 y_class: an integer; 35 class whose probability has to be explained (for classification only, default is 0) 36 37 """ 38 39 def __init__(self, obj, n_jobs=None, y_class=0): 40 self.obj = obj 41 self.n_jobs = n_jobs 42 self.y_class = y_class 43 self.summary_ = None 44 self.col_inters = None 45 46 def fit( 47 self, 48 X, 49 X_names=None, 50 level=95, 51 col_inters=None, 52 ): 53 """Fit the explainer's attribute `obj` to training data (X, y). 54 55 Args: 56 X: array-like, shape = [n_samples, n_features]; 57 Training vectors, where n_samples is the number 58 of samples and n_features is the number of features. 59 60 X_names: {array-like}, shape = [n_features, ]; 61 Column names (strings) for training vectors (default 62 is None, and not used if X is a data frame). 63 64 level: confidence level for intervals (default: 95) 65 66 col_inters: str; Name of column for computing interactions. 67 """ 68 n, p = X.shape 69 70 self.col_inters = col_inters 71 72 if isinstance(X, pd.DataFrame): 73 self.X_names = X.columns 74 else: 75 assert ( 76 X_names is not None 77 ), "'X' is a numpy array, 'X_names' must be provided" 78 self.X_names = X_names 79 80 # Check if estimator is a classifier using hasattr 81 is_classifier = hasattr(self.obj, "predict_proba") 82 83 if is_classifier: # classification --- 84 self.type_fit = "classification" 85 self.summary_ = sensitivity_confidence_intervals( 86 model=self.obj, X_test=X, confidence_level=level / 100 87 ) 88 89 else: # regression --- 90 self.type_fit = "regression" 91 self.summary_ = sensitivity_confidence_intervals( 92 model=self.obj, X_test=X, confidence_level=level / 100 93 ) 94 95 # interactions 96 if self.col_inters is not None: 97 raise NotImplementedError 98 99 return self 100 101 def summary(self): 102 """Summarise results in a user-friendly format. 103 104 Args: 105 None 106 107 Returns: 108 Formatted summary of the sensitivity analysis results 109 """ 110 if self.summary_ is None: 111 return "Model has not been fitted yet. Call fit() first." 112 113 # Create DataFrame with results 114 results_df = pd.DataFrame( 115 { 116 "Mean Estimate": self.summary_.mean, 117 "Median Estimate": self.summary_.mean, 118 ".95 Lower Bound": self.summary_.lower, 119 ".95 Upper Bound": self.summary_.upper, 120 "Signif.": self.summary_.signif_codes, 121 "PI length": self.summary_.pi_length, 122 }, 123 index=self.X_names, 124 ) 125 126 # Sort by absolute value of estimate 127 results_df = results_df.reindex( 128 results_df["Mean Estimate"].sort_values(ascending=False).index 129 ) 130 131 # Format the output 132 print("\nSensitivity Analysis Results:") 133 print("============================") 134 print(f"\nModel type: {self.type_fit}") 135 print("\nFeature Effects:") 136 print(results_df.round(4)) 137 138 # Don't return the DataFrame to avoid duplication 139 # return 140 141 def plot(self): 142 """Plot confidence intervals for each feature. 143 144 Args: 145 None 146 """ 147 assert self.summary_ is not None, "Call method 'fit' before plotting" 148 149 # Create DataFrame with results 150 results_df = pd.DataFrame( 151 { 152 "Mean Estimate": self.summary_.mean, 153 ".95 Lower Bound": self.summary_.lower, 154 ".95 Upper Bound": self.summary_.upper, 155 }, 156 index=self.X_names, 157 ) 158 159 # Filter features with non-zero mean estimates 160 results_df = results_df[results_df["Mean Estimate"] != 0] 161 162 # Sort by mean estimate 163 results_df = results_df.sort_values(by="Mean Estimate", ascending=False) 164 165 # Plot confidence intervals 166 sns.set_theme(style="darkgrid") 167 plt.figure(figsize=(10, 6)) 168 plt.errorbar( 169 x=results_df["Mean Estimate"], 170 y=results_df.index, 171 xerr=[ 172 results_df["Mean Estimate"] - results_df[".95 Lower Bound"], 173 results_df[".95 Upper Bound"] - results_df["Mean Estimate"], 174 ], 175 fmt="o", 176 capsize=5, 177 color="blue", 178 ecolor="gray", 179 ) 180 plt.axvline(x=0, color="red", linestyle="--", linewidth=1) 181 plt.title("Confidence Intervals for Feature Effects") 182 plt.xlabel("Mean Estimate") 183 plt.ylabel("Features") 184 plt.tight_layout() 185 plt.show()
Class ConformalExplainer: effects of features on the response.
Attributes:
obj: an object;
fitted object containing methods `fit` and `predict`
n_jobs: an integer;
number of jobs for parallel computing
y_class: an integer;
class whose probability has to be explained (for classification only, default is 0)
46 def fit( 47 self, 48 X, 49 X_names=None, 50 level=95, 51 col_inters=None, 52 ): 53 """Fit the explainer's attribute `obj` to training data (X, y). 54 55 Args: 56 X: array-like, shape = [n_samples, n_features]; 57 Training vectors, where n_samples is the number 58 of samples and n_features is the number of features. 59 60 X_names: {array-like}, shape = [n_features, ]; 61 Column names (strings) for training vectors (default 62 is None, and not used if X is a data frame). 63 64 level: confidence level for intervals (default: 95) 65 66 col_inters: str; Name of column for computing interactions. 67 """ 68 n, p = X.shape 69 70 self.col_inters = col_inters 71 72 if isinstance(X, pd.DataFrame): 73 self.X_names = X.columns 74 else: 75 assert ( 76 X_names is not None 77 ), "'X' is a numpy array, 'X_names' must be provided" 78 self.X_names = X_names 79 80 # Check if estimator is a classifier using hasattr 81 is_classifier = hasattr(self.obj, "predict_proba") 82 83 if is_classifier: # classification --- 84 self.type_fit = "classification" 85 self.summary_ = sensitivity_confidence_intervals( 86 model=self.obj, X_test=X, confidence_level=level / 100 87 ) 88 89 else: # regression --- 90 self.type_fit = "regression" 91 self.summary_ = sensitivity_confidence_intervals( 92 model=self.obj, X_test=X, confidence_level=level / 100 93 ) 94 95 # interactions 96 if self.col_inters is not None: 97 raise NotImplementedError 98 99 return self
Fit the explainer's attribute obj to training data (X, y).
Args: X: array-like, shape = [n_samples, n_features]; Training vectors, where n_samples is the number of samples and n_features is the number of features.
X_names: {array-like}, shape = [n_features, ];
Column names (strings) for training vectors (default
is None, and not used if X is a data frame).
level: confidence level for intervals (default: 95)
col_inters: str; Name of column for computing interactions.
14class PredictionInterval(BaseEstimator, RegressorMixin): 15 """Class PredictionInterval: Obtain prediction intervals. 16 17 Attributes: 18 19 obj: an object; 20 fitted object containing methods `fit` and `predict` 21 22 method: a string; 23 method for constructing the prediction intervals. 24 Currently "splitconformal" (default) and "localconformal" 25 26 level: a float; 27 Confidence level for prediction intervals. Default is 0.95, 28 equivalent to a miscoverage error of 0.05 29 30 seed: an integer; 31 Reproducibility of fit (there's a random split between fitting and calibration data) 32 """ 33 34 def __init__(self, obj, method="splitconformal", level=0.95, seed=123): 35 if not isinstance(level, (int, float)) or not 0 < level < 1: 36 raise ValueError("level must be a float between 0 and 1") 37 if method not in ["splitconformal", "localconformal"]: 38 raise ValueError( 39 "method must be either 'splitconformal' or 'localconformal'" 40 ) 41 42 self.obj = obj 43 self.method = method 44 self.level = level 45 self.seed = seed 46 self.quantile_ = 0.0 # Initialize with a default value 47 self.icp_ = None 48 49 def fit(self, X, y): 50 """Fit the `method` to training data (X, y). 51 52 Args: 53 54 X: array-like, shape = [n_samples, n_features]; 55 Training set vectors, where n_samples is the number 56 of samples and n_features is the number of features. 57 58 y: array-like, shape = [n_samples, ]; Target values. 59 60 """ 61 if len(X) < 2: 62 raise ValueError( 63 "Need at least 2 samples for train/calibration split" 64 ) 65 66 X_train, X_calibration, y_train, y_calibration = train_test_split( 67 X, y, test_size=0.5, random_state=self.seed 68 ) 69 70 if self.method == "splitconformal": 71 n_samples_calibration = X_calibration.shape[0] 72 q = self.level * (1 + 1 / n_samples_calibration) 73 74 # Fit the base model 75 self.obj.fit(X_train, y_train) 76 77 # Get predictions on calibration set 78 preds_calibration = self.obj.predict(X_calibration) 79 80 # Calculate absolute residuals 81 absolute_residuals = np.abs(y_calibration - preds_calibration) 82 83 # Calculate quantile 84 try: 85 # numpy version >= 1.22 86 self.quantile_ = np.quantile( 87 a=absolute_residuals, q=q, method="higher" 88 ) 89 except: 90 # numpy version < 1.22 91 self.quantile_ = np.quantile( 92 a=absolute_residuals, q=q, interpolation="higher" 93 ) 94 95 if self.quantile_ is None or np.isnan(self.quantile_): 96 raise ValueError("Failed to calculate valid quantile") 97 98 elif self.method == "localconformal": 99 mad_estimator = RandomForestRegressor() 100 normalizer = RegressorNormalizer( 101 self.obj, mad_estimator, AbsErrorErrFunc() 102 ) 103 nc = RegressorNc(self.obj, AbsErrorErrFunc(), normalizer) 104 105 self.icp_ = IcpRegressor(nc) 106 self.icp_.fit(X_train, y_train) 107 self.icp_.calibrate(X_calibration, y_calibration) 108 109 return self 110 111 def predict(self, X, return_pi=False): 112 """Obtain predictions and prediction intervals 113 114 Args: 115 116 X: array-like, shape = [n_samples, n_features]; 117 Testing set vectors, where n_samples is the number 118 of samples and n_features is the number of features. 119 120 return_pi: boolean 121 Whether the prediction interval is returned or not. 122 Default is False, for compatibility with other _estimators_. 123 If True, a tuple containing the predictions + lower and upper 124 bounds is returned. 125 126 """ 127 if not hasattr(self.obj, "predict"): 128 raise ValueError("Base estimator must have a predict method") 129 130 pred = self.obj.predict(X) 131 132 if self.method == "splitconformal": 133 if self.quantile_ is None: 134 raise ValueError( 135 "Model not fitted or quantile calculation failed" 136 ) 137 138 if return_pi: 139 return pred, (pred - self.quantile_), (pred + self.quantile_) 140 return pred 141 142 elif self.method == "localconformal": 143 if self.icp_ is None: 144 raise ValueError("Model not fitted or ICP not initialized") 145 146 if return_pi: 147 predictions_bounds = self.icp_.predict( 148 X, significance=1 - self.level 149 ) 150 return pred, predictions_bounds[:, 0], predictions_bounds[:, 1] 151 return pred
Class PredictionInterval: Obtain prediction intervals.
Attributes:
obj: an object;
fitted object containing methods `fit` and `predict`
method: a string;
method for constructing the prediction intervals.
Currently "splitconformal" (default) and "localconformal"
level: a float;
Confidence level for prediction intervals. Default is 0.95,
equivalent to a miscoverage error of 0.05
seed: an integer;
Reproducibility of fit (there's a random split between fitting and calibration data)
49 def fit(self, X, y): 50 """Fit the `method` to training data (X, y). 51 52 Args: 53 54 X: array-like, shape = [n_samples, n_features]; 55 Training set vectors, where n_samples is the number 56 of samples and n_features is the number of features. 57 58 y: array-like, shape = [n_samples, ]; Target values. 59 60 """ 61 if len(X) < 2: 62 raise ValueError( 63 "Need at least 2 samples for train/calibration split" 64 ) 65 66 X_train, X_calibration, y_train, y_calibration = train_test_split( 67 X, y, test_size=0.5, random_state=self.seed 68 ) 69 70 if self.method == "splitconformal": 71 n_samples_calibration = X_calibration.shape[0] 72 q = self.level * (1 + 1 / n_samples_calibration) 73 74 # Fit the base model 75 self.obj.fit(X_train, y_train) 76 77 # Get predictions on calibration set 78 preds_calibration = self.obj.predict(X_calibration) 79 80 # Calculate absolute residuals 81 absolute_residuals = np.abs(y_calibration - preds_calibration) 82 83 # Calculate quantile 84 try: 85 # numpy version >= 1.22 86 self.quantile_ = np.quantile( 87 a=absolute_residuals, q=q, method="higher" 88 ) 89 except: 90 # numpy version < 1.22 91 self.quantile_ = np.quantile( 92 a=absolute_residuals, q=q, interpolation="higher" 93 ) 94 95 if self.quantile_ is None or np.isnan(self.quantile_): 96 raise ValueError("Failed to calculate valid quantile") 97 98 elif self.method == "localconformal": 99 mad_estimator = RandomForestRegressor() 100 normalizer = RegressorNormalizer( 101 self.obj, mad_estimator, AbsErrorErrFunc() 102 ) 103 nc = RegressorNc(self.obj, AbsErrorErrFunc(), normalizer) 104 105 self.icp_ = IcpRegressor(nc) 106 self.icp_.fit(X_train, y_train) 107 self.icp_.calibrate(X_calibration, y_calibration) 108 109 return self
Fit the method to training data (X, y).
Args:
X: array-like, shape = [n_samples, n_features];
Training set vectors, where n_samples is the number
of samples and n_features is the number of features.
y: array-like, shape = [n_samples, ]; Target values.
111 def predict(self, X, return_pi=False): 112 """Obtain predictions and prediction intervals 113 114 Args: 115 116 X: array-like, shape = [n_samples, n_features]; 117 Testing set vectors, where n_samples is the number 118 of samples and n_features is the number of features. 119 120 return_pi: boolean 121 Whether the prediction interval is returned or not. 122 Default is False, for compatibility with other _estimators_. 123 If True, a tuple containing the predictions + lower and upper 124 bounds is returned. 125 126 """ 127 if not hasattr(self.obj, "predict"): 128 raise ValueError("Base estimator must have a predict method") 129 130 pred = self.obj.predict(X) 131 132 if self.method == "splitconformal": 133 if self.quantile_ is None: 134 raise ValueError( 135 "Model not fitted or quantile calculation failed" 136 ) 137 138 if return_pi: 139 return pred, (pred - self.quantile_), (pred + self.quantile_) 140 return pred 141 142 elif self.method == "localconformal": 143 if self.icp_ is None: 144 raise ValueError("Model not fitted or ICP not initialized") 145 146 if return_pi: 147 predictions_bounds = self.icp_.predict( 148 X, significance=1 - self.level 149 ) 150 return pred, predictions_bounds[:, 0], predictions_bounds[:, 1] 151 return pred
Obtain predictions and prediction intervals
Args:
X: array-like, shape = [n_samples, n_features];
Testing set vectors, where n_samples is the number
of samples and n_features is the number of features.
return_pi: boolean
Whether the prediction interval is returned or not.
Default is False, for compatibility with other _estimators_.
If True, a tuple containing the predictions + lower and upper
bounds is returned.
12class FDAdditiveExplainer(BaseEstimator): 13 """ 14 Finite Difference SHAP-like Explainer with 1st/2nd order effects and numerical safeguards. 15 16 Features: 17 - Adaptive step sizes with numerical stability checks 18 - On-demand interaction computation 19 - Parallel/sequential execution modes 20 - Progress tracking and visualization 21 22 Parameters: 23 prediction_function (Callable): Model's prediction function 24 order (int): Differentiation order (1 or 2, default=1) 25 zero (float): Threshold for small values (default=machine epsilon) 26 n_jobs (int): Parallel jobs (-1=all cores, 0=sequential, 1+=specific count) 27 normalize (bool): Normalize attributions (default=False) 28 progress_bar (bool/str): Progress bar style (True/False/'notebook'/'terminal') 29 """ 30 31 def __init__( 32 self, 33 prediction_function: Callable, 34 order: int = 1, 35 zero: float = None, 36 n_jobs: int = -1, 37 normalize: bool = False, 38 progress_bar: Union[bool, str] = True, 39 ): 40 self.prediction_function = prediction_function 41 self.order = min(max(order, 1), 2) # Clamp to 1-2 range 42 self.zero = zero if zero is not None else np.finfo(float).eps 43 self.n_jobs = n_jobs 44 self.normalize = normalize 45 self.progress_bar = progress_bar 46 self._hessian = None 47 self._interactions = None 48 self.MIN_STEP_SIZE = 1e-10 49 self.MAX_CONDITION = 1e16 50 51 def _get_progress_bar(self, iterable, desc=None): 52 """Configured progress bar factory""" 53 if not self.progress_bar: 54 return iterable 55 return tqdm(iterable, desc=desc, disable=not self.progress_bar) 56 57 def _safe_divide(self, numerator, denominator): 58 """Numerically stable division with bounds checking""" 59 denominator = np.where( 60 np.abs(denominator) < self.MIN_STEP_SIZE, 61 np.sign(denominator) * self.MIN_STEP_SIZE, 62 denominator, 63 ) 64 ratio = numerator / denominator 65 return np.where( 66 np.abs(ratio) > self.MAX_CONDITION, 67 np.sign(ratio) * self.MAX_CONDITION, 68 ratio, 69 ) 70 71 def _safe_step_size(self, value_x, cond, eps_factor): 72 """Compute step size with multiple safeguards""" 73 h = eps_factor * value_x * cond + self.zero * (~cond) 74 return np.where( 75 np.abs(h) < self.MIN_STEP_SIZE, np.sign(h) * self.MIN_STEP_SIZE, h 76 ) 77 78 def _compute_first_order(self, X: np.ndarray) -> np.ndarray: 79 """Numerically robust 1st-order gradients""" 80 n_samples, p = X.shape 81 grad = np.zeros_like(X) 82 eps_factor = self.zero ** (1 / 3) 83 84 for ix in self._get_progress_bar(range(p), desc="1st-order terms"): 85 value_x = X[:, ix].copy() 86 cond = np.abs(value_x) > self.zero 87 h = self._safe_step_size(value_x, cond, eps_factor) 88 89 X[:, ix] = value_x + h 90 fx_plus = np.array(self.prediction_function(X)) 91 X[:, ix] = value_x - h 92 fx_minus = np.array(self.prediction_function(X)) 93 X[:, ix] = value_x # Restore 94 95 grad[:, ix] = self._safe_divide(fx_plus - fx_minus, 2 * h) 96 97 return grad 98 99 def calculate_interactions(self, X: np.ndarray) -> None: 100 """ 101 Pre-compute interaction effects for later use. 102 103 Args: 104 X: Input data (n_samples, n_features) 105 """ 106 if self.order < 2: 107 raise ValueError("Interactions require order=2 initialization") 108 109 X = np.array(X) 110 self.f0 = self.prediction_function(X) 111 self._hessian = self._compute_second_order(X) 112 self._interactions = ( 113 np.einsum( 114 "sij,si,sj->s", 115 self._hessian, 116 X - np.median(X, axis=0), 117 X - np.median(X, axis=0), 118 ) 119 / 2 120 ) 121 122 def _compute_second_order(self, X: np.ndarray) -> np.ndarray: 123 """Numerically stable 2nd-order computation""" 124 n_samples, p = X.shape 125 hessian = np.zeros((n_samples, p, p)) 126 eps_factor = self.zero ** (1 / 4) 127 128 # Diagonal terms (5-point stencil) 129 for ix in self._get_progress_bar(range(p), desc="2nd-order diagonal"): 130 value_x = X[:, ix].copy() 131 cond = np.abs(value_x) > self.zero 132 h = self._safe_step_size(value_x, cond, eps_factor) 133 134 # 5-point stencil evaluations 135 eval_points = [ 136 (value_x + 2 * h, "f_plus"), 137 (value_x - 2 * h, "f_minus"), 138 (value_x + h, "f_plus_h"), 139 (value_x - h, "f_minus_h"), 140 ] 141 results = {} 142 for delta, name in eval_points: 143 X[:, ix] = delta 144 results[name] = np.array(self.prediction_function(X)) 145 X[:, ix] = value_x # Restore 146 147 # Safely compute diagonal Hessian 148 numerator = ( 149 -results["f_plus"] 150 + 16 * results["f_plus_h"] 151 - 30 * self.f0 152 + 16 * results["f_minus_h"] 153 - results["f_minus"] 154 ) 155 denominator = 12 * h**2 156 hessian[:, ix, ix] = self._safe_divide(numerator, denominator) 157 158 # Off-diagonal terms (4-point cross) 159 for ix1, ix2 in self._get_progress_bar( 160 [(i, j) for i in range(p) for j in range(i + 1, p)], 161 desc="2nd-order interactions", 162 ): 163 value_x1 = X[:, ix1].copy() 164 value_x2 = X[:, ix2].copy() 165 cond1 = np.abs(value_x1) > self.zero 166 cond2 = np.abs(value_x2) > self.zero 167 h1 = self._safe_step_size(value_x1, cond1, eps_factor) 168 h2 = self._safe_step_size(value_x2, cond2, eps_factor) 169 170 # 4-point evaluations 171 eval_points = [ 172 (value_x1 + h1, value_x2 + h2, "fx_11"), 173 (value_x1 + h1, value_x2 - h2, "fx_12"), 174 (value_x1 - h1, value_x2 + h2, "fx_21"), 175 (value_x1 - h1, value_x2 - h2, "fx_22"), 176 ] 177 results = {} 178 for d1, d2, name in eval_points: 179 X[:, ix1], X[:, ix2] = d1, d2 180 results[name] = np.array(self.prediction_function(X)) 181 X[:, ix1], X[:, ix2] = value_x1, value_x2 # Restore 182 183 # Safe interaction computation 184 numerator = (results["fx_11"] - results["fx_12"]) - ( 185 results["fx_21"] - results["fx_22"] 186 ) 187 denominator = 4 * h1 * h2 188 interaction = self._safe_divide(numerator, denominator) 189 hessian[:, ix1, ix2] = interaction 190 hessian[:, ix2, ix1] = interaction 191 192 return hessian 193 194 def explain( 195 self, 196 X: np.ndarray, 197 baseline: Optional[np.ndarray] = None, 198 feature_names: Optional[list] = None, 199 include_interactions: bool = False, 200 ) -> Dict: 201 """ 202 Compute feature attributions with optional interactions. 203 204 Args: 205 X: Input data (n_samples, n_features) 206 baseline: Reference values (default=median of X) 207 feature_names: Optional feature names 208 include_interactions: Whether to include 2nd-order effects 209 210 Returns: 211 Dictionary containing: 212 - attributions: 1st-order feature effects 213 - interactions: 2nd-order interaction effects (if computed) 214 - hessian: Raw Hessian matrix (if order=2) 215 - predictions: Model outputs 216 - residuals: Decomposition errors 217 - baseline: Used reference values 218 """ 219 X = np.array(X) 220 self.f0 = self.prediction_function(X) 221 222 if baseline is None: 223 baseline = np.median(X, axis=0) 224 baseline = baseline.reshape(1, -1) 225 226 # Compute 1st-order effects 227 grads = self._compute_first_order(X.copy()) 228 attributions = (X - baseline) * grads 229 predictions = np.array(self.prediction_function(X)) 230 baseline_pred = np.array(self.prediction_function(baseline)).item() 231 232 # Initialize results 233 results = { 234 "attributions": attributions, 235 "gradients": grads, 236 "predictions": predictions, 237 "baseline": baseline.flatten(), 238 "baseline_prediction": baseline_pred, 239 "feature_names": feature_names, 240 } 241 242 # Handle 2nd-order if requested 243 if include_interactions and self.order == 2: 244 if self._hessian is None: 245 self.calculate_interactions(X) 246 247 results.update( 248 { 249 "hessian": self._hessian, 250 "interactions": self._interactions, 251 "total_effect": ( 252 results["attributions"].sum(axis=1) + self._interactions 253 ), 254 } 255 ) 256 results["residuals"] = predictions - ( 257 baseline_pred + results["total_effect"] 258 ) 259 else: 260 results["residuals"] = predictions - ( 261 baseline_pred + results["attributions"].sum(axis=1) 262 ) 263 264 # Add residual metrics 265 results.update( 266 { 267 "max_residual": np.max(np.abs(results["residuals"])), 268 "rmse_residual": np.sqrt(np.mean(results["residuals"] ** 2)), 269 } 270 ) 271 272 # Optional normalization 273 if self.normalize: 274 with warnings.catch_warnings(): 275 warnings.simplefilter("ignore") 276 scaler = MinMaxScaler(feature_range=(-1, 1)) 277 results["attributions"] = scaler.fit_transform( 278 results["attributions"] 279 ) 280 results["attributions"] = ( 281 results["attributions"] 282 / np.sum(results["attributions"], axis=1)[:, np.newaxis] 283 ) 284 285 return results 286 287 def plot_interactions( 288 self, 289 feature_names: Optional[list] = None, 290 max_display: int = 10, 291 title: str = "Top Feature Interactions", 292 ): 293 """ 294 Visualize interaction strengths from precomputed Hessian. 295 296 Args: 297 feature_names: Optional names for display 298 max_display: Number of top interactions to show 299 title: Plot title 300 """ 301 import matplotlib.pyplot as plt 302 from matplotlib import cm 303 304 if self._hessian is None: 305 raise ValueError( 306 "No interactions computed. Call calculate_interactions() first." 307 ) 308 309 if feature_names is None: 310 feature_names = [ 311 f"Feature {i}" for i in range(self._hessian.shape[1]) 312 ] 313 314 mean_inter = np.mean(np.abs(self._hessian), axis=0) 315 np.fill_diagonal(mean_inter, 0) # Exclude diagonal 316 317 # Get top interactions 318 triu_indices = np.triu_indices_from(mean_inter, k=1) 319 inter_pairs = list( 320 zip(triu_indices[0], triu_indices[1], mean_inter[triu_indices]) 321 ) 322 inter_pairs.sort(key=lambda x: -x[2]) 323 top_pairs = inter_pairs[:max_display] 324 325 # Plot 326 plt.figure(figsize=(10, 0.5 * max_display)) 327 colors = cm.viridis( 328 [x[2] / max(x[2] for x in top_pairs) for x in top_pairs] 329 ) 330 331 for i, (f1, f2, val) in enumerate(top_pairs): 332 plt.barh( 333 i, 334 val, 335 color=colors[i], 336 label=f"{feature_names[f1]} × {feature_names[f2]}", 337 ) 338 339 plt.yticks( 340 range(len(top_pairs)), 341 [ 342 f"{feature_names[f1]} × {feature_names[f2]}" 343 for f1, f2, _ in top_pairs 344 ], 345 ) 346 plt.title(title) 347 plt.xlabel("Interaction Strength") 348 plt.tight_layout() 349 plt.show() 350 351 def plot_attributions( 352 self, 353 attributions: np.ndarray, 354 feature_names: Optional[list] = None, 355 max_display: int = 15, 356 title: str = "Feature Importance", 357 ): 358 """ 359 Visualize feature attributions. 360 361 Args: 362 attributions: Attributions matrix from explain() 363 feature_names: Optional feature names 364 max_display: Maximum features to display 365 title: Plot title 366 """ 367 import matplotlib.pyplot as plt 368 369 if feature_names is None: 370 feature_names = [ 371 f"Feature {i}" for i in range(attributions.shape[1]) 372 ] 373 374 mean_attr = np.mean(np.abs(attributions), axis=0) 375 top_indices = np.argsort(mean_attr)[-max_display:] 376 377 plt.figure(figsize=(10, 0.5 * len(top_indices))) 378 plt.barh( 379 range(len(top_indices)), 380 mean_attr[top_indices], 381 tick_label=np.array(feature_names)[top_indices], 382 ) 383 plt.title(title) 384 plt.xlabel("Mean Absolute Attribution") 385 plt.tight_layout() 386 plt.show()
Finite Difference SHAP-like Explainer with 1st/2nd order effects and numerical safeguards.
Features:
- Adaptive step sizes with numerical stability checks
- On-demand interaction computation
- Parallel/sequential execution modes
- Progress tracking and visualization
Parameters: prediction_function (Callable): Model's prediction function order (int): Differentiation order (1 or 2, default=1) zero (float): Threshold for small values (default=machine epsilon) n_jobs (int): Parallel jobs (-1=all cores, 0=sequential, 1+=specific count) normalize (bool): Normalize attributions (default=False) progress_bar (bool/str): Progress bar style (True/False/'notebook'/'terminal')
7class IntegratedGradientsExplainer(BaseEstimator): 8 """ 9 Integrated Gradients Explainer using scale-dependent finite differences. 10 11 Parameters: 12 prediction_function (Callable): Model's prediction function 13 baseline_method (str): "mean", "median", or "zero" (default="mean") 14 n_steps (int): Number of steps along the path (default=50) 15 zero (float): Threshold for small values (default=1e-4) 16 """ 17 18 def __init__( 19 self, 20 prediction_function: Callable, 21 baseline_method: str = "mean", 22 n_steps: int = 50, 23 zero: float = 1e-4, 24 ): 25 self.prediction_function = prediction_function 26 self.baseline_method = baseline_method 27 self.n_steps = n_steps 28 self.zero = zero 29 30 def _compute_baseline(self, X_train: np.ndarray) -> np.ndarray: 31 if self.baseline_method == "mean": 32 return np.mean(X_train, axis=0) 33 elif self.baseline_method == "median": 34 return np.median(X_train, axis=0) 35 elif self.baseline_method == "zero": 36 return np.zeros(X_train.shape[1]) 37 else: 38 raise ValueError("Unknown baseline_method") 39 40 def _scale_dependent_gradients(self, X: np.ndarray) -> np.ndarray: 41 # Scale-dependent finite difference, as in numerical_gradient.py 42 n, p = X.shape 43 grad = np.zeros_like(X) 44 zero = self.zero 45 eps_factor = zero ** (1 / 3) 46 47 for ix in range(p): 48 value_x = X[:, ix].copy() 49 cond = np.abs(value_x) > zero 50 h = eps_factor * value_x * cond + zero * (~cond) 51 X[:, ix] = value_x + h 52 fx_plus = self.prediction_function(X) 53 X[:, ix] = value_x - h 54 fx_minus = self.prediction_function(X) 55 X[:, ix] = value_x # restore 56 grad[:, ix] = (np.asarray(fx_plus) - np.asarray(fx_minus)) / (2 * h) 57 return grad 58 59 def explain( 60 self, 61 X_train: np.ndarray, 62 X_new: np.ndarray, 63 feature_names: Optional[list] = None, 64 ) -> Dict: 65 """ 66 Compute integrated gradients attributions. 67 68 Args: 69 X_train: Training data (for baseline computation) 70 X_new: New data to explain 71 feature_names: Optional feature names 72 73 Returns: 74 Dictionary with attributions, integrated gradients, baseline, etc. 75 """ 76 X_train = np.array(X_train) 77 X_new = np.array(X_new) 78 baseline = self._compute_baseline(X_train) 79 n_obs, n_features = X_new.shape 80 integrated_grads = np.zeros((n_obs, n_features)) 81 82 for alpha in np.linspace(0, 1, self.n_steps): 83 X_interp = baseline + (X_new - baseline) * alpha 84 grads = self._scale_dependent_gradients(X_interp) 85 integrated_grads += grads / self.n_steps 86 87 attributions = (X_new - baseline) * integrated_grads 88 89 baseline_pred = np.array( 90 self.prediction_function(baseline.reshape(1, -1)) 91 ).reshape(-1) 92 predictions = np.array(self.prediction_function(X_new)).reshape(-1) 93 reconstructed = baseline_pred + np.sum(attributions, axis=1) 94 residuals = predictions - reconstructed 95 96 # Additivity check: prediction ≈ baseline + sum(attributions) 97 additivity_rmse = np.sqrt(np.mean(residuals**2)) 98 additivity_max = np.max(np.abs(residuals)) 99 100 return { 101 "attributions": attributions, 102 "integrated_gradients": integrated_grads, 103 "baseline": baseline, 104 "baseline_prediction": baseline_pred, 105 "predictions": predictions, 106 "reconstructed": reconstructed, 107 "residuals": residuals, 108 "additivity_rmse": additivity_rmse, 109 "additivity_max": additivity_max, 110 "feature_names": feature_names, 111 }
Integrated Gradients Explainer using scale-dependent finite differences.
Parameters: prediction_function (Callable): Model's prediction function baseline_method (str): "mean", "median", or "zero" (default="mean") n_steps (int): Number of steps along the path (default=50) zero (float): Threshold for small values (default=1e-4)