GPopt.GeneralizationOpt
1import numpy as np 2import pandas as pd 3from tqdm import tqdm 4from scipy.stats.qmc import Sobol 5from scipy.optimize import minimize as scipy_minimize 6from scipy.stats import norm 7from sklearn.model_selection import train_test_split 8from sklearn.base import BaseEstimator, RegressorMixin 9from SALib.analyze import sobol as sobol_analyze 10from SALib.sample import sobol as sobol_sample 11import warnings 12 13warnings.filterwarnings("ignore") 14 15 16class GenericSurrogate(BaseEstimator, RegressorMixin): 17 """ 18 Generic wrapper for any surrogate model with conformal prediction support 19 20 This class wraps any sklearn-compatible model and adds: 21 - Conformal prediction intervals (distribution-free uncertainty) 22 - Optional Bayesian uncertainty (if model supports it) 23 - Unified prediction interface 24 25 Parameters: 26 ----------- 27 base_model : object 28 Any sklearn-compatible model with fit() and predict() methods 29 conformal : bool 30 Whether to use conformal prediction for uncertainty 31 bayesian : bool 32 Whether model provides native uncertainty (e.g., GP, Bayesian NN) 33 """ 34 35 def __init__(self, base_model, conformal=True, bayesian=False): 36 self.base_model = base_model 37 self.conformal = conformal 38 self.bayesian = bayesian 39 self.nonconformity_scores_ = None 40 self.is_fitted_ = False 41 42 def fit(self, X, y, cal_ratio=0.2): 43 """ 44 Fit surrogate with optional conformal calibration 45 46 Parameters: 47 ----------- 48 X : array-like, shape (n_samples, n_features) 49 Training features 50 y : array-like, shape (n_samples,) 51 Training targets 52 cal_ratio : float 53 Proportion of data for conformal calibration (if conformal=True) 54 55 Returns: 56 -------- 57 self : object 58 """ 59 if self.conformal and cal_ratio > 0: 60 # Split conformal prediction: reserve data for calibration 61 n_cal = int(len(X) * cal_ratio) 62 X_train, X_cal = X[:-n_cal], X[-n_cal:] 63 y_train, y_cal = y[:-n_cal], y[-n_cal:] 64 65 self.base_model.fit(X_train, y_train) 66 67 # Compute nonconformity scores on calibration set 68 y_cal_pred = self.base_model.predict(X_cal) 69 self.nonconformity_scores_ = np.abs(y_cal - y_cal_pred) 70 else: 71 # No conformal prediction: use all data 72 self.base_model.fit(X, y) 73 74 self.is_fitted_ = True 75 return self 76 77 def predict(self, X, return_std=False, confidence=0.95): 78 """ 79 Unified prediction interface with optional uncertainty 80 81 Parameters: 82 ----------- 83 X : array-like, shape (n_samples, n_features) 84 Test features 85 return_std : bool 86 Whether to return uncertainty estimates 87 confidence : float 88 Confidence level for prediction intervals (if conformal=True) 89 90 Returns: 91 -------- 92 predictions : array or tuple 93 If return_std=False: predictions only 94 If return_std=True: (predictions, lower_bound, upper_bound) 95 96 Notes: 97 ------ 98 - Conformal: Distribution-free intervals with finite-sample guarantees 99 - Bayesian: Model-based uncertainty (e.g., GP posterior std) 100 """ 101 if not self.is_fitted_: 102 raise ValueError("Model must be fitted before prediction") 103 104 # Get point predictions 105 y_pred = self.base_model.predict(X) 106 107 if not return_std: 108 return y_pred 109 110 # Compute uncertainty estimates 111 if self.conformal and self.nonconformity_scores_ is not None: 112 # Conformal prediction intervals 113 alpha = 1 - confidence 114 n_scores = len(self.nonconformity_scores_) 115 116 # Calculate the effective quantile level based on (n+1) rule 117 # Cap the quantile level at 1.0 to prevent ValueError from np.quantile 118 quantile_level = min( 119 1.0, np.ceil((1 - alpha) * (n_scores + 1)) / n_scores 120 ) 121 122 q = np.quantile(self.nonconformity_scores_, quantile_level) 123 y_lower = y_pred - q 124 y_upper = y_pred + q 125 126 elif self.bayesian and hasattr(self.base_model, "predict"): 127 # Try to get Bayesian uncertainty from model 128 try: 129 # For GP-like models 130 if ( 131 hasattr(self.base_model, "predict") 132 and "return_std" 133 in self.base_model.predict.__code__.co_varnames 134 ): 135 _, y_std = self.base_model.predict(X, return_std=True) 136 y_lower = y_pred - 2 * y_std # 95% CI for Gaussian 137 y_upper = y_pred + 2 * y_std 138 else: 139 # Fallback: assume constant uncertainty 140 y_std = np.std(y_pred) * np.ones_like(y_pred) 141 y_lower = y_pred - 2 * y_std 142 y_upper = y_pred + 2 * y_std 143 except: 144 # No uncertainty available 145 y_lower = y_pred 146 y_upper = y_pred 147 else: 148 # No uncertainty quantification available 149 y_lower = y_pred 150 y_upper = y_pred 151 152 return y_pred, y_lower, y_upper 153 154 155class GeneralizationOpt: 156 """ 157 Generic framework for model generalization diagnostics 158 159 This class is agnostic to: 160 - The base model being tuned (LightGBM, XGBoost, neural nets, etc.) 161 - The surrogate model used (GP, RF, neural nets, etc.) 162 - The hyperparameter space structure 163 164 Example usage: 165 -------------- 166 >>> from sklearn.gaussian_process import GaussianProcessRegressor 167 >>> from sklearn.ensemble import RandomForestRegressor 168 >>> 169 >>> # Define hyperparameter space 170 >>> hyperparams = { 171 >>> 'learning_rate': (0.01, 0.3, True), # (min, max, log_scale) 172 >>> 'max_depth': (3, 10, False) 173 >>> } 174 >>> 175 >>> # Initialize framework 176 >>> diag = GeneralizationOpt(hyperparams, random_state=42) 177 >>> 178 >>> # Use GP surrogate with Bayesian uncertainty 179 >>> gp = GaussianProcessRegressor() 180 >>> surrogate = GenericSurrogate(gp, conformal=False, bayesian=True) 181 >>> 182 >>> # Or use RF with conformal prediction 183 >>> rf = RandomForestRegressor() 184 >>> surrogate = GenericSurrogate(rf, conformal=True, bayesian=False) 185 >>> 186 >>> # Run diagnostics 187 >>> results = diag.run_diagnostics( 188 >>> X_train, X_test, y_train, y_test, 189 >>> model_class=LGBMClassifier, 190 >>> surrogate=surrogate, 191 >>> n_samples=100 192 >>> ) 193 """ 194 195 def __init__(self, hyperparams, random_state=42): 196 """ 197 Initialize diagnostics framework 198 199 Parameters: 200 ----------- 201 hyperparams : dict 202 Hyperparameter space definition 203 Format: {param_name: (min_val, max_val, log_scale), ...} 204 random_state : int 205 Random seed for reproducibility 206 """ 207 self.hyperparams = hyperparams 208 self.hyperparam_names = list(hyperparams.keys()) 209 self.n_dims = len(self.hyperparam_names) 210 self.random_state = random_state 211 212 # Storage 213 self.sobol_samples = None 214 self.evaluation_results = None 215 self.surrogate = None 216 self.sobol_indices = None 217 218 def _denormalize_config(self, normalized_config): 219 """Convert normalized [0,1] to actual hyperparameter values""" 220 config = {} 221 for i, param_name in enumerate(self.hyperparam_names): 222 min_val, max_val, log_scale = self.hyperparams[param_name] 223 norm_val = normalized_config[i] 224 225 if log_scale: 226 log_min, log_max = np.log(min_val), np.log(max_val) 227 config[param_name] = np.exp( 228 log_min + norm_val * (log_max - log_min) 229 ) 230 else: 231 denormalized_value = min_val + norm_val * (max_val - min_val) 232 # Special handling for boolean parameters represented as (0, 1, False) 233 if ( 234 min_val == 0 235 and max_val == 1 236 and not log_scale 237 and isinstance(min_val, int) 238 ): 239 config[param_name] = bool(round(denormalized_value)) 240 # Convert to int if needed (if min and max are integers and not log-scale) 241 elif ( 242 isinstance(min_val, int) 243 and isinstance(max_val, int) 244 and not log_scale 245 ): 246 config[param_name] = int(round(denormalized_value)) 247 else: 248 config[param_name] = denormalized_value 249 250 return config 251 252 def generate_sobol_samples(self, n_samples=None): 253 """ 254 Generate Sobol quasi-random samples in hyperparameter space 255 256 Parameters: 257 ----------- 258 n_samples : int, optional 259 Number of samples (default: 75 * n_dims) 260 261 Returns: 262 -------- 263 samples : ndarray, shape (n_samples, n_dims) 264 Normalized samples in [0, 1]^n_dims 265 """ 266 if n_samples is None: 267 n_samples = 75 * self.n_dims 268 269 sobol = Sobol(d=self.n_dims, scramble=True, seed=self.random_state) 270 self.sobol_samples = sobol.random(n_samples) 271 272 return self.sobol_samples 273 274 def evaluate_configurations( 275 self, 276 X_train, 277 X_test, 278 y_train, 279 y_test, 280 model_class, 281 model_kwargs=None, 282 evaluation_fn=None, 283 n_folds=5, 284 ): 285 """ 286 Evaluate model configurations and compute generalization gaps 287 288 Parameters: 289 ----------- 290 X_train, X_test, y_train, y_test : arrays 291 Train/test data 292 model_class : class 293 Model class to instantiate (e.g., LGBMClassifier) 294 model_kwargs : dict, optional 295 Fixed kwargs for model (e.g., {'random_state': 42, 'verbose': -1}) 296 evaluation_fn : callable, optional 297 Function to evaluate model: evaluation_fn(y_true, y_pred) -> score 298 Default: balanced_accuracy_score for classification 299 n_folds : int 300 Number of CV folds for training score estimation 301 302 Returns: 303 -------- 304 results : DataFrame 305 Results with columns: hyperparams, cv_train, test_score, gap_abs, etc. 306 """ 307 from sklearn.model_selection import StratifiedKFold, KFold 308 from sklearn.metrics import balanced_accuracy_score, r2_score 309 310 if self.sobol_samples is None: 311 raise ValueError("Must generate Sobol samples first") 312 313 if model_kwargs is None: 314 model_kwargs = {} 315 316 if evaluation_fn is None: 317 # Heuristic: check if classification or regression 318 if len(np.unique(y_train)) < 20: 319 evaluation_fn = balanced_accuracy_score 320 kfold = StratifiedKFold( 321 n_splits=n_folds, 322 shuffle=True, 323 random_state=self.random_state, 324 ) 325 else: 326 evaluation_fn = r2_score 327 kfold = KFold( 328 n_splits=n_folds, 329 shuffle=True, 330 random_state=self.random_state, 331 ) 332 else: 333 kfold = KFold( 334 n_splits=n_folds, shuffle=True, random_state=self.random_state 335 ) 336 337 results = [] 338 n_samples = len(self.sobol_samples) 339 340 print( 341 f"Evaluating {n_samples} configurations ({n_folds}-fold CV + test)..." 342 ) 343 344 for idx, norm_config in tqdm(enumerate(self.sobol_samples)): 345 if (idx + 1) % 50 == 0: 346 print( 347 f" Progress: {idx + 1}/{n_samples} " 348 f"({100*(idx+1)/n_samples:.1f}%)" 349 ) 350 351 config = self._denormalize_config(norm_config) 352 353 # Cross-validation score 354 cv_scores = [] 355 for train_idx, val_idx in kfold.split(X_train, y_train): 356 if hasattr(X_train, "iloc"): # DataFrame 357 X_tr, X_val = X_train.iloc[train_idx], X_train.iloc[val_idx] 358 y_tr, y_val = y_train.iloc[train_idx], y_train.iloc[val_idx] 359 else: # ndarray 360 X_tr, X_val = X_train[train_idx], X_train[val_idx] 361 y_tr, y_val = y_train[train_idx], y_train[val_idx] 362 363 model = model_class(**config, **model_kwargs) 364 model.fit(X_tr, y_tr) 365 y_pred = model.predict(X_val) 366 cv_scores.append(evaluation_fn(y_val, y_pred)) 367 368 cv_train = np.mean(cv_scores) 369 370 # Test score 371 model = model_class(**config, **model_kwargs) 372 model.fit(X_train, y_train) 373 y_pred_test = model.predict(X_test) 374 test_score = evaluation_fn(y_test, y_pred_test) 375 376 # Generalization gaps 377 gap_abs = cv_train - test_score 378 gap_norm = ( 379 (cv_train - test_score) / (1 - cv_train) if cv_train < 1 else 0 380 ) 381 gap_mpe = ( 382 ((cv_train - test_score) / cv_train * 100) 383 if cv_train > 0 384 else 0 385 ) 386 387 results.append( 388 { 389 **config, 390 "cv_train": cv_train, 391 "test_score": test_score, 392 "gap_abs": gap_abs, 393 "gap_norm": gap_norm, 394 "gap_mpe": gap_mpe, 395 } 396 ) 397 398 self.evaluation_results = pd.DataFrame(results) 399 400 print(f"\n✓ Completed {n_samples} evaluations") 401 print( 402 f" CV Train Score: {self.evaluation_results['cv_train'].mean():.4f} \u00b1 " 403 f"{self.evaluation_results['cv_train'].std():.4f}" 404 ) 405 print( 406 f" Test Score: {self.evaluation_results['test_score'].mean():.4f} \u00b1 " 407 f"{self.evaluation_results['test_score'].std():.4f}" 408 ) 409 print( 410 f" Absolute Gap: {self.evaluation_results['gap_abs'].mean():.4f} \u00b1 " 411 f"{self.evaluation_results['gap_abs'].std():.4f}" 412 ) 413 414 return self.evaluation_results 415 416 def fit_surrogate(self, surrogate, target_gap="gap_abs"): 417 """ 418 Fit surrogate model to generalization gap data 419 420 Parameters: 421 ----------- 422 surrogate : GenericSurrogate 423 Surrogate model instance 424 target_gap : str 425 Target variable to model ('gap_abs', 'gap_norm', 'gap_mpe') 426 427 Returns: 428 -------- 429 surrogate : GenericSurrogate 430 Fitted surrogate model 431 metrics : dict 432 Performance metrics (R², MAE, coverage if applicable) 433 """ 434 if self.evaluation_results is None: 435 raise ValueError("Must evaluate configurations first") 436 437 X = self.sobol_samples 438 y = self.evaluation_results[target_gap].values 439 440 # Train/validation split 441 split_idx = int(0.8 * len(X)) 442 X_train, X_val = X[:split_idx], X[split_idx:] 443 y_train, y_val = y[:split_idx], y[split_idx:] 444 445 print( 446 f"Training surrogate on {len(X_train)} samples, " 447 f"validating on {len(X_val)} samples..." 448 ) 449 450 # Fit surrogate 451 surrogate.fit(X_train, y_train) 452 453 # Evaluate 454 y_pred, y_lower, y_upper = surrogate.predict(X_val, return_std=True) 455 456 from sklearn.metrics import r2_score, mean_absolute_error 457 458 r2 = r2_score(y_val, y_pred) 459 mae = mean_absolute_error(y_val, y_pred) 460 461 metrics = {"r2": r2, "mae": mae} 462 463 # Compute coverage if intervals available 464 if not np.array_equal(y_lower, y_upper): 465 coverage = np.mean((y_val >= y_lower) & (y_val <= y_upper)) 466 interval_width = np.mean(y_upper - y_lower) 467 metrics["coverage"] = coverage 468 metrics["interval_width"] = interval_width 469 470 print(f"\n✓ Surrogate Performance:") 471 print(f" R² Score: {r2:.4f}") 472 print(f" MAE: {mae:.6f}") 473 print(f" Coverage: {coverage:.2%}") 474 print(f" Mean Interval Width: {interval_width:.6f}") 475 else: 476 print(f"\n✓ Surrogate Performance:") 477 print(f" R² Score: {r2:.4f}") 478 print(f" MAE: {mae:.6f}") 479 480 self.surrogate = surrogate 481 return surrogate, metrics 482 483 def compute_sobol_indices(self, n_samples=10000): 484 """ 485 Compute Sobol sensitivity indices for global sensitivity analysis 486 487 Parameters: 488 ----------- 489 n_samples : int 490 Number of Monte Carlo samples for Sobol estimation 491 492 Returns: 493 -------- 494 indices : DataFrame 495 Sobol indices with columns: Parameter, S1, ST, Interaction 496 """ 497 if self.surrogate is None: 498 raise ValueError("Must fit surrogate first") 499 500 problem = { 501 "num_vars": self.n_dims, 502 "names": self.hyperparam_names, 503 "bounds": [[0, 1]] * self.n_dims, 504 } 505 506 param_values = sobol_sample.sample( 507 problem, n_samples, calc_second_order=False 508 ) 509 Y = self.surrogate.predict(param_values) 510 511 Si = sobol_analyze.analyze( 512 problem, Y, calc_second_order=False, print_to_console=False 513 ) 514 515 self.sobol_indices = pd.DataFrame( 516 { 517 "Parameter": self.hyperparam_names, 518 "S1": Si["S1"], 519 "ST": Si["ST"], 520 "Interaction": Si["ST"] - Si["S1"], 521 } 522 ).sort_values("S1", ascending=False) 523 524 print("\n" + "=" * 70) 525 print("Sobol Sensitivity Indices (Variance Decomposition)") 526 print("=" * 70) 527 print(self.sobol_indices.to_string(index=False)) 528 print("\nS1 = First-order effect (main effect)") 529 print("ST = Total-order effect (main + interactions)") 530 print("Interaction = ST - S1 (pure interaction effects)") 531 532 return self.sobol_indices 533 534 def optimize_acquisition( 535 self, 536 acquisition="ei", 537 n_starts=10, 538 kappa=2.0, 539 xi=0.01, 540 minimize_gap=True, 541 ): 542 """ 543 Optimize acquisition function to find best hyperparameter configuration 544 545 Parameters: 546 ----------- 547 acquisition : str 548 Acquisition function: 'ei' (Expected Improvement), 549 'ucb' (Upper Confidence Bound), 'pi' (Probability of Improvement) 550 n_starts : int 551 Number of random restarts for multi-start optimization 552 kappa : float 553 Exploration parameter for UCB (higher = more exploration) 554 xi : float 555 Exploration parameter for EI/PI (higher = more exploration) 556 minimize_gap : bool 557 If True, minimize gap; if False, maximize test performance 558 559 Returns: 560 -------- 561 results : dict 562 Optimization results including best config and expected improvement 563 """ 564 if self.surrogate is None: 565 raise ValueError("Must fit surrogate first") 566 567 # Get current best 568 if minimize_gap: 569 y_best = self.evaluation_results["gap_abs"].min() 570 print(f"Current best gap: {y_best:.6f}") 571 else: 572 y_best = self.evaluation_results["test_score"].max() 573 print(f"Current best test score: {y_best:.6f}") 574 575 # Define acquisition functions 576 def expected_improvement(x): 577 x = x.reshape(1, -1) 578 y_pred, y_lower, y_upper = self.surrogate.predict( 579 x, return_std=True 580 ) 581 mu, sigma = y_pred[0], (y_upper[0] - y_lower[0]) / 4 # Approx std 582 583 if sigma < 1e-10: 584 return 0.0 585 586 if minimize_gap: 587 imp = y_best - mu - xi 588 else: 589 imp = mu - y_best - xi 590 591 Z = imp / sigma 592 ei = imp * norm.cdf(Z) + sigma * norm.pdf(Z) 593 return -ei 594 595 def upper_confidence_bound(x): 596 x = x.reshape(1, -1) 597 y_pred, y_lower, y_upper = self.surrogate.predict( 598 x, return_std=True 599 ) 600 mu, sigma = y_pred[0], (y_upper[0] - y_lower[0]) / 4 601 602 if minimize_gap: 603 ucb = mu - kappa * sigma 604 else: 605 ucb = mu + kappa * sigma 606 607 return ucb if minimize_gap else -ucb 608 609 def probability_of_improvement(x): 610 x = x.reshape(1, -1) 611 y_pred, y_lower, y_upper = self.surrogate.predict( 612 x, return_std=True 613 ) 614 mu, sigma = y_pred[0], (y_upper[0] - y_lower[0]) / 4 615 616 if sigma < 1e-10: 617 return 0.0 618 619 if minimize_gap: 620 Z = (y_best - mu - xi) / sigma 621 else: 622 Z = (mu - y_best - xi) / sigma 623 624 pi = norm.cdf(Z) 625 return -pi 626 627 acq_funcs = { 628 "ei": expected_improvement, 629 "ucb": upper_confidence_bound, 630 "pi": probability_of_improvement, 631 } 632 acq_func = acq_funcs[acquisition] 633 634 # Multi-start optimization 635 print(f"\nRunning {n_starts}-start L-BFGS-B optimization...") 636 637 best_result = None 638 best_acq_value = np.inf 639 640 np.random.seed(self.random_state) 641 starting_points = np.random.rand(n_starts, self.n_dims) 642 643 # Add best observed point as starting point 644 if len(self.sobol_samples) > 0: 645 best_idx = ( 646 self.evaluation_results["gap_abs"].idxmin() 647 if minimize_gap 648 else self.evaluation_results["test_score"].idxmax() 649 ) 650 starting_points[0] = self.sobol_samples[best_idx] 651 652 for i, x0 in enumerate(starting_points): 653 try: 654 result = scipy_minimize( 655 acq_func, 656 x0, 657 method="L-BFGS-B", 658 bounds=[(0, 1)] * self.n_dims, 659 options={"maxiter": 1000, "ftol": 1e-9}, 660 ) 661 662 if result.success and result.fun < best_acq_value: 663 best_acq_value = result.fun 664 best_result = result 665 except Exception as e: 666 print(f" Warning: Optimization {i+1} failed: {str(e)}") 667 668 if best_result is None: 669 raise RuntimeError("All optimization attempts failed") 670 671 # Extract best configuration 672 best_x_normalized = best_result.x 673 best_config = self._denormalize_config(best_x_normalized) 674 675 # Predict gap at optimum 676 y_pred, y_lower, y_upper = self.surrogate.predict( 677 best_x_normalized.reshape(1, -1), return_std=True 678 ) 679 mu_best = y_pred[0] 680 681 print(f"\n✓ Optimization Complete") 682 print(f" Acquisition: {acquisition.upper()}") 683 print(f" Best acquisition value: {-best_acq_value:.6f}") 684 print(f" Predicted gap at optimum: {mu_best:.6f}") 685 print(f" 95% CI: [{y_lower[0]:.6f}, {y_upper[0]:.6f}]") 686 print(f"\n Recommended Configuration:") 687 for param, value in best_config.items(): 688 print( 689 f" {param}: {value if isinstance(value, int) else f'{value:.4f}'}" 690 ) 691 692 improvement = y_best - mu_best if minimize_gap else mu_best - y_best 693 print(f"\n Expected improvement: {improvement:.6f}") 694 695 return { 696 "best_config": best_config, 697 "best_config_normalized": best_x_normalized, 698 "predicted_gap_mean": mu_best, 699 "predicted_gap_lower": y_lower[0], 700 "predicted_gap_upper": y_upper[0], 701 "current_best": y_best, 702 "expected_improvement": improvement, 703 "acquisition_function": acquisition, 704 } 705 706 def run_diagnostics( 707 self, 708 X_train, 709 X_test, 710 y_train, 711 y_test, 712 model_class, 713 surrogate, 714 n_samples=None, 715 model_kwargs=None, 716 target_gap="gap_abs", 717 ): 718 """ 719 Run complete diagnostic pipeline 720 721 Parameters: 722 ----------- 723 X_train, X_test, y_train, y_test : arrays 724 Train/test data 725 model_class : class 726 Model class to tune 727 surrogate : GenericSurrogate 728 Surrogate model for gap modeling 729 n_samples : int, optional 730 Number of Sobol samples 731 model_kwargs : dict, optional 732 Fixed model kwargs 733 target_gap : str 734 Gap metric to model 735 736 Returns: 737 -------- 738 results : dict 739 Complete diagnostic results 740 """ 741 # Phase 1: Exploration 742 self.generate_sobol_samples(n_samples) 743 eval_results = self.evaluate_configurations( 744 X_train, X_test, y_train, y_test, model_class, model_kwargs 745 ) 746 747 # Phase 2: Surrogate fitting 748 surrogate, metrics = self.fit_surrogate(surrogate, target_gap) 749 750 # Phase 3: Analysis 751 sobol_indices = self.compute_sobol_indices() 752 753 # Phase 4: Optimization 754 opt_results = self.optimize_acquisition() 755 756 return { 757 "evaluation_results": eval_results, 758 "surrogate_metrics": metrics, 759 "sobol_indices": sobol_indices, 760 "optimization": opt_results, 761 }
17class GenericSurrogate(BaseEstimator, RegressorMixin): 18 """ 19 Generic wrapper for any surrogate model with conformal prediction support 20 21 This class wraps any sklearn-compatible model and adds: 22 - Conformal prediction intervals (distribution-free uncertainty) 23 - Optional Bayesian uncertainty (if model supports it) 24 - Unified prediction interface 25 26 Parameters: 27 ----------- 28 base_model : object 29 Any sklearn-compatible model with fit() and predict() methods 30 conformal : bool 31 Whether to use conformal prediction for uncertainty 32 bayesian : bool 33 Whether model provides native uncertainty (e.g., GP, Bayesian NN) 34 """ 35 36 def __init__(self, base_model, conformal=True, bayesian=False): 37 self.base_model = base_model 38 self.conformal = conformal 39 self.bayesian = bayesian 40 self.nonconformity_scores_ = None 41 self.is_fitted_ = False 42 43 def fit(self, X, y, cal_ratio=0.2): 44 """ 45 Fit surrogate with optional conformal calibration 46 47 Parameters: 48 ----------- 49 X : array-like, shape (n_samples, n_features) 50 Training features 51 y : array-like, shape (n_samples,) 52 Training targets 53 cal_ratio : float 54 Proportion of data for conformal calibration (if conformal=True) 55 56 Returns: 57 -------- 58 self : object 59 """ 60 if self.conformal and cal_ratio > 0: 61 # Split conformal prediction: reserve data for calibration 62 n_cal = int(len(X) * cal_ratio) 63 X_train, X_cal = X[:-n_cal], X[-n_cal:] 64 y_train, y_cal = y[:-n_cal], y[-n_cal:] 65 66 self.base_model.fit(X_train, y_train) 67 68 # Compute nonconformity scores on calibration set 69 y_cal_pred = self.base_model.predict(X_cal) 70 self.nonconformity_scores_ = np.abs(y_cal - y_cal_pred) 71 else: 72 # No conformal prediction: use all data 73 self.base_model.fit(X, y) 74 75 self.is_fitted_ = True 76 return self 77 78 def predict(self, X, return_std=False, confidence=0.95): 79 """ 80 Unified prediction interface with optional uncertainty 81 82 Parameters: 83 ----------- 84 X : array-like, shape (n_samples, n_features) 85 Test features 86 return_std : bool 87 Whether to return uncertainty estimates 88 confidence : float 89 Confidence level for prediction intervals (if conformal=True) 90 91 Returns: 92 -------- 93 predictions : array or tuple 94 If return_std=False: predictions only 95 If return_std=True: (predictions, lower_bound, upper_bound) 96 97 Notes: 98 ------ 99 - Conformal: Distribution-free intervals with finite-sample guarantees 100 - Bayesian: Model-based uncertainty (e.g., GP posterior std) 101 """ 102 if not self.is_fitted_: 103 raise ValueError("Model must be fitted before prediction") 104 105 # Get point predictions 106 y_pred = self.base_model.predict(X) 107 108 if not return_std: 109 return y_pred 110 111 # Compute uncertainty estimates 112 if self.conformal and self.nonconformity_scores_ is not None: 113 # Conformal prediction intervals 114 alpha = 1 - confidence 115 n_scores = len(self.nonconformity_scores_) 116 117 # Calculate the effective quantile level based on (n+1) rule 118 # Cap the quantile level at 1.0 to prevent ValueError from np.quantile 119 quantile_level = min( 120 1.0, np.ceil((1 - alpha) * (n_scores + 1)) / n_scores 121 ) 122 123 q = np.quantile(self.nonconformity_scores_, quantile_level) 124 y_lower = y_pred - q 125 y_upper = y_pred + q 126 127 elif self.bayesian and hasattr(self.base_model, "predict"): 128 # Try to get Bayesian uncertainty from model 129 try: 130 # For GP-like models 131 if ( 132 hasattr(self.base_model, "predict") 133 and "return_std" 134 in self.base_model.predict.__code__.co_varnames 135 ): 136 _, y_std = self.base_model.predict(X, return_std=True) 137 y_lower = y_pred - 2 * y_std # 95% CI for Gaussian 138 y_upper = y_pred + 2 * y_std 139 else: 140 # Fallback: assume constant uncertainty 141 y_std = np.std(y_pred) * np.ones_like(y_pred) 142 y_lower = y_pred - 2 * y_std 143 y_upper = y_pred + 2 * y_std 144 except: 145 # No uncertainty available 146 y_lower = y_pred 147 y_upper = y_pred 148 else: 149 # No uncertainty quantification available 150 y_lower = y_pred 151 y_upper = y_pred 152 153 return y_pred, y_lower, y_upper
Generic wrapper for any surrogate model with conformal prediction support
This class wraps any sklearn-compatible model and adds:
- Conformal prediction intervals (distribution-free uncertainty)
- Optional Bayesian uncertainty (if model supports it)
- Unified prediction interface
Parameters:
base_model : object Any sklearn-compatible model with fit() and predict() methods conformal : bool Whether to use conformal prediction for uncertainty bayesian : bool Whether model provides native uncertainty (e.g., GP, Bayesian NN)
43 def fit(self, X, y, cal_ratio=0.2): 44 """ 45 Fit surrogate with optional conformal calibration 46 47 Parameters: 48 ----------- 49 X : array-like, shape (n_samples, n_features) 50 Training features 51 y : array-like, shape (n_samples,) 52 Training targets 53 cal_ratio : float 54 Proportion of data for conformal calibration (if conformal=True) 55 56 Returns: 57 -------- 58 self : object 59 """ 60 if self.conformal and cal_ratio > 0: 61 # Split conformal prediction: reserve data for calibration 62 n_cal = int(len(X) * cal_ratio) 63 X_train, X_cal = X[:-n_cal], X[-n_cal:] 64 y_train, y_cal = y[:-n_cal], y[-n_cal:] 65 66 self.base_model.fit(X_train, y_train) 67 68 # Compute nonconformity scores on calibration set 69 y_cal_pred = self.base_model.predict(X_cal) 70 self.nonconformity_scores_ = np.abs(y_cal - y_cal_pred) 71 else: 72 # No conformal prediction: use all data 73 self.base_model.fit(X, y) 74 75 self.is_fitted_ = True 76 return self
Fit surrogate with optional conformal calibration
Parameters:
X : array-like, shape (n_samples, n_features) Training features y : array-like, shape (n_samples,) Training targets cal_ratio : float Proportion of data for conformal calibration (if conformal=True)
Returns:
self : object
78 def predict(self, X, return_std=False, confidence=0.95): 79 """ 80 Unified prediction interface with optional uncertainty 81 82 Parameters: 83 ----------- 84 X : array-like, shape (n_samples, n_features) 85 Test features 86 return_std : bool 87 Whether to return uncertainty estimates 88 confidence : float 89 Confidence level for prediction intervals (if conformal=True) 90 91 Returns: 92 -------- 93 predictions : array or tuple 94 If return_std=False: predictions only 95 If return_std=True: (predictions, lower_bound, upper_bound) 96 97 Notes: 98 ------ 99 - Conformal: Distribution-free intervals with finite-sample guarantees 100 - Bayesian: Model-based uncertainty (e.g., GP posterior std) 101 """ 102 if not self.is_fitted_: 103 raise ValueError("Model must be fitted before prediction") 104 105 # Get point predictions 106 y_pred = self.base_model.predict(X) 107 108 if not return_std: 109 return y_pred 110 111 # Compute uncertainty estimates 112 if self.conformal and self.nonconformity_scores_ is not None: 113 # Conformal prediction intervals 114 alpha = 1 - confidence 115 n_scores = len(self.nonconformity_scores_) 116 117 # Calculate the effective quantile level based on (n+1) rule 118 # Cap the quantile level at 1.0 to prevent ValueError from np.quantile 119 quantile_level = min( 120 1.0, np.ceil((1 - alpha) * (n_scores + 1)) / n_scores 121 ) 122 123 q = np.quantile(self.nonconformity_scores_, quantile_level) 124 y_lower = y_pred - q 125 y_upper = y_pred + q 126 127 elif self.bayesian and hasattr(self.base_model, "predict"): 128 # Try to get Bayesian uncertainty from model 129 try: 130 # For GP-like models 131 if ( 132 hasattr(self.base_model, "predict") 133 and "return_std" 134 in self.base_model.predict.__code__.co_varnames 135 ): 136 _, y_std = self.base_model.predict(X, return_std=True) 137 y_lower = y_pred - 2 * y_std # 95% CI for Gaussian 138 y_upper = y_pred + 2 * y_std 139 else: 140 # Fallback: assume constant uncertainty 141 y_std = np.std(y_pred) * np.ones_like(y_pred) 142 y_lower = y_pred - 2 * y_std 143 y_upper = y_pred + 2 * y_std 144 except: 145 # No uncertainty available 146 y_lower = y_pred 147 y_upper = y_pred 148 else: 149 # No uncertainty quantification available 150 y_lower = y_pred 151 y_upper = y_pred 152 153 return y_pred, y_lower, y_upper
Unified prediction interface with optional uncertainty
Parameters:
X : array-like, shape (n_samples, n_features) Test features return_std : bool Whether to return uncertainty estimates confidence : float Confidence level for prediction intervals (if conformal=True)
Returns:
predictions : array or tuple If return_std=False: predictions only If return_std=True: (predictions, lower_bound, upper_bound)
Notes:
- Conformal: Distribution-free intervals with finite-sample guarantees
- Bayesian: Model-based uncertainty (e.g., GP posterior std)
156class GeneralizationOpt: 157 """ 158 Generic framework for model generalization diagnostics 159 160 This class is agnostic to: 161 - The base model being tuned (LightGBM, XGBoost, neural nets, etc.) 162 - The surrogate model used (GP, RF, neural nets, etc.) 163 - The hyperparameter space structure 164 165 Example usage: 166 -------------- 167 >>> from sklearn.gaussian_process import GaussianProcessRegressor 168 >>> from sklearn.ensemble import RandomForestRegressor 169 >>> 170 >>> # Define hyperparameter space 171 >>> hyperparams = { 172 >>> 'learning_rate': (0.01, 0.3, True), # (min, max, log_scale) 173 >>> 'max_depth': (3, 10, False) 174 >>> } 175 >>> 176 >>> # Initialize framework 177 >>> diag = GeneralizationOpt(hyperparams, random_state=42) 178 >>> 179 >>> # Use GP surrogate with Bayesian uncertainty 180 >>> gp = GaussianProcessRegressor() 181 >>> surrogate = GenericSurrogate(gp, conformal=False, bayesian=True) 182 >>> 183 >>> # Or use RF with conformal prediction 184 >>> rf = RandomForestRegressor() 185 >>> surrogate = GenericSurrogate(rf, conformal=True, bayesian=False) 186 >>> 187 >>> # Run diagnostics 188 >>> results = diag.run_diagnostics( 189 >>> X_train, X_test, y_train, y_test, 190 >>> model_class=LGBMClassifier, 191 >>> surrogate=surrogate, 192 >>> n_samples=100 193 >>> ) 194 """ 195 196 def __init__(self, hyperparams, random_state=42): 197 """ 198 Initialize diagnostics framework 199 200 Parameters: 201 ----------- 202 hyperparams : dict 203 Hyperparameter space definition 204 Format: {param_name: (min_val, max_val, log_scale), ...} 205 random_state : int 206 Random seed for reproducibility 207 """ 208 self.hyperparams = hyperparams 209 self.hyperparam_names = list(hyperparams.keys()) 210 self.n_dims = len(self.hyperparam_names) 211 self.random_state = random_state 212 213 # Storage 214 self.sobol_samples = None 215 self.evaluation_results = None 216 self.surrogate = None 217 self.sobol_indices = None 218 219 def _denormalize_config(self, normalized_config): 220 """Convert normalized [0,1] to actual hyperparameter values""" 221 config = {} 222 for i, param_name in enumerate(self.hyperparam_names): 223 min_val, max_val, log_scale = self.hyperparams[param_name] 224 norm_val = normalized_config[i] 225 226 if log_scale: 227 log_min, log_max = np.log(min_val), np.log(max_val) 228 config[param_name] = np.exp( 229 log_min + norm_val * (log_max - log_min) 230 ) 231 else: 232 denormalized_value = min_val + norm_val * (max_val - min_val) 233 # Special handling for boolean parameters represented as (0, 1, False) 234 if ( 235 min_val == 0 236 and max_val == 1 237 and not log_scale 238 and isinstance(min_val, int) 239 ): 240 config[param_name] = bool(round(denormalized_value)) 241 # Convert to int if needed (if min and max are integers and not log-scale) 242 elif ( 243 isinstance(min_val, int) 244 and isinstance(max_val, int) 245 and not log_scale 246 ): 247 config[param_name] = int(round(denormalized_value)) 248 else: 249 config[param_name] = denormalized_value 250 251 return config 252 253 def generate_sobol_samples(self, n_samples=None): 254 """ 255 Generate Sobol quasi-random samples in hyperparameter space 256 257 Parameters: 258 ----------- 259 n_samples : int, optional 260 Number of samples (default: 75 * n_dims) 261 262 Returns: 263 -------- 264 samples : ndarray, shape (n_samples, n_dims) 265 Normalized samples in [0, 1]^n_dims 266 """ 267 if n_samples is None: 268 n_samples = 75 * self.n_dims 269 270 sobol = Sobol(d=self.n_dims, scramble=True, seed=self.random_state) 271 self.sobol_samples = sobol.random(n_samples) 272 273 return self.sobol_samples 274 275 def evaluate_configurations( 276 self, 277 X_train, 278 X_test, 279 y_train, 280 y_test, 281 model_class, 282 model_kwargs=None, 283 evaluation_fn=None, 284 n_folds=5, 285 ): 286 """ 287 Evaluate model configurations and compute generalization gaps 288 289 Parameters: 290 ----------- 291 X_train, X_test, y_train, y_test : arrays 292 Train/test data 293 model_class : class 294 Model class to instantiate (e.g., LGBMClassifier) 295 model_kwargs : dict, optional 296 Fixed kwargs for model (e.g., {'random_state': 42, 'verbose': -1}) 297 evaluation_fn : callable, optional 298 Function to evaluate model: evaluation_fn(y_true, y_pred) -> score 299 Default: balanced_accuracy_score for classification 300 n_folds : int 301 Number of CV folds for training score estimation 302 303 Returns: 304 -------- 305 results : DataFrame 306 Results with columns: hyperparams, cv_train, test_score, gap_abs, etc. 307 """ 308 from sklearn.model_selection import StratifiedKFold, KFold 309 from sklearn.metrics import balanced_accuracy_score, r2_score 310 311 if self.sobol_samples is None: 312 raise ValueError("Must generate Sobol samples first") 313 314 if model_kwargs is None: 315 model_kwargs = {} 316 317 if evaluation_fn is None: 318 # Heuristic: check if classification or regression 319 if len(np.unique(y_train)) < 20: 320 evaluation_fn = balanced_accuracy_score 321 kfold = StratifiedKFold( 322 n_splits=n_folds, 323 shuffle=True, 324 random_state=self.random_state, 325 ) 326 else: 327 evaluation_fn = r2_score 328 kfold = KFold( 329 n_splits=n_folds, 330 shuffle=True, 331 random_state=self.random_state, 332 ) 333 else: 334 kfold = KFold( 335 n_splits=n_folds, shuffle=True, random_state=self.random_state 336 ) 337 338 results = [] 339 n_samples = len(self.sobol_samples) 340 341 print( 342 f"Evaluating {n_samples} configurations ({n_folds}-fold CV + test)..." 343 ) 344 345 for idx, norm_config in tqdm(enumerate(self.sobol_samples)): 346 if (idx + 1) % 50 == 0: 347 print( 348 f" Progress: {idx + 1}/{n_samples} " 349 f"({100*(idx+1)/n_samples:.1f}%)" 350 ) 351 352 config = self._denormalize_config(norm_config) 353 354 # Cross-validation score 355 cv_scores = [] 356 for train_idx, val_idx in kfold.split(X_train, y_train): 357 if hasattr(X_train, "iloc"): # DataFrame 358 X_tr, X_val = X_train.iloc[train_idx], X_train.iloc[val_idx] 359 y_tr, y_val = y_train.iloc[train_idx], y_train.iloc[val_idx] 360 else: # ndarray 361 X_tr, X_val = X_train[train_idx], X_train[val_idx] 362 y_tr, y_val = y_train[train_idx], y_train[val_idx] 363 364 model = model_class(**config, **model_kwargs) 365 model.fit(X_tr, y_tr) 366 y_pred = model.predict(X_val) 367 cv_scores.append(evaluation_fn(y_val, y_pred)) 368 369 cv_train = np.mean(cv_scores) 370 371 # Test score 372 model = model_class(**config, **model_kwargs) 373 model.fit(X_train, y_train) 374 y_pred_test = model.predict(X_test) 375 test_score = evaluation_fn(y_test, y_pred_test) 376 377 # Generalization gaps 378 gap_abs = cv_train - test_score 379 gap_norm = ( 380 (cv_train - test_score) / (1 - cv_train) if cv_train < 1 else 0 381 ) 382 gap_mpe = ( 383 ((cv_train - test_score) / cv_train * 100) 384 if cv_train > 0 385 else 0 386 ) 387 388 results.append( 389 { 390 **config, 391 "cv_train": cv_train, 392 "test_score": test_score, 393 "gap_abs": gap_abs, 394 "gap_norm": gap_norm, 395 "gap_mpe": gap_mpe, 396 } 397 ) 398 399 self.evaluation_results = pd.DataFrame(results) 400 401 print(f"\n✓ Completed {n_samples} evaluations") 402 print( 403 f" CV Train Score: {self.evaluation_results['cv_train'].mean():.4f} \u00b1 " 404 f"{self.evaluation_results['cv_train'].std():.4f}" 405 ) 406 print( 407 f" Test Score: {self.evaluation_results['test_score'].mean():.4f} \u00b1 " 408 f"{self.evaluation_results['test_score'].std():.4f}" 409 ) 410 print( 411 f" Absolute Gap: {self.evaluation_results['gap_abs'].mean():.4f} \u00b1 " 412 f"{self.evaluation_results['gap_abs'].std():.4f}" 413 ) 414 415 return self.evaluation_results 416 417 def fit_surrogate(self, surrogate, target_gap="gap_abs"): 418 """ 419 Fit surrogate model to generalization gap data 420 421 Parameters: 422 ----------- 423 surrogate : GenericSurrogate 424 Surrogate model instance 425 target_gap : str 426 Target variable to model ('gap_abs', 'gap_norm', 'gap_mpe') 427 428 Returns: 429 -------- 430 surrogate : GenericSurrogate 431 Fitted surrogate model 432 metrics : dict 433 Performance metrics (R², MAE, coverage if applicable) 434 """ 435 if self.evaluation_results is None: 436 raise ValueError("Must evaluate configurations first") 437 438 X = self.sobol_samples 439 y = self.evaluation_results[target_gap].values 440 441 # Train/validation split 442 split_idx = int(0.8 * len(X)) 443 X_train, X_val = X[:split_idx], X[split_idx:] 444 y_train, y_val = y[:split_idx], y[split_idx:] 445 446 print( 447 f"Training surrogate on {len(X_train)} samples, " 448 f"validating on {len(X_val)} samples..." 449 ) 450 451 # Fit surrogate 452 surrogate.fit(X_train, y_train) 453 454 # Evaluate 455 y_pred, y_lower, y_upper = surrogate.predict(X_val, return_std=True) 456 457 from sklearn.metrics import r2_score, mean_absolute_error 458 459 r2 = r2_score(y_val, y_pred) 460 mae = mean_absolute_error(y_val, y_pred) 461 462 metrics = {"r2": r2, "mae": mae} 463 464 # Compute coverage if intervals available 465 if not np.array_equal(y_lower, y_upper): 466 coverage = np.mean((y_val >= y_lower) & (y_val <= y_upper)) 467 interval_width = np.mean(y_upper - y_lower) 468 metrics["coverage"] = coverage 469 metrics["interval_width"] = interval_width 470 471 print(f"\n✓ Surrogate Performance:") 472 print(f" R² Score: {r2:.4f}") 473 print(f" MAE: {mae:.6f}") 474 print(f" Coverage: {coverage:.2%}") 475 print(f" Mean Interval Width: {interval_width:.6f}") 476 else: 477 print(f"\n✓ Surrogate Performance:") 478 print(f" R² Score: {r2:.4f}") 479 print(f" MAE: {mae:.6f}") 480 481 self.surrogate = surrogate 482 return surrogate, metrics 483 484 def compute_sobol_indices(self, n_samples=10000): 485 """ 486 Compute Sobol sensitivity indices for global sensitivity analysis 487 488 Parameters: 489 ----------- 490 n_samples : int 491 Number of Monte Carlo samples for Sobol estimation 492 493 Returns: 494 -------- 495 indices : DataFrame 496 Sobol indices with columns: Parameter, S1, ST, Interaction 497 """ 498 if self.surrogate is None: 499 raise ValueError("Must fit surrogate first") 500 501 problem = { 502 "num_vars": self.n_dims, 503 "names": self.hyperparam_names, 504 "bounds": [[0, 1]] * self.n_dims, 505 } 506 507 param_values = sobol_sample.sample( 508 problem, n_samples, calc_second_order=False 509 ) 510 Y = self.surrogate.predict(param_values) 511 512 Si = sobol_analyze.analyze( 513 problem, Y, calc_second_order=False, print_to_console=False 514 ) 515 516 self.sobol_indices = pd.DataFrame( 517 { 518 "Parameter": self.hyperparam_names, 519 "S1": Si["S1"], 520 "ST": Si["ST"], 521 "Interaction": Si["ST"] - Si["S1"], 522 } 523 ).sort_values("S1", ascending=False) 524 525 print("\n" + "=" * 70) 526 print("Sobol Sensitivity Indices (Variance Decomposition)") 527 print("=" * 70) 528 print(self.sobol_indices.to_string(index=False)) 529 print("\nS1 = First-order effect (main effect)") 530 print("ST = Total-order effect (main + interactions)") 531 print("Interaction = ST - S1 (pure interaction effects)") 532 533 return self.sobol_indices 534 535 def optimize_acquisition( 536 self, 537 acquisition="ei", 538 n_starts=10, 539 kappa=2.0, 540 xi=0.01, 541 minimize_gap=True, 542 ): 543 """ 544 Optimize acquisition function to find best hyperparameter configuration 545 546 Parameters: 547 ----------- 548 acquisition : str 549 Acquisition function: 'ei' (Expected Improvement), 550 'ucb' (Upper Confidence Bound), 'pi' (Probability of Improvement) 551 n_starts : int 552 Number of random restarts for multi-start optimization 553 kappa : float 554 Exploration parameter for UCB (higher = more exploration) 555 xi : float 556 Exploration parameter for EI/PI (higher = more exploration) 557 minimize_gap : bool 558 If True, minimize gap; if False, maximize test performance 559 560 Returns: 561 -------- 562 results : dict 563 Optimization results including best config and expected improvement 564 """ 565 if self.surrogate is None: 566 raise ValueError("Must fit surrogate first") 567 568 # Get current best 569 if minimize_gap: 570 y_best = self.evaluation_results["gap_abs"].min() 571 print(f"Current best gap: {y_best:.6f}") 572 else: 573 y_best = self.evaluation_results["test_score"].max() 574 print(f"Current best test score: {y_best:.6f}") 575 576 # Define acquisition functions 577 def expected_improvement(x): 578 x = x.reshape(1, -1) 579 y_pred, y_lower, y_upper = self.surrogate.predict( 580 x, return_std=True 581 ) 582 mu, sigma = y_pred[0], (y_upper[0] - y_lower[0]) / 4 # Approx std 583 584 if sigma < 1e-10: 585 return 0.0 586 587 if minimize_gap: 588 imp = y_best - mu - xi 589 else: 590 imp = mu - y_best - xi 591 592 Z = imp / sigma 593 ei = imp * norm.cdf(Z) + sigma * norm.pdf(Z) 594 return -ei 595 596 def upper_confidence_bound(x): 597 x = x.reshape(1, -1) 598 y_pred, y_lower, y_upper = self.surrogate.predict( 599 x, return_std=True 600 ) 601 mu, sigma = y_pred[0], (y_upper[0] - y_lower[0]) / 4 602 603 if minimize_gap: 604 ucb = mu - kappa * sigma 605 else: 606 ucb = mu + kappa * sigma 607 608 return ucb if minimize_gap else -ucb 609 610 def probability_of_improvement(x): 611 x = x.reshape(1, -1) 612 y_pred, y_lower, y_upper = self.surrogate.predict( 613 x, return_std=True 614 ) 615 mu, sigma = y_pred[0], (y_upper[0] - y_lower[0]) / 4 616 617 if sigma < 1e-10: 618 return 0.0 619 620 if minimize_gap: 621 Z = (y_best - mu - xi) / sigma 622 else: 623 Z = (mu - y_best - xi) / sigma 624 625 pi = norm.cdf(Z) 626 return -pi 627 628 acq_funcs = { 629 "ei": expected_improvement, 630 "ucb": upper_confidence_bound, 631 "pi": probability_of_improvement, 632 } 633 acq_func = acq_funcs[acquisition] 634 635 # Multi-start optimization 636 print(f"\nRunning {n_starts}-start L-BFGS-B optimization...") 637 638 best_result = None 639 best_acq_value = np.inf 640 641 np.random.seed(self.random_state) 642 starting_points = np.random.rand(n_starts, self.n_dims) 643 644 # Add best observed point as starting point 645 if len(self.sobol_samples) > 0: 646 best_idx = ( 647 self.evaluation_results["gap_abs"].idxmin() 648 if minimize_gap 649 else self.evaluation_results["test_score"].idxmax() 650 ) 651 starting_points[0] = self.sobol_samples[best_idx] 652 653 for i, x0 in enumerate(starting_points): 654 try: 655 result = scipy_minimize( 656 acq_func, 657 x0, 658 method="L-BFGS-B", 659 bounds=[(0, 1)] * self.n_dims, 660 options={"maxiter": 1000, "ftol": 1e-9}, 661 ) 662 663 if result.success and result.fun < best_acq_value: 664 best_acq_value = result.fun 665 best_result = result 666 except Exception as e: 667 print(f" Warning: Optimization {i+1} failed: {str(e)}") 668 669 if best_result is None: 670 raise RuntimeError("All optimization attempts failed") 671 672 # Extract best configuration 673 best_x_normalized = best_result.x 674 best_config = self._denormalize_config(best_x_normalized) 675 676 # Predict gap at optimum 677 y_pred, y_lower, y_upper = self.surrogate.predict( 678 best_x_normalized.reshape(1, -1), return_std=True 679 ) 680 mu_best = y_pred[0] 681 682 print(f"\n✓ Optimization Complete") 683 print(f" Acquisition: {acquisition.upper()}") 684 print(f" Best acquisition value: {-best_acq_value:.6f}") 685 print(f" Predicted gap at optimum: {mu_best:.6f}") 686 print(f" 95% CI: [{y_lower[0]:.6f}, {y_upper[0]:.6f}]") 687 print(f"\n Recommended Configuration:") 688 for param, value in best_config.items(): 689 print( 690 f" {param}: {value if isinstance(value, int) else f'{value:.4f}'}" 691 ) 692 693 improvement = y_best - mu_best if minimize_gap else mu_best - y_best 694 print(f"\n Expected improvement: {improvement:.6f}") 695 696 return { 697 "best_config": best_config, 698 "best_config_normalized": best_x_normalized, 699 "predicted_gap_mean": mu_best, 700 "predicted_gap_lower": y_lower[0], 701 "predicted_gap_upper": y_upper[0], 702 "current_best": y_best, 703 "expected_improvement": improvement, 704 "acquisition_function": acquisition, 705 } 706 707 def run_diagnostics( 708 self, 709 X_train, 710 X_test, 711 y_train, 712 y_test, 713 model_class, 714 surrogate, 715 n_samples=None, 716 model_kwargs=None, 717 target_gap="gap_abs", 718 ): 719 """ 720 Run complete diagnostic pipeline 721 722 Parameters: 723 ----------- 724 X_train, X_test, y_train, y_test : arrays 725 Train/test data 726 model_class : class 727 Model class to tune 728 surrogate : GenericSurrogate 729 Surrogate model for gap modeling 730 n_samples : int, optional 731 Number of Sobol samples 732 model_kwargs : dict, optional 733 Fixed model kwargs 734 target_gap : str 735 Gap metric to model 736 737 Returns: 738 -------- 739 results : dict 740 Complete diagnostic results 741 """ 742 # Phase 1: Exploration 743 self.generate_sobol_samples(n_samples) 744 eval_results = self.evaluate_configurations( 745 X_train, X_test, y_train, y_test, model_class, model_kwargs 746 ) 747 748 # Phase 2: Surrogate fitting 749 surrogate, metrics = self.fit_surrogate(surrogate, target_gap) 750 751 # Phase 3: Analysis 752 sobol_indices = self.compute_sobol_indices() 753 754 # Phase 4: Optimization 755 opt_results = self.optimize_acquisition() 756 757 return { 758 "evaluation_results": eval_results, 759 "surrogate_metrics": metrics, 760 "sobol_indices": sobol_indices, 761 "optimization": opt_results, 762 }
Generic framework for model generalization diagnostics
This class is agnostic to:
- The base model being tuned (LightGBM, XGBoost, neural nets, etc.)
- The surrogate model used (GP, RF, neural nets, etc.)
- The hyperparameter space structure
Example usage:
>>> from sklearn.gaussian_process import GaussianProcessRegressor
>>> from sklearn.ensemble import RandomForestRegressor
>>>
>>> # Define hyperparameter space
>>> hyperparams = {
>>> 'learning_rate': (0.01, 0.3, True), # (min, max, log_scale)
>>> 'max_depth': (3, 10, False)
>>> }
>>>
>>> # Initialize framework
>>> diag = GeneralizationOpt(hyperparams, random_state=42)
>>>
>>> # Use GP surrogate with Bayesian uncertainty
>>> gp = GaussianProcessRegressor()
>>> surrogate = GenericSurrogate(gp, conformal=False, bayesian=True)
>>>
>>> # Or use RF with conformal prediction
>>> rf = RandomForestRegressor()
>>> surrogate = GenericSurrogate(rf, conformal=True, bayesian=False)
>>>
>>> # Run diagnostics
>>> results = diag.run_diagnostics(
>>> X_train, X_test, y_train, y_test,
>>> model_class=LGBMClassifier,
>>> surrogate=surrogate,
>>> n_samples=100
>>> )