GPopt.GeneralizationOpt

  1import numpy as np
  2import pandas as pd
  3from tqdm import tqdm
  4from scipy.stats.qmc import Sobol
  5from scipy.optimize import minimize as scipy_minimize
  6from scipy.stats import norm
  7from sklearn.model_selection import train_test_split
  8from sklearn.base import BaseEstimator, RegressorMixin
  9from SALib.analyze import sobol as sobol_analyze
 10from SALib.sample import sobol as sobol_sample
 11import warnings
 12
 13warnings.filterwarnings("ignore")
 14
 15
 16class GenericSurrogate(BaseEstimator, RegressorMixin):
 17    """
 18    Generic wrapper for any surrogate model with conformal prediction support
 19
 20    This class wraps any sklearn-compatible model and adds:
 21    - Conformal prediction intervals (distribution-free uncertainty)
 22    - Optional Bayesian uncertainty (if model supports it)
 23    - Unified prediction interface
 24
 25    Parameters:
 26    -----------
 27    base_model : object
 28        Any sklearn-compatible model with fit() and predict() methods
 29    conformal : bool
 30        Whether to use conformal prediction for uncertainty
 31    bayesian : bool
 32        Whether model provides native uncertainty (e.g., GP, Bayesian NN)
 33    """
 34
 35    def __init__(self, base_model, conformal=True, bayesian=False):
 36        self.base_model = base_model
 37        self.conformal = conformal
 38        self.bayesian = bayesian
 39        self.nonconformity_scores_ = None
 40        self.is_fitted_ = False
 41
 42    def fit(self, X, y, cal_ratio=0.2):
 43        """
 44        Fit surrogate with optional conformal calibration
 45
 46        Parameters:
 47        -----------
 48        X : array-like, shape (n_samples, n_features)
 49            Training features
 50        y : array-like, shape (n_samples,)
 51            Training targets
 52        cal_ratio : float
 53            Proportion of data for conformal calibration (if conformal=True)
 54
 55        Returns:
 56        --------
 57        self : object
 58        """
 59        if self.conformal and cal_ratio > 0:
 60            # Split conformal prediction: reserve data for calibration
 61            n_cal = int(len(X) * cal_ratio)
 62            X_train, X_cal = X[:-n_cal], X[-n_cal:]
 63            y_train, y_cal = y[:-n_cal], y[-n_cal:]
 64
 65            self.base_model.fit(X_train, y_train)
 66
 67            # Compute nonconformity scores on calibration set
 68            y_cal_pred = self.base_model.predict(X_cal)
 69            self.nonconformity_scores_ = np.abs(y_cal - y_cal_pred)
 70        else:
 71            # No conformal prediction: use all data
 72            self.base_model.fit(X, y)
 73
 74        self.is_fitted_ = True
 75        return self
 76
 77    def predict(self, X, return_std=False, confidence=0.95):
 78        """
 79        Unified prediction interface with optional uncertainty
 80
 81        Parameters:
 82        -----------
 83        X : array-like, shape (n_samples, n_features)
 84            Test features
 85        return_std : bool
 86            Whether to return uncertainty estimates
 87        confidence : float
 88            Confidence level for prediction intervals (if conformal=True)
 89
 90        Returns:
 91        --------
 92        predictions : array or tuple
 93            If return_std=False: predictions only
 94            If return_std=True: (predictions, lower_bound, upper_bound)
 95
 96        Notes:
 97        ------
 98        - Conformal: Distribution-free intervals with finite-sample guarantees
 99        - Bayesian: Model-based uncertainty (e.g., GP posterior std)
100        """
101        if not self.is_fitted_:
102            raise ValueError("Model must be fitted before prediction")
103
104        # Get point predictions
105        y_pred = self.base_model.predict(X)
106
107        if not return_std:
108            return y_pred
109
110        # Compute uncertainty estimates
111        if self.conformal and self.nonconformity_scores_ is not None:
112            # Conformal prediction intervals
113            alpha = 1 - confidence
114            n_scores = len(self.nonconformity_scores_)
115
116            # Calculate the effective quantile level based on (n+1) rule
117            # Cap the quantile level at 1.0 to prevent ValueError from np.quantile
118            quantile_level = min(
119                1.0, np.ceil((1 - alpha) * (n_scores + 1)) / n_scores
120            )
121
122            q = np.quantile(self.nonconformity_scores_, quantile_level)
123            y_lower = y_pred - q
124            y_upper = y_pred + q
125
126        elif self.bayesian and hasattr(self.base_model, "predict"):
127            # Try to get Bayesian uncertainty from model
128            try:
129                # For GP-like models
130                if (
131                    hasattr(self.base_model, "predict")
132                    and "return_std"
133                    in self.base_model.predict.__code__.co_varnames
134                ):
135                    _, y_std = self.base_model.predict(X, return_std=True)
136                    y_lower = y_pred - 2 * y_std  # 95% CI for Gaussian
137                    y_upper = y_pred + 2 * y_std
138                else:
139                    # Fallback: assume constant uncertainty
140                    y_std = np.std(y_pred) * np.ones_like(y_pred)
141                    y_lower = y_pred - 2 * y_std
142                    y_upper = y_pred + 2 * y_std
143            except:
144                # No uncertainty available
145                y_lower = y_pred
146                y_upper = y_pred
147        else:
148            # No uncertainty quantification available
149            y_lower = y_pred
150            y_upper = y_pred
151
152        return y_pred, y_lower, y_upper
153
154
155class GeneralizationOpt:
156    """
157    Generic framework for model generalization diagnostics
158
159    This class is agnostic to:
160    - The base model being tuned (LightGBM, XGBoost, neural nets, etc.)
161    - The surrogate model used (GP, RF, neural nets, etc.)
162    - The hyperparameter space structure
163
164    Example usage:
165    --------------
166    >>> from sklearn.gaussian_process import GaussianProcessRegressor
167    >>> from sklearn.ensemble import RandomForestRegressor
168    >>>
169    >>> # Define hyperparameter space
170    >>> hyperparams = {
171    >>>     'learning_rate': (0.01, 0.3, True),  # (min, max, log_scale)
172    >>>     'max_depth': (3, 10, False)
173    >>> }
174    >>>
175    >>> # Initialize framework
176    >>> diag = GeneralizationOpt(hyperparams, random_state=42)
177    >>>
178    >>> # Use GP surrogate with Bayesian uncertainty
179    >>> gp = GaussianProcessRegressor()
180    >>> surrogate = GenericSurrogate(gp, conformal=False, bayesian=True)
181    >>>
182    >>> # Or use RF with conformal prediction
183    >>> rf = RandomForestRegressor()
184    >>> surrogate = GenericSurrogate(rf, conformal=True, bayesian=False)
185    >>>
186    >>> # Run diagnostics
187    >>> results = diag.run_diagnostics(
188    >>>     X_train, X_test, y_train, y_test,
189    >>>     model_class=LGBMClassifier,
190    >>>     surrogate=surrogate,
191    >>>     n_samples=100
192    >>> )
193    """
194
195    def __init__(self, hyperparams, random_state=42):
196        """
197        Initialize diagnostics framework
198
199        Parameters:
200        -----------
201        hyperparams : dict
202            Hyperparameter space definition
203            Format: {param_name: (min_val, max_val, log_scale), ...}
204        random_state : int
205            Random seed for reproducibility
206        """
207        self.hyperparams = hyperparams
208        self.hyperparam_names = list(hyperparams.keys())
209        self.n_dims = len(self.hyperparam_names)
210        self.random_state = random_state
211
212        # Storage
213        self.sobol_samples = None
214        self.evaluation_results = None
215        self.surrogate = None
216        self.sobol_indices = None
217
218    def _denormalize_config(self, normalized_config):
219        """Convert normalized [0,1] to actual hyperparameter values"""
220        config = {}
221        for i, param_name in enumerate(self.hyperparam_names):
222            min_val, max_val, log_scale = self.hyperparams[param_name]
223            norm_val = normalized_config[i]
224
225            if log_scale:
226                log_min, log_max = np.log(min_val), np.log(max_val)
227                config[param_name] = np.exp(
228                    log_min + norm_val * (log_max - log_min)
229                )
230            else:
231                denormalized_value = min_val + norm_val * (max_val - min_val)
232                # Special handling for boolean parameters represented as (0, 1, False)
233                if (
234                    min_val == 0
235                    and max_val == 1
236                    and not log_scale
237                    and isinstance(min_val, int)
238                ):
239                    config[param_name] = bool(round(denormalized_value))
240                # Convert to int if needed (if min and max are integers and not log-scale)
241                elif (
242                    isinstance(min_val, int)
243                    and isinstance(max_val, int)
244                    and not log_scale
245                ):
246                    config[param_name] = int(round(denormalized_value))
247                else:
248                    config[param_name] = denormalized_value
249
250        return config
251
252    def generate_sobol_samples(self, n_samples=None):
253        """
254        Generate Sobol quasi-random samples in hyperparameter space
255
256        Parameters:
257        -----------
258        n_samples : int, optional
259            Number of samples (default: 75 * n_dims)
260
261        Returns:
262        --------
263        samples : ndarray, shape (n_samples, n_dims)
264            Normalized samples in [0, 1]^n_dims
265        """
266        if n_samples is None:
267            n_samples = 75 * self.n_dims
268
269        sobol = Sobol(d=self.n_dims, scramble=True, seed=self.random_state)
270        self.sobol_samples = sobol.random(n_samples)
271
272        return self.sobol_samples
273
274    def evaluate_configurations(
275        self,
276        X_train,
277        X_test,
278        y_train,
279        y_test,
280        model_class,
281        model_kwargs=None,
282        evaluation_fn=None,
283        n_folds=5,
284    ):
285        """
286        Evaluate model configurations and compute generalization gaps
287
288        Parameters:
289        -----------
290        X_train, X_test, y_train, y_test : arrays
291            Train/test data
292        model_class : class
293            Model class to instantiate (e.g., LGBMClassifier)
294        model_kwargs : dict, optional
295            Fixed kwargs for model (e.g., {'random_state': 42, 'verbose': -1})
296        evaluation_fn : callable, optional
297            Function to evaluate model: evaluation_fn(y_true, y_pred) -> score
298            Default: balanced_accuracy_score for classification
299        n_folds : int
300            Number of CV folds for training score estimation
301
302        Returns:
303        --------
304        results : DataFrame
305            Results with columns: hyperparams, cv_train, test_score, gap_abs, etc.
306        """
307        from sklearn.model_selection import StratifiedKFold, KFold
308        from sklearn.metrics import balanced_accuracy_score, r2_score
309
310        if self.sobol_samples is None:
311            raise ValueError("Must generate Sobol samples first")
312
313        if model_kwargs is None:
314            model_kwargs = {}
315
316        if evaluation_fn is None:
317            # Heuristic: check if classification or regression
318            if len(np.unique(y_train)) < 20:
319                evaluation_fn = balanced_accuracy_score
320                kfold = StratifiedKFold(
321                    n_splits=n_folds,
322                    shuffle=True,
323                    random_state=self.random_state,
324                )
325            else:
326                evaluation_fn = r2_score
327                kfold = KFold(
328                    n_splits=n_folds,
329                    shuffle=True,
330                    random_state=self.random_state,
331                )
332        else:
333            kfold = KFold(
334                n_splits=n_folds, shuffle=True, random_state=self.random_state
335            )
336
337        results = []
338        n_samples = len(self.sobol_samples)
339
340        print(
341            f"Evaluating {n_samples} configurations ({n_folds}-fold CV + test)..."
342        )
343
344        for idx, norm_config in tqdm(enumerate(self.sobol_samples)):
345            if (idx + 1) % 50 == 0:
346                print(
347                    f"  Progress: {idx + 1}/{n_samples} "
348                    f"({100*(idx+1)/n_samples:.1f}%)"
349                )
350
351            config = self._denormalize_config(norm_config)
352
353            # Cross-validation score
354            cv_scores = []
355            for train_idx, val_idx in kfold.split(X_train, y_train):
356                if hasattr(X_train, "iloc"):  # DataFrame
357                    X_tr, X_val = X_train.iloc[train_idx], X_train.iloc[val_idx]
358                    y_tr, y_val = y_train.iloc[train_idx], y_train.iloc[val_idx]
359                else:  # ndarray
360                    X_tr, X_val = X_train[train_idx], X_train[val_idx]
361                    y_tr, y_val = y_train[train_idx], y_train[val_idx]
362
363                model = model_class(**config, **model_kwargs)
364                model.fit(X_tr, y_tr)
365                y_pred = model.predict(X_val)
366                cv_scores.append(evaluation_fn(y_val, y_pred))
367
368            cv_train = np.mean(cv_scores)
369
370            # Test score
371            model = model_class(**config, **model_kwargs)
372            model.fit(X_train, y_train)
373            y_pred_test = model.predict(X_test)
374            test_score = evaluation_fn(y_test, y_pred_test)
375
376            # Generalization gaps
377            gap_abs = cv_train - test_score
378            gap_norm = (
379                (cv_train - test_score) / (1 - cv_train) if cv_train < 1 else 0
380            )
381            gap_mpe = (
382                ((cv_train - test_score) / cv_train * 100)
383                if cv_train > 0
384                else 0
385            )
386
387            results.append(
388                {
389                    **config,
390                    "cv_train": cv_train,
391                    "test_score": test_score,
392                    "gap_abs": gap_abs,
393                    "gap_norm": gap_norm,
394                    "gap_mpe": gap_mpe,
395                }
396            )
397
398        self.evaluation_results = pd.DataFrame(results)
399
400        print(f"\n✓ Completed {n_samples} evaluations")
401        print(
402            f"  CV Train Score: {self.evaluation_results['cv_train'].mean():.4f} \u00b1 "
403            f"{self.evaluation_results['cv_train'].std():.4f}"
404        )
405        print(
406            f"  Test Score: {self.evaluation_results['test_score'].mean():.4f} \u00b1 "
407            f"{self.evaluation_results['test_score'].std():.4f}"
408        )
409        print(
410            f"  Absolute Gap: {self.evaluation_results['gap_abs'].mean():.4f} \u00b1 "
411            f"{self.evaluation_results['gap_abs'].std():.4f}"
412        )
413
414        return self.evaluation_results
415
416    def fit_surrogate(self, surrogate, target_gap="gap_abs"):
417        """
418        Fit surrogate model to generalization gap data
419
420        Parameters:
421        -----------
422        surrogate : GenericSurrogate
423            Surrogate model instance
424        target_gap : str
425            Target variable to model ('gap_abs', 'gap_norm', 'gap_mpe')
426
427        Returns:
428        --------
429        surrogate : GenericSurrogate
430            Fitted surrogate model
431        metrics : dict
432            Performance metrics (R², MAE, coverage if applicable)
433        """
434        if self.evaluation_results is None:
435            raise ValueError("Must evaluate configurations first")
436
437        X = self.sobol_samples
438        y = self.evaluation_results[target_gap].values
439
440        # Train/validation split
441        split_idx = int(0.8 * len(X))
442        X_train, X_val = X[:split_idx], X[split_idx:]
443        y_train, y_val = y[:split_idx], y[split_idx:]
444
445        print(
446            f"Training surrogate on {len(X_train)} samples, "
447            f"validating on {len(X_val)} samples..."
448        )
449
450        # Fit surrogate
451        surrogate.fit(X_train, y_train)
452
453        # Evaluate
454        y_pred, y_lower, y_upper = surrogate.predict(X_val, return_std=True)
455
456        from sklearn.metrics import r2_score, mean_absolute_error
457
458        r2 = r2_score(y_val, y_pred)
459        mae = mean_absolute_error(y_val, y_pred)
460
461        metrics = {"r2": r2, "mae": mae}
462
463        # Compute coverage if intervals available
464        if not np.array_equal(y_lower, y_upper):
465            coverage = np.mean((y_val >= y_lower) & (y_val <= y_upper))
466            interval_width = np.mean(y_upper - y_lower)
467            metrics["coverage"] = coverage
468            metrics["interval_width"] = interval_width
469
470            print(f"\n✓ Surrogate Performance:")
471            print(f"  R² Score: {r2:.4f}")
472            print(f"  MAE: {mae:.6f}")
473            print(f"  Coverage: {coverage:.2%}")
474            print(f"  Mean Interval Width: {interval_width:.6f}")
475        else:
476            print(f"\n✓ Surrogate Performance:")
477            print(f"  R² Score: {r2:.4f}")
478            print(f"  MAE: {mae:.6f}")
479
480        self.surrogate = surrogate
481        return surrogate, metrics
482
483    def compute_sobol_indices(self, n_samples=10000):
484        """
485        Compute Sobol sensitivity indices for global sensitivity analysis
486
487        Parameters:
488        -----------
489        n_samples : int
490            Number of Monte Carlo samples for Sobol estimation
491
492        Returns:
493        --------
494        indices : DataFrame
495            Sobol indices with columns: Parameter, S1, ST, Interaction
496        """
497        if self.surrogate is None:
498            raise ValueError("Must fit surrogate first")
499
500        problem = {
501            "num_vars": self.n_dims,
502            "names": self.hyperparam_names,
503            "bounds": [[0, 1]] * self.n_dims,
504        }
505
506        param_values = sobol_sample.sample(
507            problem, n_samples, calc_second_order=False
508        )
509        Y = self.surrogate.predict(param_values)
510
511        Si = sobol_analyze.analyze(
512            problem, Y, calc_second_order=False, print_to_console=False
513        )
514
515        self.sobol_indices = pd.DataFrame(
516            {
517                "Parameter": self.hyperparam_names,
518                "S1": Si["S1"],
519                "ST": Si["ST"],
520                "Interaction": Si["ST"] - Si["S1"],
521            }
522        ).sort_values("S1", ascending=False)
523
524        print("\n" + "=" * 70)
525        print("Sobol Sensitivity Indices (Variance Decomposition)")
526        print("=" * 70)
527        print(self.sobol_indices.to_string(index=False))
528        print("\nS1 = First-order effect (main effect)")
529        print("ST = Total-order effect (main + interactions)")
530        print("Interaction = ST - S1 (pure interaction effects)")
531
532        return self.sobol_indices
533
534    def optimize_acquisition(
535        self,
536        acquisition="ei",
537        n_starts=10,
538        kappa=2.0,
539        xi=0.01,
540        minimize_gap=True,
541    ):
542        """
543        Optimize acquisition function to find best hyperparameter configuration
544
545        Parameters:
546        -----------
547        acquisition : str
548            Acquisition function: 'ei' (Expected Improvement),
549            'ucb' (Upper Confidence Bound), 'pi' (Probability of Improvement)
550        n_starts : int
551            Number of random restarts for multi-start optimization
552        kappa : float
553            Exploration parameter for UCB (higher = more exploration)
554        xi : float
555            Exploration parameter for EI/PI (higher = more exploration)
556        minimize_gap : bool
557            If True, minimize gap; if False, maximize test performance
558
559        Returns:
560        --------
561        results : dict
562            Optimization results including best config and expected improvement
563        """
564        if self.surrogate is None:
565            raise ValueError("Must fit surrogate first")
566
567        # Get current best
568        if minimize_gap:
569            y_best = self.evaluation_results["gap_abs"].min()
570            print(f"Current best gap: {y_best:.6f}")
571        else:
572            y_best = self.evaluation_results["test_score"].max()
573            print(f"Current best test score: {y_best:.6f}")
574
575        # Define acquisition functions
576        def expected_improvement(x):
577            x = x.reshape(1, -1)
578            y_pred, y_lower, y_upper = self.surrogate.predict(
579                x, return_std=True
580            )
581            mu, sigma = y_pred[0], (y_upper[0] - y_lower[0]) / 4  # Approx std
582
583            if sigma < 1e-10:
584                return 0.0
585
586            if minimize_gap:
587                imp = y_best - mu - xi
588            else:
589                imp = mu - y_best - xi
590
591            Z = imp / sigma
592            ei = imp * norm.cdf(Z) + sigma * norm.pdf(Z)
593            return -ei
594
595        def upper_confidence_bound(x):
596            x = x.reshape(1, -1)
597            y_pred, y_lower, y_upper = self.surrogate.predict(
598                x, return_std=True
599            )
600            mu, sigma = y_pred[0], (y_upper[0] - y_lower[0]) / 4
601
602            if minimize_gap:
603                ucb = mu - kappa * sigma
604            else:
605                ucb = mu + kappa * sigma
606
607            return ucb if minimize_gap else -ucb
608
609        def probability_of_improvement(x):
610            x = x.reshape(1, -1)
611            y_pred, y_lower, y_upper = self.surrogate.predict(
612                x, return_std=True
613            )
614            mu, sigma = y_pred[0], (y_upper[0] - y_lower[0]) / 4
615
616            if sigma < 1e-10:
617                return 0.0
618
619            if minimize_gap:
620                Z = (y_best - mu - xi) / sigma
621            else:
622                Z = (mu - y_best - xi) / sigma
623
624            pi = norm.cdf(Z)
625            return -pi
626
627        acq_funcs = {
628            "ei": expected_improvement,
629            "ucb": upper_confidence_bound,
630            "pi": probability_of_improvement,
631        }
632        acq_func = acq_funcs[acquisition]
633
634        # Multi-start optimization
635        print(f"\nRunning {n_starts}-start L-BFGS-B optimization...")
636
637        best_result = None
638        best_acq_value = np.inf
639
640        np.random.seed(self.random_state)
641        starting_points = np.random.rand(n_starts, self.n_dims)
642
643        # Add best observed point as starting point
644        if len(self.sobol_samples) > 0:
645            best_idx = (
646                self.evaluation_results["gap_abs"].idxmin()
647                if minimize_gap
648                else self.evaluation_results["test_score"].idxmax()
649            )
650            starting_points[0] = self.sobol_samples[best_idx]
651
652        for i, x0 in enumerate(starting_points):
653            try:
654                result = scipy_minimize(
655                    acq_func,
656                    x0,
657                    method="L-BFGS-B",
658                    bounds=[(0, 1)] * self.n_dims,
659                    options={"maxiter": 1000, "ftol": 1e-9},
660                )
661
662                if result.success and result.fun < best_acq_value:
663                    best_acq_value = result.fun
664                    best_result = result
665            except Exception as e:
666                print(f"  Warning: Optimization {i+1} failed: {str(e)}")
667
668        if best_result is None:
669            raise RuntimeError("All optimization attempts failed")
670
671        # Extract best configuration
672        best_x_normalized = best_result.x
673        best_config = self._denormalize_config(best_x_normalized)
674
675        # Predict gap at optimum
676        y_pred, y_lower, y_upper = self.surrogate.predict(
677            best_x_normalized.reshape(1, -1), return_std=True
678        )
679        mu_best = y_pred[0]
680
681        print(f"\n✓ Optimization Complete")
682        print(f"  Acquisition: {acquisition.upper()}")
683        print(f"  Best acquisition value: {-best_acq_value:.6f}")
684        print(f"  Predicted gap at optimum: {mu_best:.6f}")
685        print(f"  95% CI: [{y_lower[0]:.6f}, {y_upper[0]:.6f}]")
686        print(f"\n  Recommended Configuration:")
687        for param, value in best_config.items():
688            print(
689                f"    {param}: {value if isinstance(value, int) else f'{value:.4f}'}"
690            )
691
692        improvement = y_best - mu_best if minimize_gap else mu_best - y_best
693        print(f"\n  Expected improvement: {improvement:.6f}")
694
695        return {
696            "best_config": best_config,
697            "best_config_normalized": best_x_normalized,
698            "predicted_gap_mean": mu_best,
699            "predicted_gap_lower": y_lower[0],
700            "predicted_gap_upper": y_upper[0],
701            "current_best": y_best,
702            "expected_improvement": improvement,
703            "acquisition_function": acquisition,
704        }
705
706    def run_diagnostics(
707        self,
708        X_train,
709        X_test,
710        y_train,
711        y_test,
712        model_class,
713        surrogate,
714        n_samples=None,
715        model_kwargs=None,
716        target_gap="gap_abs",
717    ):
718        """
719        Run complete diagnostic pipeline
720
721        Parameters:
722        -----------
723        X_train, X_test, y_train, y_test : arrays
724            Train/test data
725        model_class : class
726            Model class to tune
727        surrogate : GenericSurrogate
728            Surrogate model for gap modeling
729        n_samples : int, optional
730            Number of Sobol samples
731        model_kwargs : dict, optional
732            Fixed model kwargs
733        target_gap : str
734            Gap metric to model
735
736        Returns:
737        --------
738        results : dict
739            Complete diagnostic results
740        """
741        # Phase 1: Exploration
742        self.generate_sobol_samples(n_samples)
743        eval_results = self.evaluate_configurations(
744            X_train, X_test, y_train, y_test, model_class, model_kwargs
745        )
746
747        # Phase 2: Surrogate fitting
748        surrogate, metrics = self.fit_surrogate(surrogate, target_gap)
749
750        # Phase 3: Analysis
751        sobol_indices = self.compute_sobol_indices()
752
753        # Phase 4: Optimization
754        opt_results = self.optimize_acquisition()
755
756        return {
757            "evaluation_results": eval_results,
758            "surrogate_metrics": metrics,
759            "sobol_indices": sobol_indices,
760            "optimization": opt_results,
761        }
class GenericSurrogate(sklearn.base.BaseEstimator, sklearn.base.RegressorMixin):
 17class GenericSurrogate(BaseEstimator, RegressorMixin):
 18    """
 19    Generic wrapper for any surrogate model with conformal prediction support
 20
 21    This class wraps any sklearn-compatible model and adds:
 22    - Conformal prediction intervals (distribution-free uncertainty)
 23    - Optional Bayesian uncertainty (if model supports it)
 24    - Unified prediction interface
 25
 26    Parameters:
 27    -----------
 28    base_model : object
 29        Any sklearn-compatible model with fit() and predict() methods
 30    conformal : bool
 31        Whether to use conformal prediction for uncertainty
 32    bayesian : bool
 33        Whether model provides native uncertainty (e.g., GP, Bayesian NN)
 34    """
 35
 36    def __init__(self, base_model, conformal=True, bayesian=False):
 37        self.base_model = base_model
 38        self.conformal = conformal
 39        self.bayesian = bayesian
 40        self.nonconformity_scores_ = None
 41        self.is_fitted_ = False
 42
 43    def fit(self, X, y, cal_ratio=0.2):
 44        """
 45        Fit surrogate with optional conformal calibration
 46
 47        Parameters:
 48        -----------
 49        X : array-like, shape (n_samples, n_features)
 50            Training features
 51        y : array-like, shape (n_samples,)
 52            Training targets
 53        cal_ratio : float
 54            Proportion of data for conformal calibration (if conformal=True)
 55
 56        Returns:
 57        --------
 58        self : object
 59        """
 60        if self.conformal and cal_ratio > 0:
 61            # Split conformal prediction: reserve data for calibration
 62            n_cal = int(len(X) * cal_ratio)
 63            X_train, X_cal = X[:-n_cal], X[-n_cal:]
 64            y_train, y_cal = y[:-n_cal], y[-n_cal:]
 65
 66            self.base_model.fit(X_train, y_train)
 67
 68            # Compute nonconformity scores on calibration set
 69            y_cal_pred = self.base_model.predict(X_cal)
 70            self.nonconformity_scores_ = np.abs(y_cal - y_cal_pred)
 71        else:
 72            # No conformal prediction: use all data
 73            self.base_model.fit(X, y)
 74
 75        self.is_fitted_ = True
 76        return self
 77
 78    def predict(self, X, return_std=False, confidence=0.95):
 79        """
 80        Unified prediction interface with optional uncertainty
 81
 82        Parameters:
 83        -----------
 84        X : array-like, shape (n_samples, n_features)
 85            Test features
 86        return_std : bool
 87            Whether to return uncertainty estimates
 88        confidence : float
 89            Confidence level for prediction intervals (if conformal=True)
 90
 91        Returns:
 92        --------
 93        predictions : array or tuple
 94            If return_std=False: predictions only
 95            If return_std=True: (predictions, lower_bound, upper_bound)
 96
 97        Notes:
 98        ------
 99        - Conformal: Distribution-free intervals with finite-sample guarantees
100        - Bayesian: Model-based uncertainty (e.g., GP posterior std)
101        """
102        if not self.is_fitted_:
103            raise ValueError("Model must be fitted before prediction")
104
105        # Get point predictions
106        y_pred = self.base_model.predict(X)
107
108        if not return_std:
109            return y_pred
110
111        # Compute uncertainty estimates
112        if self.conformal and self.nonconformity_scores_ is not None:
113            # Conformal prediction intervals
114            alpha = 1 - confidence
115            n_scores = len(self.nonconformity_scores_)
116
117            # Calculate the effective quantile level based on (n+1) rule
118            # Cap the quantile level at 1.0 to prevent ValueError from np.quantile
119            quantile_level = min(
120                1.0, np.ceil((1 - alpha) * (n_scores + 1)) / n_scores
121            )
122
123            q = np.quantile(self.nonconformity_scores_, quantile_level)
124            y_lower = y_pred - q
125            y_upper = y_pred + q
126
127        elif self.bayesian and hasattr(self.base_model, "predict"):
128            # Try to get Bayesian uncertainty from model
129            try:
130                # For GP-like models
131                if (
132                    hasattr(self.base_model, "predict")
133                    and "return_std"
134                    in self.base_model.predict.__code__.co_varnames
135                ):
136                    _, y_std = self.base_model.predict(X, return_std=True)
137                    y_lower = y_pred - 2 * y_std  # 95% CI for Gaussian
138                    y_upper = y_pred + 2 * y_std
139                else:
140                    # Fallback: assume constant uncertainty
141                    y_std = np.std(y_pred) * np.ones_like(y_pred)
142                    y_lower = y_pred - 2 * y_std
143                    y_upper = y_pred + 2 * y_std
144            except:
145                # No uncertainty available
146                y_lower = y_pred
147                y_upper = y_pred
148        else:
149            # No uncertainty quantification available
150            y_lower = y_pred
151            y_upper = y_pred
152
153        return y_pred, y_lower, y_upper

Generic wrapper for any surrogate model with conformal prediction support

This class wraps any sklearn-compatible model and adds:

  • Conformal prediction intervals (distribution-free uncertainty)
  • Optional Bayesian uncertainty (if model supports it)
  • Unified prediction interface

Parameters:

base_model : object Any sklearn-compatible model with fit() and predict() methods conformal : bool Whether to use conformal prediction for uncertainty bayesian : bool Whether model provides native uncertainty (e.g., GP, Bayesian NN)

def fit(self, X, y, cal_ratio=0.2):
43    def fit(self, X, y, cal_ratio=0.2):
44        """
45        Fit surrogate with optional conformal calibration
46
47        Parameters:
48        -----------
49        X : array-like, shape (n_samples, n_features)
50            Training features
51        y : array-like, shape (n_samples,)
52            Training targets
53        cal_ratio : float
54            Proportion of data for conformal calibration (if conformal=True)
55
56        Returns:
57        --------
58        self : object
59        """
60        if self.conformal and cal_ratio > 0:
61            # Split conformal prediction: reserve data for calibration
62            n_cal = int(len(X) * cal_ratio)
63            X_train, X_cal = X[:-n_cal], X[-n_cal:]
64            y_train, y_cal = y[:-n_cal], y[-n_cal:]
65
66            self.base_model.fit(X_train, y_train)
67
68            # Compute nonconformity scores on calibration set
69            y_cal_pred = self.base_model.predict(X_cal)
70            self.nonconformity_scores_ = np.abs(y_cal - y_cal_pred)
71        else:
72            # No conformal prediction: use all data
73            self.base_model.fit(X, y)
74
75        self.is_fitted_ = True
76        return self

Fit surrogate with optional conformal calibration

Parameters:

X : array-like, shape (n_samples, n_features) Training features y : array-like, shape (n_samples,) Training targets cal_ratio : float Proportion of data for conformal calibration (if conformal=True)

Returns:

self : object

def predict(self, X, return_std=False, confidence=0.95):
 78    def predict(self, X, return_std=False, confidence=0.95):
 79        """
 80        Unified prediction interface with optional uncertainty
 81
 82        Parameters:
 83        -----------
 84        X : array-like, shape (n_samples, n_features)
 85            Test features
 86        return_std : bool
 87            Whether to return uncertainty estimates
 88        confidence : float
 89            Confidence level for prediction intervals (if conformal=True)
 90
 91        Returns:
 92        --------
 93        predictions : array or tuple
 94            If return_std=False: predictions only
 95            If return_std=True: (predictions, lower_bound, upper_bound)
 96
 97        Notes:
 98        ------
 99        - Conformal: Distribution-free intervals with finite-sample guarantees
100        - Bayesian: Model-based uncertainty (e.g., GP posterior std)
101        """
102        if not self.is_fitted_:
103            raise ValueError("Model must be fitted before prediction")
104
105        # Get point predictions
106        y_pred = self.base_model.predict(X)
107
108        if not return_std:
109            return y_pred
110
111        # Compute uncertainty estimates
112        if self.conformal and self.nonconformity_scores_ is not None:
113            # Conformal prediction intervals
114            alpha = 1 - confidence
115            n_scores = len(self.nonconformity_scores_)
116
117            # Calculate the effective quantile level based on (n+1) rule
118            # Cap the quantile level at 1.0 to prevent ValueError from np.quantile
119            quantile_level = min(
120                1.0, np.ceil((1 - alpha) * (n_scores + 1)) / n_scores
121            )
122
123            q = np.quantile(self.nonconformity_scores_, quantile_level)
124            y_lower = y_pred - q
125            y_upper = y_pred + q
126
127        elif self.bayesian and hasattr(self.base_model, "predict"):
128            # Try to get Bayesian uncertainty from model
129            try:
130                # For GP-like models
131                if (
132                    hasattr(self.base_model, "predict")
133                    and "return_std"
134                    in self.base_model.predict.__code__.co_varnames
135                ):
136                    _, y_std = self.base_model.predict(X, return_std=True)
137                    y_lower = y_pred - 2 * y_std  # 95% CI for Gaussian
138                    y_upper = y_pred + 2 * y_std
139                else:
140                    # Fallback: assume constant uncertainty
141                    y_std = np.std(y_pred) * np.ones_like(y_pred)
142                    y_lower = y_pred - 2 * y_std
143                    y_upper = y_pred + 2 * y_std
144            except:
145                # No uncertainty available
146                y_lower = y_pred
147                y_upper = y_pred
148        else:
149            # No uncertainty quantification available
150            y_lower = y_pred
151            y_upper = y_pred
152
153        return y_pred, y_lower, y_upper

Unified prediction interface with optional uncertainty

Parameters:

X : array-like, shape (n_samples, n_features) Test features return_std : bool Whether to return uncertainty estimates confidence : float Confidence level for prediction intervals (if conformal=True)

Returns:

predictions : array or tuple If return_std=False: predictions only If return_std=True: (predictions, lower_bound, upper_bound)

Notes:

  • Conformal: Distribution-free intervals with finite-sample guarantees
  • Bayesian: Model-based uncertainty (e.g., GP posterior std)
class GeneralizationOpt:
156class GeneralizationOpt:
157    """
158    Generic framework for model generalization diagnostics
159
160    This class is agnostic to:
161    - The base model being tuned (LightGBM, XGBoost, neural nets, etc.)
162    - The surrogate model used (GP, RF, neural nets, etc.)
163    - The hyperparameter space structure
164
165    Example usage:
166    --------------
167    >>> from sklearn.gaussian_process import GaussianProcessRegressor
168    >>> from sklearn.ensemble import RandomForestRegressor
169    >>>
170    >>> # Define hyperparameter space
171    >>> hyperparams = {
172    >>>     'learning_rate': (0.01, 0.3, True),  # (min, max, log_scale)
173    >>>     'max_depth': (3, 10, False)
174    >>> }
175    >>>
176    >>> # Initialize framework
177    >>> diag = GeneralizationOpt(hyperparams, random_state=42)
178    >>>
179    >>> # Use GP surrogate with Bayesian uncertainty
180    >>> gp = GaussianProcessRegressor()
181    >>> surrogate = GenericSurrogate(gp, conformal=False, bayesian=True)
182    >>>
183    >>> # Or use RF with conformal prediction
184    >>> rf = RandomForestRegressor()
185    >>> surrogate = GenericSurrogate(rf, conformal=True, bayesian=False)
186    >>>
187    >>> # Run diagnostics
188    >>> results = diag.run_diagnostics(
189    >>>     X_train, X_test, y_train, y_test,
190    >>>     model_class=LGBMClassifier,
191    >>>     surrogate=surrogate,
192    >>>     n_samples=100
193    >>> )
194    """
195
196    def __init__(self, hyperparams, random_state=42):
197        """
198        Initialize diagnostics framework
199
200        Parameters:
201        -----------
202        hyperparams : dict
203            Hyperparameter space definition
204            Format: {param_name: (min_val, max_val, log_scale), ...}
205        random_state : int
206            Random seed for reproducibility
207        """
208        self.hyperparams = hyperparams
209        self.hyperparam_names = list(hyperparams.keys())
210        self.n_dims = len(self.hyperparam_names)
211        self.random_state = random_state
212
213        # Storage
214        self.sobol_samples = None
215        self.evaluation_results = None
216        self.surrogate = None
217        self.sobol_indices = None
218
219    def _denormalize_config(self, normalized_config):
220        """Convert normalized [0,1] to actual hyperparameter values"""
221        config = {}
222        for i, param_name in enumerate(self.hyperparam_names):
223            min_val, max_val, log_scale = self.hyperparams[param_name]
224            norm_val = normalized_config[i]
225
226            if log_scale:
227                log_min, log_max = np.log(min_val), np.log(max_val)
228                config[param_name] = np.exp(
229                    log_min + norm_val * (log_max - log_min)
230                )
231            else:
232                denormalized_value = min_val + norm_val * (max_val - min_val)
233                # Special handling for boolean parameters represented as (0, 1, False)
234                if (
235                    min_val == 0
236                    and max_val == 1
237                    and not log_scale
238                    and isinstance(min_val, int)
239                ):
240                    config[param_name] = bool(round(denormalized_value))
241                # Convert to int if needed (if min and max are integers and not log-scale)
242                elif (
243                    isinstance(min_val, int)
244                    and isinstance(max_val, int)
245                    and not log_scale
246                ):
247                    config[param_name] = int(round(denormalized_value))
248                else:
249                    config[param_name] = denormalized_value
250
251        return config
252
253    def generate_sobol_samples(self, n_samples=None):
254        """
255        Generate Sobol quasi-random samples in hyperparameter space
256
257        Parameters:
258        -----------
259        n_samples : int, optional
260            Number of samples (default: 75 * n_dims)
261
262        Returns:
263        --------
264        samples : ndarray, shape (n_samples, n_dims)
265            Normalized samples in [0, 1]^n_dims
266        """
267        if n_samples is None:
268            n_samples = 75 * self.n_dims
269
270        sobol = Sobol(d=self.n_dims, scramble=True, seed=self.random_state)
271        self.sobol_samples = sobol.random(n_samples)
272
273        return self.sobol_samples
274
275    def evaluate_configurations(
276        self,
277        X_train,
278        X_test,
279        y_train,
280        y_test,
281        model_class,
282        model_kwargs=None,
283        evaluation_fn=None,
284        n_folds=5,
285    ):
286        """
287        Evaluate model configurations and compute generalization gaps
288
289        Parameters:
290        -----------
291        X_train, X_test, y_train, y_test : arrays
292            Train/test data
293        model_class : class
294            Model class to instantiate (e.g., LGBMClassifier)
295        model_kwargs : dict, optional
296            Fixed kwargs for model (e.g., {'random_state': 42, 'verbose': -1})
297        evaluation_fn : callable, optional
298            Function to evaluate model: evaluation_fn(y_true, y_pred) -> score
299            Default: balanced_accuracy_score for classification
300        n_folds : int
301            Number of CV folds for training score estimation
302
303        Returns:
304        --------
305        results : DataFrame
306            Results with columns: hyperparams, cv_train, test_score, gap_abs, etc.
307        """
308        from sklearn.model_selection import StratifiedKFold, KFold
309        from sklearn.metrics import balanced_accuracy_score, r2_score
310
311        if self.sobol_samples is None:
312            raise ValueError("Must generate Sobol samples first")
313
314        if model_kwargs is None:
315            model_kwargs = {}
316
317        if evaluation_fn is None:
318            # Heuristic: check if classification or regression
319            if len(np.unique(y_train)) < 20:
320                evaluation_fn = balanced_accuracy_score
321                kfold = StratifiedKFold(
322                    n_splits=n_folds,
323                    shuffle=True,
324                    random_state=self.random_state,
325                )
326            else:
327                evaluation_fn = r2_score
328                kfold = KFold(
329                    n_splits=n_folds,
330                    shuffle=True,
331                    random_state=self.random_state,
332                )
333        else:
334            kfold = KFold(
335                n_splits=n_folds, shuffle=True, random_state=self.random_state
336            )
337
338        results = []
339        n_samples = len(self.sobol_samples)
340
341        print(
342            f"Evaluating {n_samples} configurations ({n_folds}-fold CV + test)..."
343        )
344
345        for idx, norm_config in tqdm(enumerate(self.sobol_samples)):
346            if (idx + 1) % 50 == 0:
347                print(
348                    f"  Progress: {idx + 1}/{n_samples} "
349                    f"({100*(idx+1)/n_samples:.1f}%)"
350                )
351
352            config = self._denormalize_config(norm_config)
353
354            # Cross-validation score
355            cv_scores = []
356            for train_idx, val_idx in kfold.split(X_train, y_train):
357                if hasattr(X_train, "iloc"):  # DataFrame
358                    X_tr, X_val = X_train.iloc[train_idx], X_train.iloc[val_idx]
359                    y_tr, y_val = y_train.iloc[train_idx], y_train.iloc[val_idx]
360                else:  # ndarray
361                    X_tr, X_val = X_train[train_idx], X_train[val_idx]
362                    y_tr, y_val = y_train[train_idx], y_train[val_idx]
363
364                model = model_class(**config, **model_kwargs)
365                model.fit(X_tr, y_tr)
366                y_pred = model.predict(X_val)
367                cv_scores.append(evaluation_fn(y_val, y_pred))
368
369            cv_train = np.mean(cv_scores)
370
371            # Test score
372            model = model_class(**config, **model_kwargs)
373            model.fit(X_train, y_train)
374            y_pred_test = model.predict(X_test)
375            test_score = evaluation_fn(y_test, y_pred_test)
376
377            # Generalization gaps
378            gap_abs = cv_train - test_score
379            gap_norm = (
380                (cv_train - test_score) / (1 - cv_train) if cv_train < 1 else 0
381            )
382            gap_mpe = (
383                ((cv_train - test_score) / cv_train * 100)
384                if cv_train > 0
385                else 0
386            )
387
388            results.append(
389                {
390                    **config,
391                    "cv_train": cv_train,
392                    "test_score": test_score,
393                    "gap_abs": gap_abs,
394                    "gap_norm": gap_norm,
395                    "gap_mpe": gap_mpe,
396                }
397            )
398
399        self.evaluation_results = pd.DataFrame(results)
400
401        print(f"\n✓ Completed {n_samples} evaluations")
402        print(
403            f"  CV Train Score: {self.evaluation_results['cv_train'].mean():.4f} \u00b1 "
404            f"{self.evaluation_results['cv_train'].std():.4f}"
405        )
406        print(
407            f"  Test Score: {self.evaluation_results['test_score'].mean():.4f} \u00b1 "
408            f"{self.evaluation_results['test_score'].std():.4f}"
409        )
410        print(
411            f"  Absolute Gap: {self.evaluation_results['gap_abs'].mean():.4f} \u00b1 "
412            f"{self.evaluation_results['gap_abs'].std():.4f}"
413        )
414
415        return self.evaluation_results
416
417    def fit_surrogate(self, surrogate, target_gap="gap_abs"):
418        """
419        Fit surrogate model to generalization gap data
420
421        Parameters:
422        -----------
423        surrogate : GenericSurrogate
424            Surrogate model instance
425        target_gap : str
426            Target variable to model ('gap_abs', 'gap_norm', 'gap_mpe')
427
428        Returns:
429        --------
430        surrogate : GenericSurrogate
431            Fitted surrogate model
432        metrics : dict
433            Performance metrics (R², MAE, coverage if applicable)
434        """
435        if self.evaluation_results is None:
436            raise ValueError("Must evaluate configurations first")
437
438        X = self.sobol_samples
439        y = self.evaluation_results[target_gap].values
440
441        # Train/validation split
442        split_idx = int(0.8 * len(X))
443        X_train, X_val = X[:split_idx], X[split_idx:]
444        y_train, y_val = y[:split_idx], y[split_idx:]
445
446        print(
447            f"Training surrogate on {len(X_train)} samples, "
448            f"validating on {len(X_val)} samples..."
449        )
450
451        # Fit surrogate
452        surrogate.fit(X_train, y_train)
453
454        # Evaluate
455        y_pred, y_lower, y_upper = surrogate.predict(X_val, return_std=True)
456
457        from sklearn.metrics import r2_score, mean_absolute_error
458
459        r2 = r2_score(y_val, y_pred)
460        mae = mean_absolute_error(y_val, y_pred)
461
462        metrics = {"r2": r2, "mae": mae}
463
464        # Compute coverage if intervals available
465        if not np.array_equal(y_lower, y_upper):
466            coverage = np.mean((y_val >= y_lower) & (y_val <= y_upper))
467            interval_width = np.mean(y_upper - y_lower)
468            metrics["coverage"] = coverage
469            metrics["interval_width"] = interval_width
470
471            print(f"\n✓ Surrogate Performance:")
472            print(f"  R² Score: {r2:.4f}")
473            print(f"  MAE: {mae:.6f}")
474            print(f"  Coverage: {coverage:.2%}")
475            print(f"  Mean Interval Width: {interval_width:.6f}")
476        else:
477            print(f"\n✓ Surrogate Performance:")
478            print(f"  R² Score: {r2:.4f}")
479            print(f"  MAE: {mae:.6f}")
480
481        self.surrogate = surrogate
482        return surrogate, metrics
483
484    def compute_sobol_indices(self, n_samples=10000):
485        """
486        Compute Sobol sensitivity indices for global sensitivity analysis
487
488        Parameters:
489        -----------
490        n_samples : int
491            Number of Monte Carlo samples for Sobol estimation
492
493        Returns:
494        --------
495        indices : DataFrame
496            Sobol indices with columns: Parameter, S1, ST, Interaction
497        """
498        if self.surrogate is None:
499            raise ValueError("Must fit surrogate first")
500
501        problem = {
502            "num_vars": self.n_dims,
503            "names": self.hyperparam_names,
504            "bounds": [[0, 1]] * self.n_dims,
505        }
506
507        param_values = sobol_sample.sample(
508            problem, n_samples, calc_second_order=False
509        )
510        Y = self.surrogate.predict(param_values)
511
512        Si = sobol_analyze.analyze(
513            problem, Y, calc_second_order=False, print_to_console=False
514        )
515
516        self.sobol_indices = pd.DataFrame(
517            {
518                "Parameter": self.hyperparam_names,
519                "S1": Si["S1"],
520                "ST": Si["ST"],
521                "Interaction": Si["ST"] - Si["S1"],
522            }
523        ).sort_values("S1", ascending=False)
524
525        print("\n" + "=" * 70)
526        print("Sobol Sensitivity Indices (Variance Decomposition)")
527        print("=" * 70)
528        print(self.sobol_indices.to_string(index=False))
529        print("\nS1 = First-order effect (main effect)")
530        print("ST = Total-order effect (main + interactions)")
531        print("Interaction = ST - S1 (pure interaction effects)")
532
533        return self.sobol_indices
534
535    def optimize_acquisition(
536        self,
537        acquisition="ei",
538        n_starts=10,
539        kappa=2.0,
540        xi=0.01,
541        minimize_gap=True,
542    ):
543        """
544        Optimize acquisition function to find best hyperparameter configuration
545
546        Parameters:
547        -----------
548        acquisition : str
549            Acquisition function: 'ei' (Expected Improvement),
550            'ucb' (Upper Confidence Bound), 'pi' (Probability of Improvement)
551        n_starts : int
552            Number of random restarts for multi-start optimization
553        kappa : float
554            Exploration parameter for UCB (higher = more exploration)
555        xi : float
556            Exploration parameter for EI/PI (higher = more exploration)
557        minimize_gap : bool
558            If True, minimize gap; if False, maximize test performance
559
560        Returns:
561        --------
562        results : dict
563            Optimization results including best config and expected improvement
564        """
565        if self.surrogate is None:
566            raise ValueError("Must fit surrogate first")
567
568        # Get current best
569        if minimize_gap:
570            y_best = self.evaluation_results["gap_abs"].min()
571            print(f"Current best gap: {y_best:.6f}")
572        else:
573            y_best = self.evaluation_results["test_score"].max()
574            print(f"Current best test score: {y_best:.6f}")
575
576        # Define acquisition functions
577        def expected_improvement(x):
578            x = x.reshape(1, -1)
579            y_pred, y_lower, y_upper = self.surrogate.predict(
580                x, return_std=True
581            )
582            mu, sigma = y_pred[0], (y_upper[0] - y_lower[0]) / 4  # Approx std
583
584            if sigma < 1e-10:
585                return 0.0
586
587            if minimize_gap:
588                imp = y_best - mu - xi
589            else:
590                imp = mu - y_best - xi
591
592            Z = imp / sigma
593            ei = imp * norm.cdf(Z) + sigma * norm.pdf(Z)
594            return -ei
595
596        def upper_confidence_bound(x):
597            x = x.reshape(1, -1)
598            y_pred, y_lower, y_upper = self.surrogate.predict(
599                x, return_std=True
600            )
601            mu, sigma = y_pred[0], (y_upper[0] - y_lower[0]) / 4
602
603            if minimize_gap:
604                ucb = mu - kappa * sigma
605            else:
606                ucb = mu + kappa * sigma
607
608            return ucb if minimize_gap else -ucb
609
610        def probability_of_improvement(x):
611            x = x.reshape(1, -1)
612            y_pred, y_lower, y_upper = self.surrogate.predict(
613                x, return_std=True
614            )
615            mu, sigma = y_pred[0], (y_upper[0] - y_lower[0]) / 4
616
617            if sigma < 1e-10:
618                return 0.0
619
620            if minimize_gap:
621                Z = (y_best - mu - xi) / sigma
622            else:
623                Z = (mu - y_best - xi) / sigma
624
625            pi = norm.cdf(Z)
626            return -pi
627
628        acq_funcs = {
629            "ei": expected_improvement,
630            "ucb": upper_confidence_bound,
631            "pi": probability_of_improvement,
632        }
633        acq_func = acq_funcs[acquisition]
634
635        # Multi-start optimization
636        print(f"\nRunning {n_starts}-start L-BFGS-B optimization...")
637
638        best_result = None
639        best_acq_value = np.inf
640
641        np.random.seed(self.random_state)
642        starting_points = np.random.rand(n_starts, self.n_dims)
643
644        # Add best observed point as starting point
645        if len(self.sobol_samples) > 0:
646            best_idx = (
647                self.evaluation_results["gap_abs"].idxmin()
648                if minimize_gap
649                else self.evaluation_results["test_score"].idxmax()
650            )
651            starting_points[0] = self.sobol_samples[best_idx]
652
653        for i, x0 in enumerate(starting_points):
654            try:
655                result = scipy_minimize(
656                    acq_func,
657                    x0,
658                    method="L-BFGS-B",
659                    bounds=[(0, 1)] * self.n_dims,
660                    options={"maxiter": 1000, "ftol": 1e-9},
661                )
662
663                if result.success and result.fun < best_acq_value:
664                    best_acq_value = result.fun
665                    best_result = result
666            except Exception as e:
667                print(f"  Warning: Optimization {i+1} failed: {str(e)}")
668
669        if best_result is None:
670            raise RuntimeError("All optimization attempts failed")
671
672        # Extract best configuration
673        best_x_normalized = best_result.x
674        best_config = self._denormalize_config(best_x_normalized)
675
676        # Predict gap at optimum
677        y_pred, y_lower, y_upper = self.surrogate.predict(
678            best_x_normalized.reshape(1, -1), return_std=True
679        )
680        mu_best = y_pred[0]
681
682        print(f"\n✓ Optimization Complete")
683        print(f"  Acquisition: {acquisition.upper()}")
684        print(f"  Best acquisition value: {-best_acq_value:.6f}")
685        print(f"  Predicted gap at optimum: {mu_best:.6f}")
686        print(f"  95% CI: [{y_lower[0]:.6f}, {y_upper[0]:.6f}]")
687        print(f"\n  Recommended Configuration:")
688        for param, value in best_config.items():
689            print(
690                f"    {param}: {value if isinstance(value, int) else f'{value:.4f}'}"
691            )
692
693        improvement = y_best - mu_best if minimize_gap else mu_best - y_best
694        print(f"\n  Expected improvement: {improvement:.6f}")
695
696        return {
697            "best_config": best_config,
698            "best_config_normalized": best_x_normalized,
699            "predicted_gap_mean": mu_best,
700            "predicted_gap_lower": y_lower[0],
701            "predicted_gap_upper": y_upper[0],
702            "current_best": y_best,
703            "expected_improvement": improvement,
704            "acquisition_function": acquisition,
705        }
706
707    def run_diagnostics(
708        self,
709        X_train,
710        X_test,
711        y_train,
712        y_test,
713        model_class,
714        surrogate,
715        n_samples=None,
716        model_kwargs=None,
717        target_gap="gap_abs",
718    ):
719        """
720        Run complete diagnostic pipeline
721
722        Parameters:
723        -----------
724        X_train, X_test, y_train, y_test : arrays
725            Train/test data
726        model_class : class
727            Model class to tune
728        surrogate : GenericSurrogate
729            Surrogate model for gap modeling
730        n_samples : int, optional
731            Number of Sobol samples
732        model_kwargs : dict, optional
733            Fixed model kwargs
734        target_gap : str
735            Gap metric to model
736
737        Returns:
738        --------
739        results : dict
740            Complete diagnostic results
741        """
742        # Phase 1: Exploration
743        self.generate_sobol_samples(n_samples)
744        eval_results = self.evaluate_configurations(
745            X_train, X_test, y_train, y_test, model_class, model_kwargs
746        )
747
748        # Phase 2: Surrogate fitting
749        surrogate, metrics = self.fit_surrogate(surrogate, target_gap)
750
751        # Phase 3: Analysis
752        sobol_indices = self.compute_sobol_indices()
753
754        # Phase 4: Optimization
755        opt_results = self.optimize_acquisition()
756
757        return {
758            "evaluation_results": eval_results,
759            "surrogate_metrics": metrics,
760            "sobol_indices": sobol_indices,
761            "optimization": opt_results,
762        }

Generic framework for model generalization diagnostics

This class is agnostic to:

  • The base model being tuned (LightGBM, XGBoost, neural nets, etc.)
  • The surrogate model used (GP, RF, neural nets, etc.)
  • The hyperparameter space structure

Example usage:

>>> from sklearn.gaussian_process import GaussianProcessRegressor
>>> from sklearn.ensemble import RandomForestRegressor
>>>
>>> # Define hyperparameter space
>>> hyperparams = {
>>>     'learning_rate': (0.01, 0.3, True),  # (min, max, log_scale)
>>>     'max_depth': (3, 10, False)
>>> }
>>>
>>> # Initialize framework
>>> diag = GeneralizationOpt(hyperparams, random_state=42)
>>>
>>> # Use GP surrogate with Bayesian uncertainty
>>> gp = GaussianProcessRegressor()
>>> surrogate = GenericSurrogate(gp, conformal=False, bayesian=True)
>>>
>>> # Or use RF with conformal prediction
>>> rf = RandomForestRegressor()
>>> surrogate = GenericSurrogate(rf, conformal=True, bayesian=False)
>>>
>>> # Run diagnostics
>>> results = diag.run_diagnostics(
>>>     X_train, X_test, y_train, y_test,
>>>     model_class=LGBMClassifier,
>>>     surrogate=surrogate,
>>>     n_samples=100
>>> )