GPopt

 1from .GPOpt import GPOpt
 2from .BOstopping import BOstopping
 3from .genericcv import MLOptimizer
 4from .GeneralizationOpt import GeneralizationOpt
 5from .GeneralizationOpt import GenericSurrogate
 6
 7__all__ = [
 8    "GPOpt",
 9    "BOstopping",
10    "MLOptimizer",
11    "GeneralizationOpt",
12    "GenericSurrogate",
13]
class GPOpt:
  36class GPOpt:
  37    """Class GPOpt.
  38
  39    # Arguments:
  40
  41        lower_bound: a numpy array;
  42            lower bound for researched minimum
  43
  44        upper_bound: a numpy array;
  45            upper bound for researched minimum
  46
  47        objective_func: a function;
  48            the objective function to be minimized
  49
  50        params_names: a list;
  51            names of the parameters of the objective function (optional)
  52
  53        surrogate_obj: an object;
  54            An ML model for estimating the uncertainty around the objective function
  55            Must be nnetsauce.CustomRegressor or nnetsauce.PredictionInterval
  56
  57        x_init:
  58            initial setting of points where `objective_func` is evaluated (optional)
  59
  60        y_init:
  61            initial setting values at points where `objective_func` is evaluated (optional)
  62
  63        n_init: an integer;
  64            number of points in the initial setting, when `x_init` and `y_init` are not provided
  65
  66        n_choices: an integer;
  67            number of points for the calculation of expected improvement
  68
  69        n_iter: an integer;
  70            number of iterations of the minimization algorithm
  71
  72        alpha: a float;
  73            Value added to the diagonal of the kernel matrix during fitting (for Matern 5/2 kernel)
  74
  75        n_restarts_optimizer: an integer;
  76            The number of restarts of the optimizer for finding the kernel’s parameters which maximize the log-marginal likelihood.
  77
  78        seed: an integer;
  79            reproducibility seed
  80
  81        save: a string;
  82            Specifies where to save the optimizer in its current state
  83
  84        n_jobs: an integer;
  85            number of jobs for parallel computing on initial setting or method `lazyoptimize` (can be -1)
  86
  87        acquisition: a string;
  88            acquisition function: "ei" (expected improvement) or "ucb" (upper confidence bound)
  89
  90        method: an str;
  91            "bayesian" (default) for Gaussian posteriors, "mc" for Monte Carlo posteriors,
  92            "splitconformal" (only for acquisition = "ucb") for conformalized surrogates
  93
  94        min_value: a float;
  95            minimum value of the objective function (default is None). For example,
  96            if objective function is accuracy, will be 1, and the algorithm will stop
  97
  98        per_second: a boolean;
  99            __experimental__, default is False (leave to default for now)
 100
 101        log_scale: a boolean;
 102            __experimental__, default is False (leave to default for now)
 103
 104    see also [Bayesian Optimization with GPopt](https://thierrymoudiki.github.io/blog/2021/04/16/python/misc/gpopt)
 105        or [Hyperparameters tuning with GPopt](https://thierrymoudiki.github.io/blog/2021/06/11/python/misc/hyperparam-tuning-gpopt) [Agnostic BayesOpt](https://thierrymoudiki.github.io/blog/2024/12/09/python/bayesconfoptim)
 106
 107    """
 108
 109    def __init__(
 110        self,
 111        lower_bound,
 112        upper_bound,
 113        objective_func=None,
 114        params_names=None,
 115        surrogate_obj=None,
 116        x_init=None,
 117        y_init=None,
 118        n_init=10,
 119        n_choices=100000,
 120        n_iter=190,
 121        alpha=1e-6,
 122        n_restarts_optimizer=25,
 123        seed=123,
 124        save=None,
 125        n_jobs=1,
 126        acquisition="ei",
 127        method="bayesian",
 128        min_value=None,
 129        per_second=False,  # /!\ very experimental
 130        log_scale=False,  # /!\ experimental
 131    ):
 132        n_dims = len(lower_bound)
 133
 134        assert n_dims == len(
 135            upper_bound
 136        ), "'upper_bound' and 'lower_bound' must have the same dimensions"
 137
 138        self.objective_func = objective_func
 139        self.params_names = params_names
 140        self.lower_bound = lower_bound
 141        self.upper_bound = upper_bound
 142        self.y_init = y_init
 143        self.log_scale = log_scale
 144        self.n_dims = n_dims
 145        self.n_init = n_init
 146        self.n_choices = n_choices
 147        self.n_iter = n_iter
 148        self.alpha = alpha
 149        self.n_restarts_optimizer = n_restarts_optimizer
 150        self.seed = seed
 151        self.save = save
 152        self.n_jobs = n_jobs  # for parallel case on initial design
 153        self.per_second = per_second
 154        self.x_min = None
 155        self.y_min = None
 156        self.y_mean = None
 157        self.y_std = None
 158        self.y_lower = None
 159        self.y_upper = None
 160        self.best_surrogate = None
 161        self.acquisition = acquisition
 162        self.min_value = min_value
 163        self.acq = np.array([])
 164        self.max_acq = []
 165        if surrogate_obj is None:
 166            self.surrogate_obj = GaussianProcessRegressor(
 167                kernel=Matern(nu=2.5),
 168                alpha=self.alpha,
 169                normalize_y=True,
 170                n_restarts_optimizer=self.n_restarts_optimizer,
 171                random_state=self.seed,
 172            )
 173        else:
 174            self.surrogate_obj = surrogate_obj
 175        assert method in (
 176            "bayesian",
 177            "mc",
 178            "splitconformal",
 179        ), "method must be in ('bayesian', 'mc', 'splitconformal')"
 180        self.method = method
 181        self.posterior_ = None
 182
 183        # Sobol seqs for initial design and choices
 184        sobol_seq_init = np.transpose(
 185            generate_sobol2(
 186                n_dims=self.n_dims,
 187                n_points=self.n_init,
 188                skip=2,
 189            )
 190        )
 191        sobol_seq_choices = np.transpose(
 192            generate_sobol2(
 193                n_dims=self.n_dims,
 194                n_points=self.n_choices,
 195                skip=self.n_init + 2,
 196            )
 197        )
 198
 199        # Sobol seqs for initial design and choices with bounds
 200        if self.log_scale == False:
 201            bounds_range = upper_bound - lower_bound
 202            self.x_init = (
 203                bounds_range * sobol_seq_init + lower_bound
 204                if x_init is None
 205                else x_init
 206            )
 207            self.x_choices = bounds_range * sobol_seq_choices + lower_bound
 208
 209        else:  # (!) experimental
 210            assert (
 211                lower_bound > 0
 212            ).all(), "all elements of `lower_bound` must be > 0"
 213            assert (
 214                upper_bound > 0
 215            ).all(), "all elements of `upper_bound` must be > 0"
 216
 217            log_lower_bound = np.log(lower_bound)
 218            log_upper_bound = np.log(upper_bound)
 219            log_bounds_range = log_upper_bound - log_lower_bound
 220            self.x_init = (
 221                np.minimum(
 222                    np.exp(log_bounds_range * sobol_seq_init + log_lower_bound),
 223                    1.7976931348623157e308,
 224                )
 225                if x_init is None
 226                else x_init
 227            )
 228            self.x_choices = np.minimum(
 229                np.exp(log_bounds_range * sobol_seq_choices + log_lower_bound),
 230                1.7976931348623157e308,
 231            )
 232
 233        # shelve for saving (not for loading)
 234        if self.save is not None:
 235            self.sh = shelve.open(filename=save, flag="c", writeback=True)
 236
 237        if self.per_second:
 238            self.timings = []
 239            self.rf_obj = RandomForestRegressor(
 240                n_estimators=250, random_state=self.seed
 241            )
 242
 243        self.n_jobs = n_jobs
 244
 245    # from sklearn.base
 246    def get_params(self):
 247        """Get object attributes.
 248
 249        Returns
 250        -------
 251        params : mapping of string to any
 252            Parameter names mapped to their values.
 253        """
 254        out = dict()
 255        param_names = dir(self)
 256        for key in param_names:
 257            if key.startswith("_") is False:
 258                out[key] = getattr(self, key, None)
 259
 260        return out
 261
 262    # for parallel case on initial design
 263    def eval_objective(self, arg):
 264        try:
 265            return self.objective_func(self.x_init[arg, :])
 266        except:
 267            return 1e06
 268
 269    # load data from stored shelve
 270    def load(self, path):
 271        """load data from stored shelve.
 272
 273        # Arguments
 274
 275        path : a string; path to stored shelve.
 276
 277        See also: [Bayesian Optimization with GPopt Part 2 (save and resume)](https://thierrymoudiki.github.io/blog/2021/04/30/python/misc/gpopt)
 278        """
 279
 280        self.sh = shelve.open(filename=path)
 281        for key, value in self.sh.items():
 282            setattr(self, key, value)
 283
 284    # update shelve in optimization loop
 285    def update_shelve(self):
 286        for key, value in self.get_params().items():
 287            if (callable(value) is False) & (key != "sh"):
 288                self.sh[key] = value
 289        self.sh.sync()
 290
 291    # closing shelve (can't be modified after)
 292    def close_shelve(self):
 293        """Close shelve.
 294
 295        # Arguments
 296
 297        No argument.
 298
 299        See also: [Bayesian Optimization with GPopt Part 2 (save and resume)](https://thierrymoudiki.github.io/blog/2021/04/30/python/misc/gpopt)
 300        """
 301
 302        self.sh.close()
 303
 304    # fit predict
 305    def surrogate_fit_predict(
 306        self,
 307        X_train,
 308        y_train,
 309        X_test,
 310        return_std=False,
 311        return_pi=False,
 312        param_search_init_design=False,
 313        param_distributions=None,
 314        **kwargs,
 315    ):
 316        if len(X_train.shape) == 1:
 317            X_train = X_train.reshape((-1, 1))
 318            X_test = X_test.reshape((-1, 1))
 319
 320        if (
 321            X_train.shape[0] <= self.n_init and param_search_init_design == True
 322        ):  # on initial design
 323            try:
 324                rs_obj = RandomizedSearchCV(
 325                    self.surrogate_obj,
 326                    param_distributions=param_distributions,
 327                    random_state=42,
 328                    cv=3,
 329                    **kwargs,
 330                )
 331                rs_obj.fit(X_train, y_train)
 332                self.surrogate_obj = rs_obj.best_estimator_
 333            except Exception as e:
 334                print(str(e))
 335
 336        # Get mean and standard deviation (+ lower and upper for not GPs)
 337        assert (
 338            return_std == True and return_pi == True
 339        ) == False, "must have either return_std == True or return_pi == True"
 340
 341        if return_std == True:
 342            self.posterior_ = "gaussian"
 343            self.surrogate_obj.fit(X_train, y_train)
 344            return self.surrogate_obj.predict(X_test, return_std=True)
 345
 346        elif (
 347            return_pi == True
 348        ):  # here, self.surrogate_obj must have `replications` not None
 349            if self.surrogate_obj.replications is not None:
 350                self.posterior_ = "mc"
 351                self.surrogate_obj.fit(X_train, y_train)
 352                try:  # it's a nnetsauce.CustomRegressor
 353                    res = self.surrogate_obj.predict(
 354                        X_test, return_pi=True, method="splitconformal"
 355                    )
 356                except Exception:  # it's a nnetsauce.PredictionInterval
 357                    try:
 358                        res = self.surrogate_obj.predict(X_test, return_pi=True)
 359                    except Exception as e:
 360                        print(
 361                            str(e)
 362                            + "Sureproof way is to encapsulate your surrogate in nnetsauce.PredictionInterval model"
 363                        )
 364
 365                self.y_sims = res.sims
 366                self.y_mean, self.y_std = (
 367                    np.mean(self.y_sims, axis=1),
 368                    np.std(self.y_sims, axis=1),
 369                )
 370                return self.y_mean, self.y_std, self.y_sims
 371
 372            else:  # self.surrogate_obj is conformalized (uses nnetsauce.PredictionInterval)
 373                assert (
 374                    self.acquisition == "ucb"
 375                ), "'acquisition' must be 'ucb' for conformalized surrogates"
 376                self.posterior_ = None
 377                self.surrogate_obj.fit(X_train, y_train)
 378                try:
 379                    res = self.surrogate_obj.predict(
 380                        X_test, return_pi=True, method="splitconformal"
 381                    )
 382                except Exception:
 383                    res = self.surrogate_obj.predict(X_test, return_pi=True)
 384                self.y_mean = res.mean
 385                self.y_lower = res.lower
 386                self.y_upper = res.upper
 387                return self.y_mean, self.y_lower, self.y_upper
 388
 389        else:
 390            raise NotImplementedError
 391
 392    # fit predict timings
 393    def timings_fit_predict(self, X_train, y_train, X_test):
 394        if len(X_train.shape) == 1:
 395            X_train = X_train.reshape((-1, 1))
 396            X_test = X_test.reshape((-1, 1))
 397
 398        # Get mean preds for timings
 399        return self.rf_obj.fit(X_train, y_train).predict(X_test)
 400
 401    # find next parameter by using expected improvement (ei)
 402    def next_parameter_by_acq(self, i, acq="ei"):
 403        if acq == "ei":
 404            if self.posterior_ == "gaussian":
 405                gamma_hat = (self.y_min - self.y_mean) / self.y_std
 406                self.acq = -self.y_std * (
 407                    gamma_hat * st.norm.cdf(gamma_hat) + st.norm.pdf(gamma_hat)
 408                )
 409            elif self.posterior_ == "mc":
 410                self.acq = -np.mean(
 411                    np.maximum(self.y_min - self.y_sims, 0), axis=1
 412                )
 413
 414        if acq == "ucb":
 415            if self.posterior_ == "gaussian":
 416                self.acq = self.y_mean - 1.96 * self.y_std
 417                self.ucb = self.y_mean + 1.96 * self.y_std
 418
 419            elif self.posterior_ is None:  # split conformal(ized) estimator
 420                self.acq = self.y_lower
 421                self.ucb = self.y_upper
 422
 423        # find max index -----
 424
 425        if self.per_second is False:
 426            # find index for max. ei
 427            max_index = self.acq.argmin()
 428
 429        else:  # self.per_second is True
 430            # predict timings on self.x_choices
 431            # train on X = self.parameters and y = self.timings
 432            # (must have same shape[0])
 433            timing_preds = self.timings_fit_predict(
 434                X_train=np.asarray(self.parameters),
 435                y_train=np.asarray(self.timings),
 436                X_test=self.x_choices,
 437            )
 438
 439            # find index for max. ei (and min. timings)
 440            max_index = (-self.acq / timing_preds).argmax()
 441
 442        self.max_acq.append(np.abs(self.acq[max_index]))
 443
 444        # Select next choice
 445        next_param = self.x_choices[max_index, :]
 446
 447        if next_param in np.asarray(self.parameters):
 448            if self.log_scale == False:
 449                np.random.seed(self.seed * i + 1000)
 450                next_param = (
 451                    self.upper_bound - self.lower_bound
 452                ) * np.random.rand(self.n_dims) + self.lower_bound
 453
 454            else:  # /!\ very... experimental
 455                np.random.seed(self.seed)
 456                log_upper_bound = np.log(self.upper_bound)
 457                log_lower_bound = np.log(self.lower_bound)
 458                log_bounds_range = log_upper_bound - log_lower_bound
 459
 460                next_param = np.minimum(
 461                    np.exp(
 462                        log_bounds_range * np.random.rand(self.n_dims)
 463                        + log_lower_bound
 464                    ),
 465                    1.7976931348623157e308,
 466                )
 467
 468        return next_param
 469
 470    # optimize the objective
 471    def optimize(
 472        self,
 473        verbose=1,
 474        n_more_iter=None,
 475        abs_tol=None,  # suggested 1e-4, for n_iter = 200
 476        ucb_tol=None,
 477        min_budget=50,  # minimum budget for early stopping
 478        func_args=None,
 479        param_search_init_design=False,
 480        param_distributions=None,
 481    ):
 482        """Launch optimization loop.
 483
 484        # Arguments:
 485
 486            verbose: an integer;
 487                verbose = 0: nothing is printed,
 488                verbose = 1: a progress bar is printed (longer than 0),
 489                verbose = 2: information about each iteration is printed (longer than 1)
 490
 491            n_more_iter: an integer;
 492                additional number of iterations for the optimizer (which has been run once)
 493
 494            abs_tol: a float;
 495                tolerance for convergence of the optimizer (early stopping based on acquisition function)
 496
 497            ucb_tol: a float;
 498                tolerance for convergence of the optimizer (early stopping based on length of prediction intervals)
 499                for UCB criterion
 500
 501            min_budget: an integer (default is 50);
 502                minimum number of iterations before early stopping controlled by `abs_tol`
 503
 504            func_args: a list;
 505                additional parameters for the objective function (if necessary)
 506
 507            param_search_init_design: a boolean;
 508                whether random search tuning must occur on the initial design or not
 509
 510            param_distributions: dict or list of dicts;
 511                Dictionary with parameters names (str) as keys and distributions or lists of
 512                parameters to try. Distributions must provide a rvs method for sampling
 513                (such as those from scipy.stats.distributions). If a list is given, it
 514                is sampled uniformly. If a list of dicts is given, first a dict is sampled
 515                uniformly, and then a parameter is sampled using that dict as above.
 516
 517        see also [Bayesian Optimization with GPopt](https://thierrymoudiki.github.io/blog/2021/04/16/python/misc/gpopt)
 518        and [Hyperparameters tuning with GPopt](https://thierrymoudiki.github.io/blog/2021/06/11/python/misc/hyperparam-tuning-gpopt)
 519
 520        """
 521
 522        # verbose = 0: nothing is printed
 523        # verbose = 1: a progress bar is printed (longer than 0)
 524        # verbose = 2: information about each iteration is printed (longer than 1)
 525        if func_args is None:
 526            func_args = []
 527
 528        if (
 529            n_more_iter is None
 530        ):  # initial optimization, before more iters are requested
 531            n_iter = self.n_iter
 532            # stopping iter for early stopping (default is total budget)
 533            iter_stop = n_iter  # potentially # got to check this
 534
 535            # initial design  ----------
 536
 537            if (verbose == 1) | (verbose == 2):
 538                print(f"\n Creating initial design... \n")
 539
 540            if verbose == 1:
 541                progbar = Progbar(target=self.n_init)
 542
 543            self.parameters = self.x_init.tolist()
 544            self.scores = []
 545
 546            if self.save is not None:
 547                self.update_shelve()
 548
 549            if self.y_init is None:  # calculate scores on initial design
 550                assert (
 551                    self.objective_func is not None
 552                ), "self.y_init is None: must have 'objective_func' not None"
 553
 554                if self.n_jobs == 1:
 555                    for i in range(self.n_init):
 556                        x_next = self.x_init[i, :]
 557
 558                        try:
 559                            if self.per_second is True:
 560                                start = time()
 561                                score = self.objective_func(x_next, *func_args)
 562                                if (np.isfinite(score) == False) or (
 563                                    np.isnan(score) == True
 564                                ):
 565                                    continue
 566                                self.timings.append(np.log(time() - start))
 567
 568                            else:  # self.per_second is False
 569                                score = self.objective_func(x_next, *func_args)
 570                                if (np.isfinite(score) == False) or (
 571                                    np.isnan(score) == True
 572                                ):
 573                                    continue
 574
 575                            self.scores.append(score)
 576
 577                            if self.save is not None:
 578                                self.update_shelve()
 579
 580                        except:
 581                            continue
 582
 583                        if verbose == 1:
 584                            progbar.update(i)  # update progress bar
 585
 586                        if verbose == 2:
 587                            print(f"point: {x_next}; score: {score}")
 588                    # end loop # calculate scores on initial design
 589
 590                    if verbose == 1:
 591                        progbar.update(self.n_init)
 592
 593                else:  # self.n_jobs != 1
 594                    assert (
 595                        self.per_second is False
 596                    ), "timings not calculated here"
 597
 598                    scores = Parallel(n_jobs=self.n_jobs, prefer="threads")(
 599                        delayed(self.objective_func)(self.x_init[i, :])
 600                        for i in range(self.n_init)
 601                    )
 602
 603                    self.scores = scores
 604
 605                    if self.save is not None:
 606                        self.update_shelve()
 607
 608            else:  # if self.y_init is not None:
 609                assert self.x_init.shape[0] == len(
 610                    self.y_init
 611                ), "must have: self.x_init.shape[0] == len(self.y_init)"
 612
 613                self.scores = pickle.loads(
 614                    pickle.dumps(self.y_init.tolist(), -1)
 615                )
 616
 617            # current best score on initial design
 618            min_index = (np.asarray(self.scores)).argmin()
 619            self.y_min = self.scores[min_index]
 620            self.x_min = self.x_init[min_index, :]
 621
 622            # current gp mean and std on initial design
 623            # /!\ if GP
 624            if param_search_init_design == False:
 625                if self.method == "bayesian":
 626                    self.posterior_ = "gaussian"
 627                    try:
 628                        y_mean, y_std = self.surrogate_fit_predict(
 629                            np.asarray(self.parameters),
 630                            np.asarray(self.scores),
 631                            self.x_choices,
 632                            return_std=True,
 633                            return_pi=False,
 634                        )
 635                    except ValueError:  # do not remove this
 636                        preds_with_std = self.surrogate_fit_predict(
 637                            np.asarray(self.parameters),
 638                            np.asarray(self.scores),
 639                            self.x_choices,
 640                            return_std=True,
 641                            return_pi=False,
 642                        )
 643                        y_mean, y_std = preds_with_std[0], preds_with_std[1]
 644                    self.y_mean = y_mean
 645                    self.y_std = np.maximum(2.220446049250313e-16, y_std)
 646
 647                elif self.method == "mc":
 648                    self.posterior_ = "mc"
 649                    assert self.surrogate_obj.__class__.__name__.startswith(
 650                        "CustomRegressor"
 651                    ) or self.surrogate_obj.__class__.__name__.startswith(
 652                        "PredictionInterval"
 653                    ), "for `method = 'mc'`, the surrogate must be a nnetsauce.CustomRegressor() or nnetsauce.PredictionInterval()"
 654                    assert (
 655                        self.surrogate_obj.replications is not None
 656                    ), "for `method = 'mc'`, the surrogate must be a nnetsauce.CustomRegressor() with a number of 'replications' provided"
 657                    preds_with_std = self.surrogate_fit_predict(
 658                        np.asarray(self.parameters),
 659                        np.asarray(self.scores),
 660                        self.x_choices,
 661                        return_std=False,
 662                        return_pi=True,
 663                    )
 664                    y_mean, y_std = preds_with_std[0], preds_with_std[1]
 665                    self.y_mean = y_mean
 666                    self.y_std = np.maximum(2.220446049250313e-16, y_std)
 667
 668                elif self.method == "splitconformal":
 669                    self.posterior_ = None
 670                    # assert self.surrogate_obj.__class__.__name__.startswith(
 671                    #    "PredictionInterval"
 672                    # ), "for `method = 'splitconformal'`, the surrogate must be a nnetsauce.PredictionInterval()"
 673                    preds_with_pi = self.surrogate_fit_predict(
 674                        np.asarray(self.parameters),
 675                        np.asarray(self.scores),
 676                        self.x_choices,
 677                        return_std=False,
 678                        return_pi=True,
 679                    )
 680                    y_lower = preds_with_pi[1]
 681                    self.lower = y_lower
 682
 683            else:
 684                assert (
 685                    param_distributions is not None
 686                ), "When 'param_search_init_design == False', 'param_distributions' must be provided"
 687
 688                if self.method == "bayesian":
 689                    self.posterior_ = "gaussian"
 690                    try:
 691                        y_mean, y_std = self.surrogate_fit_predict(
 692                            np.asarray(self.parameters),
 693                            np.asarray(self.scores),
 694                            self.x_choices,
 695                            return_std=True,
 696                            return_pi=False,
 697                            param_search_init_design=True,
 698                            param_distributions=param_distributions,
 699                        )
 700                    except ValueError:  # do not remove this
 701                        preds_with_std = self.surrogate_fit_predict(
 702                            np.asarray(self.parameters),
 703                            np.asarray(self.scores),
 704                            self.x_choices,
 705                            return_std=True,
 706                            return_pi=False,
 707                            param_search_init_design=True,
 708                            param_distributions=param_distributions,
 709                        )
 710                        y_mean, y_std = preds_with_std[0], preds_with_std[1]
 711                    self.y_mean = y_mean
 712                    self.y_std = np.maximum(2.220446049250313e-16, y_std)
 713
 714                elif self.method == "mc":
 715                    self.posterior_ = "mc"
 716                    assert self.surrogate_obj.__class__.__name__.startswith(
 717                        "CustomRegressor"
 718                    ) or self.surrogate_obj.__class__.__name__.startswith(
 719                        "PredictionInterval"
 720                    ), "for `method = 'mc'`, the surrogate must be a nnetsauce.CustomRegressor() or nnetsauce.PredictionInterval()"
 721                    assert (
 722                        self.surrogate_obj.replications is not None
 723                    ), "for `method = 'mc'`, the surrogate must be a nnetsauce.CustomRegressor() with a number of 'replications' provided"
 724                    preds_with_std = self.surrogate_fit_predict(
 725                        np.asarray(self.parameters),
 726                        np.asarray(self.scores),
 727                        self.x_choices,
 728                        return_std=False,
 729                        return_pi=True,
 730                        param_search_init_design=True,
 731                        param_distributions=param_distributions,
 732                    )
 733                    y_mean, y_std = preds_with_std[0], preds_with_std[1]
 734                    self.y_mean = y_mean
 735                    self.y_std = np.maximum(2.220446049250313e-16, y_std)
 736
 737                elif self.method == "splitconformal":
 738                    self.posterior_ = None
 739                    # assert self.surrogate_obj.__class__.__name__.startswith(
 740                    #    "PredictionInterval"
 741                    # ), "for `method = 'splitconformal'`, the surrogate must be a nnetsauce.PredictionInterval()"
 742                    preds_with_pi = self.surrogate_fit_predict(
 743                        np.asarray(self.parameters),
 744                        np.asarray(self.scores),
 745                        self.x_choices,
 746                        return_std=False,
 747                        return_pi=True,
 748                        param_search_init_design=True,
 749                        param_distributions=param_distributions,
 750                    )
 751                    y_lower = preds_with_pi[1]
 752                    self.lower = y_lower
 753
 754            # saving after initial design computation
 755            if self.save is not None:
 756                self.update_shelve()
 757
 758        else:  # if n_more_iter is not None
 759            assert self.n_iter > 5, "you must have n_iter > 5"
 760            n_iter = n_more_iter
 761            iter_stop = len(self.max_acq) + n_more_iter  # potentially
 762
 763        if (verbose == 1) | (verbose == 2):
 764            print(f"\n ...Done. \n")
 765            try:
 766                print(np.hstack((self.x_init, self.y_init.reshape(-1, 1))))
 767            except:
 768                pass
 769
 770        # end init design ----------
 771
 772        # if n_more_iter is None: # initial optimization, before more iters are requested
 773
 774        if (verbose == 1) | (verbose == 2):
 775            print(f"\n Optimization loop... \n")
 776
 777        # early stopping?
 778        if abs_tol is not None:
 779            assert (
 780                min_budget > 20
 781            ), "With 'abs_tol' provided, you must have 'min_budget' > 20"
 782            self.abs_tol = abs_tol
 783
 784        if ucb_tol is not None:
 785            assert (
 786                min_budget > 20
 787            ), "With 'ucb_tol' provided, you must have 'min_budget' > 20"
 788            assert (
 789                self.acquisition == "ucb"
 790            ), "With 'ucb_tol' provided, you must have 'acquisition' == 'ucb'"
 791            self.ucb_tol = ucb_tol
 792
 793        if verbose == 1:
 794            progbar = Progbar(target=n_iter)
 795
 796        # main loop ----------
 797
 798        for i in range(n_iter):
 799            # find next set of parameters (vector), maximizing acquisition function
 800            next_param = self.next_parameter_by_acq(i=i, acq=self.acquisition)
 801
 802            try:
 803                if self.per_second is True:
 804                    start = time()
 805
 806                    if self.objective_func is not None:
 807                        score_next_param = self.objective_func(
 808                            next_param, *func_args
 809                        )
 810
 811                        if (np.isfinite(score_next_param) == False) or (
 812                            np.isnan(score_next_param) == True
 813                        ):
 814                            continue
 815
 816                    else:
 817                        assert (self.x_init is not None) and (
 818                            self.y_init is not None
 819                        ), "self.objective_func is not None: must have (self.x_init is not None) and (self.y_init is not None)"
 820
 821                        print(f"\n next param: {next_param} \n")
 822                        score_next_param = float(
 823                            input("get new score: \n")
 824                        )  # or an API response
 825
 826                        if (np.isfinite(score_next_param) == False) or (
 827                            np.isnan(score_next_param) == True
 828                        ):
 829                            continue
 830
 831                    self.timings.append(np.log(time() - start))
 832
 833                else:  # self.per_second is False:
 834                    if self.objective_func is not None:
 835                        score_next_param = self.objective_func(
 836                            next_param, *func_args
 837                        )
 838
 839                        if (np.isfinite(score_next_param) == False) or (
 840                            np.isnan(score_next_param) == True
 841                        ):
 842                            continue
 843
 844                    else:
 845                        assert (self.x_init is not None) and (
 846                            self.y_init is not None
 847                        ), "self.objective_func is not None: must have (self.x_init is not None) and (self.y_init is not None)"
 848
 849                        print(f"\n next param: {next_param} \n")
 850                        score_next_param = float(
 851                            input("get new score: \n")
 852                        )  # or an API response
 853
 854                        if (np.isfinite(score_next_param) == False) or (
 855                            np.isnan(score_next_param) == True
 856                        ):
 857                            continue
 858
 859            except:
 860                continue
 861
 862            self.parameters.append(next_param.tolist())
 863
 864            self.scores.append(score_next_param)
 865
 866            if self.save is not None:
 867                self.update_shelve()
 868
 869            if verbose == 2:
 870                print(f"iteration {i + 1} -----")
 871                print(f"current minimum:  {self.x_min}")
 872                print(f"current minimum score:  {self.y_min}")
 873                print(f"next parameter: {next_param}")
 874                print(f"score for next parameter: {score_next_param} \n")
 875
 876            if score_next_param < self.y_min:
 877                self.x_min = next_param
 878                self.y_min = score_next_param
 879                if self.save is not None:
 880                    self.update_shelve()
 881                if self.y_min == self.min_value:
 882                    break
 883
 884            if self.posterior_ == "gaussian" and self.method == "bayesian":
 885                try:
 886                    self.y_mean, self.y_std = self.surrogate_fit_predict(
 887                        np.asarray(self.parameters),
 888                        np.asarray(self.scores),
 889                        self.x_choices,
 890                        return_std=True,
 891                        return_pi=False,
 892                    )
 893                except:
 894                    (
 895                        self.y_mean,
 896                        self.y_std,
 897                        lower,
 898                        upper,
 899                    ) = self.surrogate_fit_predict(
 900                        np.asarray(self.parameters),
 901                        np.asarray(self.scores),
 902                        self.x_choices,
 903                        return_std=True,
 904                        return_pi=False,
 905                    )
 906
 907            elif self.posterior_ in (None, "mc") and self.method in (
 908                "mc",
 909                "splitconformal",
 910            ):
 911                (
 912                    self.y_mean,
 913                    self.y_lower,
 914                    self.y_upper,
 915                ) = self.surrogate_fit_predict(
 916                    np.asarray(self.parameters),
 917                    np.asarray(self.scores),
 918                    self.x_choices,
 919                    return_std=False,
 920                    return_pi=True,
 921                )
 922
 923            else:
 924                return NotImplementedError
 925
 926            if self.save is not None:
 927                self.update_shelve()
 928
 929            if verbose == 1:
 930                progbar.update(i + 1)  # update progress bar
 931
 932            # early stopping
 933
 934            if abs_tol is not None:
 935                # if self.max_acq.size > (self.n_init + self.n_iter * min_budget_pct):
 936                if len(self.max_acq) > min_budget:
 937                    diff_max_acq = np.abs(np.diff(np.asarray(self.max_acq)))
 938
 939                    if diff_max_acq[-1] <= abs_tol:
 940                        iter_stop = len(self.max_acq)  # index i starts at 0
 941
 942                        break
 943
 944            if ucb_tol is not None:
 945                if len(self.max_acq) > min_budget:
 946                    # print(f"self.ucb: {self.ucb}")
 947                    # print(f"self.acq: {self.acq}")
 948                    # print(f"mean(self.ucb/self.acq): {np.mean(self.ucb/self.acq)/100}")
 949
 950                    if (
 951                        np.abs(np.mean(self.ucb / self.acq) / 100) <= ucb_tol
 952                    ):  # self.ucb is the upper confidence bound for UCB criterion
 953                        iter_stop = len(self.max_acq)
 954
 955                        break
 956
 957        # end main loop ----------
 958
 959        if (verbose == 1) & (i < (n_iter - 1)):
 960            progbar.update(n_iter)
 961
 962        self.n_iter = iter_stop
 963        if self.save is not None:
 964            self.update_shelve()
 965
 966        DescribeResult = namedtuple(
 967            "DescribeResult", ("best_params", "best_score")
 968        )
 969
 970        if self.params_names is None:
 971            return DescribeResult(self.x_min, self.y_min)
 972
 973        else:
 974            return DescribeResult(
 975                dict(zip(self.params_names, self.x_min)), self.y_min
 976            )
 977
 978    # optimize the objective
 979    def lazyoptimize(
 980        self,
 981        verbose=1,
 982        abs_tol=None,  # suggested 1e-4, for n_iter = 200
 983        ucb_tol=None,
 984        min_budget=50,  # minimum budget for early stopping
 985        func_args=None,
 986        estimators="all",
 987        type_pi="kde",  # for now, 'kde', 'bootstrap', 'splitconformal'
 988        type_exec="independent",  # "queue" or "independent" (default)
 989    ):
 990        """Launch optimization loop.
 991
 992        # Arguments:
 993
 994            verbose: an integer;
 995                verbose = 0: nothing is printed,
 996                verbose = 1: a progress bar is printed (longer than 0),
 997                verbose = 2: information about each iteration is printed (longer than 1)
 998
 999            n_more_iter: an integer;
1000                additional number of iterations for the optimizer (which has been run once)
1001
1002            abs_tol: a float;
1003                tolerance for convergence of the optimizer (early stopping based on expected improvement)
1004
1005            ucb_tol: a float;
1006                tolerance for convergence of the optimizer (early stopping based on length of prediction intervals)
1007
1008            min_budget: an integer (default is 50);
1009                minimum number of iterations before early stopping controlled by `abs_tol`
1010
1011            func_args: a list;
1012                additional parameters for the objective function (if necessary)
1013
1014            estimators: an str or a list of strs (estimators names)
1015                if "all", then 30 models are fitted. Otherwise, only those provided in the list
1016                are adjusted; for example ["RandomForestRegressor", "Ridge"]
1017
1018            type_pi: an str;
1019                "kde" (default) or, "splitconformal"; type of prediction intervals for the surrogate
1020                model
1021
1022            type_exec: an str;
1023                "independent" (default) is when surrogate models are adjusted independently on
1024                the same design set and the best model is chosen eventually; "queue" is when
1025                surrogate models are adjusted one after the other, on a design set with
1026                increasing size;
1027
1028        """
1029
1030        # Base case: Gaussian Process
1031        gp_opt_obj = GPOpt(
1032            objective_func=self.objective_func,
1033            lower_bound=self.lower_bound,
1034            upper_bound=self.upper_bound,
1035            n_init=self.n_init,
1036            n_iter=self.n_iter,
1037            alpha=self.alpha,
1038            n_restarts_optimizer=self.n_restarts_optimizer,
1039            seed=self.seed,
1040            n_jobs=self.n_jobs,
1041            acquisition="ei",
1042            surrogate_obj=GaussianProcessRegressor(
1043                kernel=Matern(nu=2.5),
1044                alpha=self.alpha,
1045                normalize_y=True,
1046                n_restarts_optimizer=self.n_restarts_optimizer,
1047                random_state=self.seed,
1048            ),
1049        )
1050        if verbose == 2:
1051            print(
1052                f"\n adjusting surrogate model # {0} (GaussianProcessRegressor(Matern(5/2)))... \n"
1053            )
1054        res_base = gp_opt_obj.optimize(
1055            verbose=verbose,
1056            abs_tol=abs_tol,  # suggested 1e-4, for n_iter = 200
1057            min_budget=min_budget,  # minimum budget for early stopping
1058            func_args=func_args,
1059        )
1060
1061        if estimators == "all":
1062            if type_pi == "kde":
1063                self.regressors = REGRESSORS
1064
1065            else:
1066                self.regressors = [
1067                    (
1068                        est[0],
1069                        ns.PredictionInterval(
1070                            est[1](), type_pi="splitconformal"
1071                        ),
1072                    )
1073                    for est in all_estimators()
1074                    if (
1075                        issubclass(est[1], RegressorMixin)
1076                        and (est[0] not in REMOVED_REGRESSORS)
1077                    )
1078                ]
1079
1080        else:
1081            if type_pi == "kde":
1082                self.regressors = [
1083                    (
1084                        "CustomRegressor(" + est[0] + ")",
1085                        ns.CustomRegressor(
1086                            est[1](), replications=150, type_pi=type_pi
1087                        ),
1088                    )
1089                    for est in all_estimators()
1090                    if (
1091                        issubclass(est[1], RegressorMixin)
1092                        and (est[0] not in REMOVED_REGRESSORS)
1093                        and (est[0] in estimators)
1094                    )
1095                ]
1096
1097            elif type_pi == "splitconformal":
1098                self.regressors = [
1099                    (
1100                        est[0],
1101                        ns.PredictionInterval(
1102                            est[1](), type_pi="splitconformal"
1103                        ),
1104                    )
1105                    for est in all_estimators()
1106                    if (
1107                        issubclass(est[1], RegressorMixin)
1108                        and (est[0] not in REMOVED_REGRESSORS)
1109                        and (est[0] in estimators)
1110                    )
1111                ]
1112
1113        df_res = pd.DataFrame(
1114            np.empty((len(self.regressors) + 1, 2)), columns=["Model", "Score"]
1115        )
1116        df_res.iloc[0, 0] = "GaussianProcessRegressor"
1117        df_res.iloc[0, 1] = res_base.best_score
1118
1119        self.surrogate_fit_predict = partial(
1120            self.surrogate_fit_predict, return_pi=True
1121        )
1122
1123        if (
1124            type_exec == "queue"
1125        ):  # when models are adjusted one after the other on a design set with increasing size
1126            self.x_min = None
1127
1128            self.y_min = np.inf
1129
1130            score_next_param = np.inf
1131
1132            DescribeResult = namedtuple(
1133                "DescribeResult", ("best_params", "best_score", "scores")
1134            )
1135
1136            if verbose == 2:
1137                print(
1138                    f"\n adjusting surrogate model # {1} ({self.regressors[0][0]})... \n"
1139                )
1140
1141            df_res.iloc[0, 0] = self.regressors[0][0]
1142
1143            gp_opt_obj_prev = GPOpt(
1144                objective_func=self.objective_func,
1145                lower_bound=self.lower_bound,
1146                upper_bound=self.upper_bound,
1147                n_init=self.n_init,
1148                n_iter=self.n_iter,
1149                alpha=self.alpha,
1150                n_restarts_optimizer=self.n_restarts_optimizer,
1151                seed=self.seed,
1152                n_jobs=self.n_jobs,
1153                acquisition=self.acquisition,
1154                method=self.method,
1155                min_value=self.min_value,
1156                surrogate_obj=copy.deepcopy(self.regressors[0][1]),
1157            )
1158
1159            gp_opt_obj_prev.optimize(
1160                verbose=verbose,
1161                abs_tol=abs_tol,  # suggested 1e-4, for n_iter = 200
1162                ucb_tol=ucb_tol,
1163                min_budget=min_budget,  # minimum budget for early stopping
1164                func_args=func_args,
1165            )
1166
1167            score_next_param = gp_opt_obj_prev.y_min
1168
1169            df_res.iloc[0, 1] = score_next_param
1170
1171            if self.n_jobs is None:  # sequential optimization
1172                for i in range(len(self.regressors)):
1173                    try:
1174                        if verbose == 2:
1175                            print(
1176                                f"\n adjusting surrogate model # {i + 2} ({self.regressors[i][0]})... \n"
1177                            )
1178
1179                        df_res["Model"][i + 1] = self.regressors[i][0]
1180
1181                        gp_opt_obj = GPOpt(
1182                            objective_func=self.objective_func,
1183                            lower_bound=self.lower_bound,
1184                            upper_bound=self.upper_bound,
1185                            n_init=self.n_init,
1186                            n_iter=self.n_iter,
1187                            alpha=self.alpha,
1188                            n_restarts_optimizer=self.n_restarts_optimizer,
1189                            seed=self.seed,
1190                            n_jobs=self.n_jobs,
1191                            acquisition=self.acquisition,
1192                            method=self.method,
1193                            min_value=self.min_value,
1194                            surrogate_obj=copy.deepcopy(self.regressors[i][1]),
1195                            x_init=np.asarray(gp_opt_obj_prev.parameters),
1196                            y_init=np.asarray(gp_opt_obj_prev.scores),
1197                        )
1198
1199                        gp_opt_obj.optimize(
1200                            verbose=verbose,
1201                            abs_tol=abs_tol,  # suggested 1e-4, for n_iter = 200
1202                            ucb_tol=ucb_tol,
1203                            min_budget=min_budget,  # minimum budget for early stopping
1204                            func_args=func_args,
1205                        )
1206
1207                        score_next_param = gp_opt_obj.y_min
1208
1209                        df_res.iloc[i, 1] = score_next_param
1210
1211                        if score_next_param < self.y_min:
1212                            self.x_min = gp_opt_obj.x_min
1213                            self.y_min = score_next_param
1214                            if self.y_min == self.min_value:
1215                                break
1216
1217                        if verbose == 2:
1218                            print(f"Global iteration #{i + 1} -----")
1219                            print(f"current minimum:  {self.x_min}")
1220                            print(f"current minimum score:  {self.y_min}")
1221                            print(
1222                                f"score for next parameter: {score_next_param} \n"
1223                            )
1224
1225                        gp_opt_obj_prev = copy.deepcopy(gp_opt_obj)
1226
1227                    except ValueError:
1228                        continue
1229
1230            elif self.n_jobs >= 2 or self.n_jobs == -1:  # parallel optimization
1231                pass
1232            else:
1233                raise ValueError(
1234                    "n_jobs must be either None or >= 2 or equal to -1"
1235                )
1236            return DescribeResult(
1237                self.x_min, self.y_min, df_res.sort_values(by="Score")
1238            )
1239
1240        elif (
1241            type_exec == "independent"
1242        ):  # when models are adjusted independently on the same design set and the best model is chosen eventually
1243            self.x_min = None
1244
1245            self.y_min = np.inf
1246
1247            score_next_param = np.inf
1248
1249            DescribeResult = namedtuple(
1250                "DescribeResult",
1251                ("best_params", "best_score", "best_surrogate", "scores"),
1252            )
1253
1254            if verbose == 2:
1255                print(
1256                    f"\n adjusting surrogate model # {1} ({self.regressors[0][0]})... \n"
1257                )
1258
1259            if self.n_jobs is None:  # sequential optimization
1260                for i in range(len(self.regressors)):
1261                    # try:
1262
1263                    if verbose == 2:
1264                        print(
1265                            f"\n adjusting surrogate model # {i + 1} ({self.regressors[i][0]})... \n"
1266                        )
1267
1268                    df_res["Model"][i + 1] = self.regressors[i][0]
1269
1270                    gp_opt_obj = GPOpt(
1271                        objective_func=self.objective_func,
1272                        lower_bound=self.lower_bound,
1273                        upper_bound=self.upper_bound,
1274                        n_init=self.n_init,
1275                        n_iter=self.n_iter,
1276                        alpha=self.alpha,
1277                        n_restarts_optimizer=self.n_restarts_optimizer,
1278                        seed=self.seed,
1279                        n_jobs=self.n_jobs,
1280                        acquisition=self.acquisition,
1281                        method=self.method,
1282                        min_value=self.min_value,
1283                        surrogate_obj=copy.deepcopy(self.regressors[i][1]),
1284                    )
1285
1286                    gp_opt_obj.optimize(
1287                        verbose=verbose,
1288                        abs_tol=abs_tol,  # suggested 1e-4, for n_iter = 200
1289                        ucb_tol=ucb_tol,
1290                        min_budget=min_budget,  # minimum budget for early stopping
1291                        func_args=func_args,
1292                    )
1293
1294                    score_next_param = gp_opt_obj.y_min
1295
1296                    df_res.iloc[i, 1] = score_next_param
1297
1298                    if score_next_param < self.y_min:
1299                        self.x_min = gp_opt_obj.x_min
1300                        self.y_min = score_next_param
1301                        self.best_surrogate = copy.deepcopy(
1302                            gp_opt_obj.surrogate_obj
1303                        )
1304                        if self.y_min == self.min_value:
1305                            break
1306
1307                    if verbose == 2:
1308                        print(f"Global iteration #{i + 1} -----")
1309                        print(f"current minimum:  {self.x_min}")
1310                        print(f"current minimum score:  {self.y_min}")
1311                        print(
1312                            f"score for next parameter: {score_next_param} \n"
1313                        )
1314
1315                    # except ValueError:
1316
1317                    #    continue
1318
1319            elif self.n_jobs >= 2 or self.n_jobs == -1:  # parallel optimization
1320
1321                def foo(i):
1322                    df_res["Model"][i + 1] = self.regressors[i][0]
1323
1324                    gp_opt_obj = GPOpt(
1325                        objective_func=self.objective_func,
1326                        lower_bound=self.lower_bound,
1327                        upper_bound=self.upper_bound,
1328                        n_init=self.n_init,
1329                        n_iter=self.n_iter,
1330                        alpha=self.alpha,
1331                        n_restarts_optimizer=self.n_restarts_optimizer,
1332                        seed=self.seed,
1333                        n_jobs=self.n_jobs,
1334                        acquisition=self.acquisition,
1335                        method=self.method,
1336                        min_value=self.min_value,
1337                        surrogate_obj=copy.deepcopy(self.regressors[i][1]),
1338                    )
1339
1340                    try:
1341                        gp_opt_obj.optimize(
1342                            verbose=0,  # important
1343                            abs_tol=abs_tol,  # suggested 1e-4, for n_iter = 200
1344                            ucb_tol=ucb_tol,
1345                            min_budget=min_budget,  # minimum budget for early stopping
1346                            func_args=func_args,
1347                        )
1348
1349                        return gp_opt_obj
1350                    except ValueError:
1351                        return None
1352
1353                tmp_results = Parallel(n_jobs=self.n_jobs)(
1354                    delayed(foo)(i) for i in tqdm(range(len(self.regressors)))
1355                )
1356
1357                for i in tqdm(range(len(tmp_results))):
1358                    if tmp_results[i] is not None:
1359                        gp_opt_obj = copy.deepcopy(tmp_results[i])
1360
1361                        score_next_param = gp_opt_obj.y_min
1362
1363                        df_res.iloc[i, 1] = score_next_param
1364
1365                        if score_next_param < self.y_min:
1366                            self.x_min = gp_opt_obj.x_min
1367                            self.y_min = score_next_param
1368                            self.best_surrogate = copy.deepcopy(
1369                                gp_opt_obj.surrogate_obj
1370                            )
1371                            if self.y_min == self.min_value:
1372                                break
1373
1374                        if verbose == 2:
1375                            print(f"Global iteration #{i + 1} -----")
1376                            print(f"current minimum:  {self.x_min}")
1377                            print(f"current minimum score:  {self.y_min}")
1378                            print(
1379                                f"score for next parameter: {score_next_param} \n"
1380                            )
1381
1382                    else:
1383                        continue
1384
1385            else:
1386                raise ValueError(
1387                    "n_jobs must be either None or >= 2 or equal to -1"
1388                )
1389            return DescribeResult(
1390                self.x_min,
1391                self.y_min,
1392                self.best_surrogate,
1393                df_res.sort_values(by="Score"),
1394            )
1395
1396        else:
1397            NotImplementedError(
1398                "type_exec must be either 'queue' or 'independent'"
1399            )

Class GPOpt.

Arguments:

lower_bound: a numpy array;
    lower bound for researched minimum

upper_bound: a numpy array;
    upper bound for researched minimum

objective_func: a function;
    the objective function to be minimized

params_names: a list;
    names of the parameters of the objective function (optional)

surrogate_obj: an object;
    An ML model for estimating the uncertainty around the objective function
    Must be nnetsauce.CustomRegressor or nnetsauce.PredictionInterval

x_init:
    initial setting of points where `objective_func` is evaluated (optional)

y_init:
    initial setting values at points where `objective_func` is evaluated (optional)

n_init: an integer;
    number of points in the initial setting, when `x_init` and `y_init` are not provided

n_choices: an integer;
    number of points for the calculation of expected improvement

n_iter: an integer;
    number of iterations of the minimization algorithm

alpha: a float;
    Value added to the diagonal of the kernel matrix during fitting (for Matern 5/2 kernel)

n_restarts_optimizer: an integer;
    The number of restarts of the optimizer for finding the kernel’s parameters which maximize the log-marginal likelihood.

seed: an integer;
    reproducibility seed

save: a string;
    Specifies where to save the optimizer in its current state

n_jobs: an integer;
    number of jobs for parallel computing on initial setting or method `lazyoptimize` (can be -1)

acquisition: a string;
    acquisition function: "ei" (expected improvement) or "ucb" (upper confidence bound)

method: an str;
    "bayesian" (default) for Gaussian posteriors, "mc" for Monte Carlo posteriors,
    "splitconformal" (only for acquisition = "ucb") for conformalized surrogates

min_value: a float;
    minimum value of the objective function (default is None). For example,
    if objective function is accuracy, will be 1, and the algorithm will stop

per_second: a boolean;
    __experimental__, default is False (leave to default for now)

log_scale: a boolean;
    __experimental__, default is False (leave to default for now)

see also Bayesian Optimization with GPopt or Hyperparameters tuning with GPopt Agnostic BayesOpt

class BOstopping:
 22class BOstopping:
 23    """Bayesian Optimization with robust early stopping criteria."""
 24
 25    def __init__(
 26        self,
 27        f,
 28        bounds,
 29        n_init=5,
 30        kappa=1.96,
 31        early_stopping=True,
 32        stop_patience=20,
 33        stop_threshold=0.01,
 34        n_test_points=100,
 35        alpha=1e-6,
 36        n_restarts_optimizer=25,
 37        seed=123,
 38    ):
 39        self.f = f
 40        self.bounds = np.array(bounds)
 41        self.n_init = n_init
 42        self.kappa = kappa
 43        self.early_stopping = early_stopping
 44        self.stop_patience = stop_patience
 45        self.stop_threshold = stop_threshold
 46        self.n_test_points = n_test_points
 47        self.alpha = alpha
 48        self.n_restarts_optimizer = n_restarts_optimizer
 49        self.seed = seed
 50
 51        np.random.seed(self.seed)
 52        self.test_points = self._sample_random(n_test_points)
 53
 54        # History tracking
 55        self.wasserstein_history = []
 56        self.X = []
 57        self.y = []
 58        self.best_values = []
 59        self.acquisition_values = []
 60        self.gp_variance = []
 61        self.phase = []
 62
 63        # GP setup
 64        self.gp = GaussianProcessRegressor(
 65            kernel=Matern(nu=2.5),
 66            alpha=self.alpha,
 67            normalize_y=True,
 68            n_restarts_optimizer=self.n_restarts_optimizer,
 69            random_state=self.seed,
 70        )
 71
 72    def _sample_random(self, n_samples):
 73        """Uniform sampling within bounds."""
 74        return np.random.uniform(
 75            self.bounds[:, 0],
 76            self.bounds[:, 1],
 77            size=(n_samples, len(self.bounds)),
 78        )
 79
 80    def _acquisition(self, X_candidate):
 81        """Expected Improvement acquisition function."""
 82        mu, sigma = self.gp.predict(X_candidate, return_std=True)
 83        mu_sample = np.min(self.y)  # Use actual observed minimum
 84
 85        sigma = np.maximum(sigma, 1e-6)
 86        gamma = (mu_sample - mu) / sigma
 87        ei = (mu_sample - mu) * norm.cdf(gamma) + sigma * norm.pdf(gamma)
 88        return ei
 89
 90    def _get_posterior_samples(self, gp, n_samples=100):
 91        """Sample from GP posterior at test points."""
 92        mu, sigma = gp.predict(self.test_points, return_std=True)
 93        return np.random.normal(
 94            mu, sigma, size=(n_samples, len(self.test_points))
 95        )
 96
 97    def _compute_wasserstein(self, gp_prev, gp_current):
 98        """Compute approximate Wasserstein distance between posteriors."""
 99        mu_prev, std_prev = gp_prev.predict(self.test_points, return_std=True)
100        mu_curr, std_curr = gp_current.predict(
101            self.test_points, return_std=True
102        )
103
104        # 2-Wasserstein distance for independent 1D Gaussians, averaged
105        w2_per_point = (mu_prev - mu_curr) ** 2 + (std_prev - std_curr) ** 2
106        return np.sqrt(np.mean(w2_per_point))
107
108    def _should_stop(self, gp_prev):
109        """Early stopping based on improvement and posterior stability."""
110        if len(self.best_values) < self.stop_patience + 1:
111            return False
112
113        # 1. Improvement check
114        recent_improvements = np.diff(self.best_values[-self.stop_patience :])
115        improvement_stop = np.all(
116            np.abs(recent_improvements) < self.stop_threshold
117        )
118
119        # 2. Posterior stability
120        current_w = self._compute_wasserstein(gp_prev, self.gp)
121        self.wasserstein_history.append(current_w)
122
123        if len(self.wasserstein_history) >= 2 * self.stop_patience:
124            recent_w = self.wasserstein_history[-self.stop_patience :]
125            older_w = self.wasserstein_history[
126                -2 * self.stop_patience : -self.stop_patience
127            ]
128            _, p_value = mannwhitneyu(recent_w, older_w, alternative="greater")
129            mwu_stable = p_value > 0.1
130            var_stable = np.var(recent_w) < 1e-6
131            posterior_stable = mwu_stable or var_stable
132        else:
133            posterior_stable = False
134
135        return improvement_stop or posterior_stable
136
137    def optimize(self, n_iter=100):
138        """Run Bayesian optimization loop."""
139        print("Starting Initial Design Phase...")
140        self.X = self._sample_random(self.n_init)
141        self.y = []
142
143        for i, x in enumerate(self.X):
144            y_val = self.f(x)
145            self.y.append(y_val)
146            current_best = np.min(self.y)
147            self.best_values.append(current_best)
148            self.acquisition_values.append(0)
149            self.gp_variance.append(0)
150            self.phase.append("initial")
151            print(
152                f"  Initial sample {i+1}/{self.n_init}: f(x) = {y_val:.6f}, best = {current_best:.6f}"
153            )
154
155        print(f"Initial Design Complete. Best value: {np.min(self.y):.6f}")
156        print("\nStarting Bayesian Optimization Phase...")
157
158        gp_prev = None
159        for i in tqdm(range(n_iter), desc="Bayesian Optimization"):
160            if i > 0:
161                gp_prev = deepcopy(self.gp)
162
163            self.gp.fit(self.X, self.y)
164
165            X_candidate = self._sample_random(1000)
166            acq = self._acquisition(X_candidate)
167            best_acq_idx = np.argmax(acq)
168            x_next = X_candidate[best_acq_idx]
169            max_acq_value = acq[best_acq_idx]
170
171            _, gp_std = self.gp.predict([x_next], return_std=True)
172
173            y_next = self.f(x_next)
174            self.X = np.vstack((self.X, x_next))
175            self.y.append(y_next)
176            current_best = np.min(self.y)
177            self.best_values.append(current_best)
178            self.acquisition_values.append(max_acq_value)
179            self.gp_variance.append(gp_std[0])
180            self.phase.append("bayesian")
181
182            print(
183                f"  Iteration {i+1}: f(x) = {y_next:.6f}, best = {current_best:.6f}, "
184                f"EI = {max_acq_value:.6f}, σ = {gp_std[0]:.6f}"
185            )
186
187            if gp_prev is not None:
188                w_dist = self._compute_wasserstein(gp_prev, self.gp)
189                self.wasserstein_history.append(w_dist)
190                print(f"    Wasserstein distance: {w_dist:.8f}")
191
192            if self.early_stopping and i > self.n_init and gp_prev is not None:
193                if self._should_stop(gp_prev):
194                    print(f"Early stopping at iteration {i+1}")
195                    break
196
197        best_idx = np.argmin(self.y)
198        return self.X[best_idx], self.y[best_idx]

Bayesian Optimization with robust early stopping criteria.

class MLOptimizer:
  9class MLOptimizer:
 10    """
 11    A class for hyperparameter optimization of any ML model using Gaussian Process optimization.
 12    """
 13
 14    def __init__(
 15        self,
 16        scoring="accuracy",
 17        cv=5,
 18        n_jobs=None,
 19        n_init=10,
 20        n_iter=190,
 21        seed=3137,
 22    ):
 23        """
 24        Initialize the hyperparameter optimizer.
 25
 26        Parameters:
 27        -----------
 28        scoring : str or callable
 29            Scoring metric for cross-validation
 30        cv : int
 31            Number of cross-validation folds
 32        n_jobs : int
 33            Number of parallel jobs
 34        n_init : int
 35            Number of initial random evaluations
 36        n_iter : int
 37            Number of optimization iterations
 38        seed : int
 39            Random seed
 40        """
 41        self.scoring = scoring
 42        self.cv = cv
 43        self.n_jobs = n_jobs
 44        self.n_init = n_init
 45        self.n_iter = n_iter
 46        self.seed = seed
 47
 48        # Store optimization state
 49        self.optimization_result = None
 50        self.estimator_class = None
 51        self.param_config = None
 52        self.X_train = None
 53        self.y_train = None
 54
 55    def _generic_cv(self, estimator_class, param_dict):
 56        """
 57        Generic cross-validation function for any ML model.
 58
 59        Parameters:
 60        -----------
 61        estimator_class : class
 62            The ML model class (e.g., RandomForestClassifier, SVR, etc.)
 63        param_dict : dict
 64            Dictionary of parameters to pass to the estimator
 65
 66        Returns:
 67        --------
 68        float : negative mean CV score (for minimization)
 69        """
 70        try:
 71            estimator = estimator_class.set_params(**param_dict)
 72            scores = cross_val_score(
 73                estimator,
 74                self.X_train,
 75                self.y_train,
 76                scoring=self.scoring,
 77                cv=self.cv,
 78                n_jobs=self.n_jobs,
 79                error_score="raise",  # This will raise an error if CV fails
 80            )
 81            return -scores.mean()
 82        except Exception as e:
 83            # Return a large positive value if CV fails
 84            print(f"CV failed with parameters {param_dict}: {e}")
 85            return 1e6  # Large positive value for minimization
 86
 87    def optimize(
 88        self, X_train, y_train, estimator_class, param_config, verbose=2
 89    ):
 90        """
 91        Optimize hyperparameters for an ML model.
 92
 93        Parameters:
 94        -----------
 95        X_train : array-like
 96            Training features
 97        y_train : array-like
 98            Training targets
 99        estimator_class : class
100            The ML model class (not an instance!)
101        param_config : dict
102            Configuration for each parameter
103        verbose : int
104            Verbosity level
105
106        Returns:
107        --------
108        GPOpt result object
109        """
110        # Store optimization context
111        self.X_train = X_train
112        self.y_train = y_train
113        self.estimator_class = estimator_class
114        self.param_config = param_config
115
116        # Extract parameter names, bounds, and transformations
117        param_names = list(param_config.keys())
118        lower_bounds = []
119        upper_bounds = []
120        transforms = {}
121        dtypes = {}
122        defaults = {}
123
124        for param_name, config in param_config.items():
125            lower_bounds.append(config["bounds"][0])
126            upper_bounds.append(config["bounds"][1])
127            if "transform" in config:
128                transforms[param_name] = config["transform"]
129            dtypes[param_name] = config.get("dtype", "float")
130            if "default" in config:
131                defaults[param_name] = config["default"]
132
133        def crossval_objective(x):
134            """Objective function for hyperparameter tuning"""
135            param_dict = {}
136
137            for i, param_name in enumerate(param_names):
138                value = x[i]
139
140                # Apply transformation if specified
141                if param_name in transforms:
142                    value = transforms[param_name](value)
143
144                # Apply dtype conversion
145                if dtypes[param_name] == "int":
146                    value = int(
147                        np.round(value)
148                    )  # Use round instead of ceil for better behavior
149
150                param_dict[param_name] = value
151
152            # Add any default parameters not being optimized
153            for param_name, default_value in defaults.items():
154                if param_name not in param_dict:
155                    param_dict[param_name] = default_value
156
157            # Remove None values as they can cause issues
158            param_dict = {k: v for k, v in param_dict.items() if v is not None}
159
160            return self._generic_cv(estimator_class, param_dict)
161
162        # Set up Gaussian Process optimization
163        gp_opt = GPOpt(
164            objective_func=crossval_objective,
165            lower_bound=np.array(lower_bounds),
166            upper_bound=np.array(upper_bounds),
167            params_names=param_names,
168            n_init=self.n_init,
169            n_iter=self.n_iter,
170            seed=self.seed,
171        )
172
173        try:
174            self.optimization_result = gp_opt.optimize(
175                verbose=verbose, abs_tol=1e-4
176            )
177            return self.optimization_result
178        except Exception as e:
179            print(f"Optimization failed: {e}")
180            # Try to debug by running a few evaluations manually
181            print(
182                "Debugging: Testing objective function with initial points..."
183            )
184            for i in range(min(3, self.n_init)):
185                test_point = np.random.uniform(lower_bounds, upper_bounds)
186                score = crossval_objective(test_point)
187                print(f"Test point {i}: {test_point} -> score: {score}")
188            raise
189
190    def get_best_parameters(self, apply_transforms=True):
191        """
192        Get the best parameters found during optimization.
193        """
194        if self.optimization_result is None:
195            raise ValueError(
196                "No optimization has been run yet. Call optimize() first."
197            )
198
199        if apply_transforms:
200            return self.optimization_result.best_params
201        else:
202            return self.optimization_result.x
203
204    def get_best_score(self):
205        """
206        Get the best score found during optimization.
207        """
208        if self.optimization_result is None:
209            raise ValueError(
210                "No optimization has been run yet. Call optimize() first."
211            )
212
213        return (
214            -self.optimization_result.best_score
215        )  # Convert back to positive score
216
217    def create_optimized_estimator(self):
218        """
219        Create an estimator instance with the optimized parameters.
220        """
221        if self.optimization_result is None:
222            raise ValueError(
223                "No optimization has been run yet. Call optimize() first."
224            )
225
226        if self.estimator_class is None:
227            raise ValueError(
228                "No estimator class stored. Call optimize() first."
229            )
230
231        if self.param_config is None:
232            raise ValueError(
233                "No parameter configuration stored. Call optimize() first."
234            )
235
236        best_params = self.get_best_parameters()
237
238        # Apply transformations and type conversions using stored param_config
239        final_params = {}
240        for param_name, value in best_params.items():
241            if param_name in self.param_config:
242                config = self.param_config[param_name]
243
244                # Apply transformation if specified
245                if "transform" in config:
246                    value = config["transform"](value)
247
248                # Apply dtype conversion
249                if config.get("dtype") == "int":
250                    value = int(np.round(value))
251
252            final_params[param_name] = value
253
254        # Remove None values
255        final_params = {k: v for k, v in final_params.items() if v is not None}
256
257        return self.estimator_class.set_params(**final_params)
258
259    def fit_optimized_estimator(self):
260        """
261        Create and fit an optimized estimator on the training data.
262        """
263        if self.X_train is None or self.y_train is None:
264            raise ValueError("No training data stored. Call optimize() first.")
265
266        estimator = self.create_optimized_estimator()
267        return estimator.fit(self.X_train, self.y_train)

A class for hyperparameter optimization of any ML model using Gaussian Process optimization.

class GeneralizationOpt:
156class GeneralizationOpt:
157    """
158    Generic framework for model generalization diagnostics
159
160    This class is agnostic to:
161    - The base model being tuned (LightGBM, XGBoost, neural nets, etc.)
162    - The surrogate model used (GP, RF, neural nets, etc.)
163    - The hyperparameter space structure
164
165    Example usage:
166    --------------
167    >>> from sklearn.gaussian_process import GaussianProcessRegressor
168    >>> from sklearn.ensemble import RandomForestRegressor
169    >>>
170    >>> # Define hyperparameter space
171    >>> hyperparams = {
172    >>>     'learning_rate': (0.01, 0.3, True),  # (min, max, log_scale)
173    >>>     'max_depth': (3, 10, False)
174    >>> }
175    >>>
176    >>> # Initialize framework
177    >>> diag = GeneralizationOpt(hyperparams, random_state=42)
178    >>>
179    >>> # Use GP surrogate with Bayesian uncertainty
180    >>> gp = GaussianProcessRegressor()
181    >>> surrogate = GenericSurrogate(gp, conformal=False, bayesian=True)
182    >>>
183    >>> # Or use RF with conformal prediction
184    >>> rf = RandomForestRegressor()
185    >>> surrogate = GenericSurrogate(rf, conformal=True, bayesian=False)
186    >>>
187    >>> # Run diagnostics
188    >>> results = diag.run_diagnostics(
189    >>>     X_train, X_test, y_train, y_test,
190    >>>     model_class=LGBMClassifier,
191    >>>     surrogate=surrogate,
192    >>>     n_samples=100
193    >>> )
194    """
195
196    def __init__(self, hyperparams, random_state=42):
197        """
198        Initialize diagnostics framework
199
200        Parameters:
201        -----------
202        hyperparams : dict
203            Hyperparameter space definition
204            Format: {param_name: (min_val, max_val, log_scale), ...}
205        random_state : int
206            Random seed for reproducibility
207        """
208        self.hyperparams = hyperparams
209        self.hyperparam_names = list(hyperparams.keys())
210        self.n_dims = len(self.hyperparam_names)
211        self.random_state = random_state
212
213        # Storage
214        self.sobol_samples = None
215        self.evaluation_results = None
216        self.surrogate = None
217        self.sobol_indices = None
218
219    def _denormalize_config(self, normalized_config):
220        """Convert normalized [0,1] to actual hyperparameter values"""
221        config = {}
222        for i, param_name in enumerate(self.hyperparam_names):
223            min_val, max_val, log_scale = self.hyperparams[param_name]
224            norm_val = normalized_config[i]
225
226            if log_scale:
227                log_min, log_max = np.log(min_val), np.log(max_val)
228                config[param_name] = np.exp(
229                    log_min + norm_val * (log_max - log_min)
230                )
231            else:
232                denormalized_value = min_val + norm_val * (max_val - min_val)
233                # Special handling for boolean parameters represented as (0, 1, False)
234                if (
235                    min_val == 0
236                    and max_val == 1
237                    and not log_scale
238                    and isinstance(min_val, int)
239                ):
240                    config[param_name] = bool(round(denormalized_value))
241                # Convert to int if needed (if min and max are integers and not log-scale)
242                elif (
243                    isinstance(min_val, int)
244                    and isinstance(max_val, int)
245                    and not log_scale
246                ):
247                    config[param_name] = int(round(denormalized_value))
248                else:
249                    config[param_name] = denormalized_value
250
251        return config
252
253    def generate_sobol_samples(self, n_samples=None):
254        """
255        Generate Sobol quasi-random samples in hyperparameter space
256
257        Parameters:
258        -----------
259        n_samples : int, optional
260            Number of samples (default: 75 * n_dims)
261
262        Returns:
263        --------
264        samples : ndarray, shape (n_samples, n_dims)
265            Normalized samples in [0, 1]^n_dims
266        """
267        if n_samples is None:
268            n_samples = 75 * self.n_dims
269
270        sobol = Sobol(d=self.n_dims, scramble=True, seed=self.random_state)
271        self.sobol_samples = sobol.random(n_samples)
272
273        return self.sobol_samples
274
275    def evaluate_configurations(
276        self,
277        X_train,
278        X_test,
279        y_train,
280        y_test,
281        model_class,
282        model_kwargs=None,
283        evaluation_fn=None,
284        n_folds=5,
285    ):
286        """
287        Evaluate model configurations and compute generalization gaps
288
289        Parameters:
290        -----------
291        X_train, X_test, y_train, y_test : arrays
292            Train/test data
293        model_class : class
294            Model class to instantiate (e.g., LGBMClassifier)
295        model_kwargs : dict, optional
296            Fixed kwargs for model (e.g., {'random_state': 42, 'verbose': -1})
297        evaluation_fn : callable, optional
298            Function to evaluate model: evaluation_fn(y_true, y_pred) -> score
299            Default: balanced_accuracy_score for classification
300        n_folds : int
301            Number of CV folds for training score estimation
302
303        Returns:
304        --------
305        results : DataFrame
306            Results with columns: hyperparams, cv_train, test_score, gap_abs, etc.
307        """
308        from sklearn.model_selection import StratifiedKFold, KFold
309        from sklearn.metrics import balanced_accuracy_score, r2_score
310
311        if self.sobol_samples is None:
312            raise ValueError("Must generate Sobol samples first")
313
314        if model_kwargs is None:
315            model_kwargs = {}
316
317        if evaluation_fn is None:
318            # Heuristic: check if classification or regression
319            if len(np.unique(y_train)) < 20:
320                evaluation_fn = balanced_accuracy_score
321                kfold = StratifiedKFold(
322                    n_splits=n_folds,
323                    shuffle=True,
324                    random_state=self.random_state,
325                )
326            else:
327                evaluation_fn = r2_score
328                kfold = KFold(
329                    n_splits=n_folds,
330                    shuffle=True,
331                    random_state=self.random_state,
332                )
333        else:
334            kfold = KFold(
335                n_splits=n_folds, shuffle=True, random_state=self.random_state
336            )
337
338        results = []
339        n_samples = len(self.sobol_samples)
340
341        print(
342            f"Evaluating {n_samples} configurations ({n_folds}-fold CV + test)..."
343        )
344
345        for idx, norm_config in tqdm(enumerate(self.sobol_samples)):
346            if (idx + 1) % 50 == 0:
347                print(
348                    f"  Progress: {idx + 1}/{n_samples} "
349                    f"({100*(idx+1)/n_samples:.1f}%)"
350                )
351
352            config = self._denormalize_config(norm_config)
353
354            # Cross-validation score
355            cv_scores = []
356            for train_idx, val_idx in kfold.split(X_train, y_train):
357                if hasattr(X_train, "iloc"):  # DataFrame
358                    X_tr, X_val = X_train.iloc[train_idx], X_train.iloc[val_idx]
359                    y_tr, y_val = y_train.iloc[train_idx], y_train.iloc[val_idx]
360                else:  # ndarray
361                    X_tr, X_val = X_train[train_idx], X_train[val_idx]
362                    y_tr, y_val = y_train[train_idx], y_train[val_idx]
363
364                model = model_class(**config, **model_kwargs)
365                model.fit(X_tr, y_tr)
366                y_pred = model.predict(X_val)
367                cv_scores.append(evaluation_fn(y_val, y_pred))
368
369            cv_train = np.mean(cv_scores)
370
371            # Test score
372            model = model_class(**config, **model_kwargs)
373            model.fit(X_train, y_train)
374            y_pred_test = model.predict(X_test)
375            test_score = evaluation_fn(y_test, y_pred_test)
376
377            # Generalization gaps
378            gap_abs = cv_train - test_score
379            gap_norm = (
380                (cv_train - test_score) / (1 - cv_train) if cv_train < 1 else 0
381            )
382            gap_mpe = (
383                ((cv_train - test_score) / cv_train * 100)
384                if cv_train > 0
385                else 0
386            )
387
388            results.append(
389                {
390                    **config,
391                    "cv_train": cv_train,
392                    "test_score": test_score,
393                    "gap_abs": gap_abs,
394                    "gap_norm": gap_norm,
395                    "gap_mpe": gap_mpe,
396                }
397            )
398
399        self.evaluation_results = pd.DataFrame(results)
400
401        print(f"\n✓ Completed {n_samples} evaluations")
402        print(
403            f"  CV Train Score: {self.evaluation_results['cv_train'].mean():.4f} \u00b1 "
404            f"{self.evaluation_results['cv_train'].std():.4f}"
405        )
406        print(
407            f"  Test Score: {self.evaluation_results['test_score'].mean():.4f} \u00b1 "
408            f"{self.evaluation_results['test_score'].std():.4f}"
409        )
410        print(
411            f"  Absolute Gap: {self.evaluation_results['gap_abs'].mean():.4f} \u00b1 "
412            f"{self.evaluation_results['gap_abs'].std():.4f}"
413        )
414
415        return self.evaluation_results
416
417    def fit_surrogate(self, surrogate, target_gap="gap_abs"):
418        """
419        Fit surrogate model to generalization gap data
420
421        Parameters:
422        -----------
423        surrogate : GenericSurrogate
424            Surrogate model instance
425        target_gap : str
426            Target variable to model ('gap_abs', 'gap_norm', 'gap_mpe')
427
428        Returns:
429        --------
430        surrogate : GenericSurrogate
431            Fitted surrogate model
432        metrics : dict
433            Performance metrics (R², MAE, coverage if applicable)
434        """
435        if self.evaluation_results is None:
436            raise ValueError("Must evaluate configurations first")
437
438        X = self.sobol_samples
439        y = self.evaluation_results[target_gap].values
440
441        # Train/validation split
442        split_idx = int(0.8 * len(X))
443        X_train, X_val = X[:split_idx], X[split_idx:]
444        y_train, y_val = y[:split_idx], y[split_idx:]
445
446        print(
447            f"Training surrogate on {len(X_train)} samples, "
448            f"validating on {len(X_val)} samples..."
449        )
450
451        # Fit surrogate
452        surrogate.fit(X_train, y_train)
453
454        # Evaluate
455        y_pred, y_lower, y_upper = surrogate.predict(X_val, return_std=True)
456
457        from sklearn.metrics import r2_score, mean_absolute_error
458
459        r2 = r2_score(y_val, y_pred)
460        mae = mean_absolute_error(y_val, y_pred)
461
462        metrics = {"r2": r2, "mae": mae}
463
464        # Compute coverage if intervals available
465        if not np.array_equal(y_lower, y_upper):
466            coverage = np.mean((y_val >= y_lower) & (y_val <= y_upper))
467            interval_width = np.mean(y_upper - y_lower)
468            metrics["coverage"] = coverage
469            metrics["interval_width"] = interval_width
470
471            print(f"\n✓ Surrogate Performance:")
472            print(f"  R² Score: {r2:.4f}")
473            print(f"  MAE: {mae:.6f}")
474            print(f"  Coverage: {coverage:.2%}")
475            print(f"  Mean Interval Width: {interval_width:.6f}")
476        else:
477            print(f"\n✓ Surrogate Performance:")
478            print(f"  R² Score: {r2:.4f}")
479            print(f"  MAE: {mae:.6f}")
480
481        self.surrogate = surrogate
482        return surrogate, metrics
483
484    def compute_sobol_indices(self, n_samples=10000):
485        """
486        Compute Sobol sensitivity indices for global sensitivity analysis
487
488        Parameters:
489        -----------
490        n_samples : int
491            Number of Monte Carlo samples for Sobol estimation
492
493        Returns:
494        --------
495        indices : DataFrame
496            Sobol indices with columns: Parameter, S1, ST, Interaction
497        """
498        if self.surrogate is None:
499            raise ValueError("Must fit surrogate first")
500
501        problem = {
502            "num_vars": self.n_dims,
503            "names": self.hyperparam_names,
504            "bounds": [[0, 1]] * self.n_dims,
505        }
506
507        param_values = sobol_sample.sample(
508            problem, n_samples, calc_second_order=False
509        )
510        Y = self.surrogate.predict(param_values)
511
512        Si = sobol_analyze.analyze(
513            problem, Y, calc_second_order=False, print_to_console=False
514        )
515
516        self.sobol_indices = pd.DataFrame(
517            {
518                "Parameter": self.hyperparam_names,
519                "S1": Si["S1"],
520                "ST": Si["ST"],
521                "Interaction": Si["ST"] - Si["S1"],
522            }
523        ).sort_values("S1", ascending=False)
524
525        print("\n" + "=" * 70)
526        print("Sobol Sensitivity Indices (Variance Decomposition)")
527        print("=" * 70)
528        print(self.sobol_indices.to_string(index=False))
529        print("\nS1 = First-order effect (main effect)")
530        print("ST = Total-order effect (main + interactions)")
531        print("Interaction = ST - S1 (pure interaction effects)")
532
533        return self.sobol_indices
534
535    def optimize_acquisition(
536        self,
537        acquisition="ei",
538        n_starts=10,
539        kappa=2.0,
540        xi=0.01,
541        minimize_gap=True,
542    ):
543        """
544        Optimize acquisition function to find best hyperparameter configuration
545
546        Parameters:
547        -----------
548        acquisition : str
549            Acquisition function: 'ei' (Expected Improvement),
550            'ucb' (Upper Confidence Bound), 'pi' (Probability of Improvement)
551        n_starts : int
552            Number of random restarts for multi-start optimization
553        kappa : float
554            Exploration parameter for UCB (higher = more exploration)
555        xi : float
556            Exploration parameter for EI/PI (higher = more exploration)
557        minimize_gap : bool
558            If True, minimize gap; if False, maximize test performance
559
560        Returns:
561        --------
562        results : dict
563            Optimization results including best config and expected improvement
564        """
565        if self.surrogate is None:
566            raise ValueError("Must fit surrogate first")
567
568        # Get current best
569        if minimize_gap:
570            y_best = self.evaluation_results["gap_abs"].min()
571            print(f"Current best gap: {y_best:.6f}")
572        else:
573            y_best = self.evaluation_results["test_score"].max()
574            print(f"Current best test score: {y_best:.6f}")
575
576        # Define acquisition functions
577        def expected_improvement(x):
578            x = x.reshape(1, -1)
579            y_pred, y_lower, y_upper = self.surrogate.predict(
580                x, return_std=True
581            )
582            mu, sigma = y_pred[0], (y_upper[0] - y_lower[0]) / 4  # Approx std
583
584            if sigma < 1e-10:
585                return 0.0
586
587            if minimize_gap:
588                imp = y_best - mu - xi
589            else:
590                imp = mu - y_best - xi
591
592            Z = imp / sigma
593            ei = imp * norm.cdf(Z) + sigma * norm.pdf(Z)
594            return -ei
595
596        def upper_confidence_bound(x):
597            x = x.reshape(1, -1)
598            y_pred, y_lower, y_upper = self.surrogate.predict(
599                x, return_std=True
600            )
601            mu, sigma = y_pred[0], (y_upper[0] - y_lower[0]) / 4
602
603            if minimize_gap:
604                ucb = mu - kappa * sigma
605            else:
606                ucb = mu + kappa * sigma
607
608            return ucb if minimize_gap else -ucb
609
610        def probability_of_improvement(x):
611            x = x.reshape(1, -1)
612            y_pred, y_lower, y_upper = self.surrogate.predict(
613                x, return_std=True
614            )
615            mu, sigma = y_pred[0], (y_upper[0] - y_lower[0]) / 4
616
617            if sigma < 1e-10:
618                return 0.0
619
620            if minimize_gap:
621                Z = (y_best - mu - xi) / sigma
622            else:
623                Z = (mu - y_best - xi) / sigma
624
625            pi = norm.cdf(Z)
626            return -pi
627
628        acq_funcs = {
629            "ei": expected_improvement,
630            "ucb": upper_confidence_bound,
631            "pi": probability_of_improvement,
632        }
633        acq_func = acq_funcs[acquisition]
634
635        # Multi-start optimization
636        print(f"\nRunning {n_starts}-start L-BFGS-B optimization...")
637
638        best_result = None
639        best_acq_value = np.inf
640
641        np.random.seed(self.random_state)
642        starting_points = np.random.rand(n_starts, self.n_dims)
643
644        # Add best observed point as starting point
645        if len(self.sobol_samples) > 0:
646            best_idx = (
647                self.evaluation_results["gap_abs"].idxmin()
648                if minimize_gap
649                else self.evaluation_results["test_score"].idxmax()
650            )
651            starting_points[0] = self.sobol_samples[best_idx]
652
653        for i, x0 in enumerate(starting_points):
654            try:
655                result = scipy_minimize(
656                    acq_func,
657                    x0,
658                    method="L-BFGS-B",
659                    bounds=[(0, 1)] * self.n_dims,
660                    options={"maxiter": 1000, "ftol": 1e-9},
661                )
662
663                if result.success and result.fun < best_acq_value:
664                    best_acq_value = result.fun
665                    best_result = result
666            except Exception as e:
667                print(f"  Warning: Optimization {i+1} failed: {str(e)}")
668
669        if best_result is None:
670            raise RuntimeError("All optimization attempts failed")
671
672        # Extract best configuration
673        best_x_normalized = best_result.x
674        best_config = self._denormalize_config(best_x_normalized)
675
676        # Predict gap at optimum
677        y_pred, y_lower, y_upper = self.surrogate.predict(
678            best_x_normalized.reshape(1, -1), return_std=True
679        )
680        mu_best = y_pred[0]
681
682        print(f"\n✓ Optimization Complete")
683        print(f"  Acquisition: {acquisition.upper()}")
684        print(f"  Best acquisition value: {-best_acq_value:.6f}")
685        print(f"  Predicted gap at optimum: {mu_best:.6f}")
686        print(f"  95% CI: [{y_lower[0]:.6f}, {y_upper[0]:.6f}]")
687        print(f"\n  Recommended Configuration:")
688        for param, value in best_config.items():
689            print(
690                f"    {param}: {value if isinstance(value, int) else f'{value:.4f}'}"
691            )
692
693        improvement = y_best - mu_best if minimize_gap else mu_best - y_best
694        print(f"\n  Expected improvement: {improvement:.6f}")
695
696        return {
697            "best_config": best_config,
698            "best_config_normalized": best_x_normalized,
699            "predicted_gap_mean": mu_best,
700            "predicted_gap_lower": y_lower[0],
701            "predicted_gap_upper": y_upper[0],
702            "current_best": y_best,
703            "expected_improvement": improvement,
704            "acquisition_function": acquisition,
705        }
706
707    def run_diagnostics(
708        self,
709        X_train,
710        X_test,
711        y_train,
712        y_test,
713        model_class,
714        surrogate,
715        n_samples=None,
716        model_kwargs=None,
717        target_gap="gap_abs",
718    ):
719        """
720        Run complete diagnostic pipeline
721
722        Parameters:
723        -----------
724        X_train, X_test, y_train, y_test : arrays
725            Train/test data
726        model_class : class
727            Model class to tune
728        surrogate : GenericSurrogate
729            Surrogate model for gap modeling
730        n_samples : int, optional
731            Number of Sobol samples
732        model_kwargs : dict, optional
733            Fixed model kwargs
734        target_gap : str
735            Gap metric to model
736
737        Returns:
738        --------
739        results : dict
740            Complete diagnostic results
741        """
742        # Phase 1: Exploration
743        self.generate_sobol_samples(n_samples)
744        eval_results = self.evaluate_configurations(
745            X_train, X_test, y_train, y_test, model_class, model_kwargs
746        )
747
748        # Phase 2: Surrogate fitting
749        surrogate, metrics = self.fit_surrogate(surrogate, target_gap)
750
751        # Phase 3: Analysis
752        sobol_indices = self.compute_sobol_indices()
753
754        # Phase 4: Optimization
755        opt_results = self.optimize_acquisition()
756
757        return {
758            "evaluation_results": eval_results,
759            "surrogate_metrics": metrics,
760            "sobol_indices": sobol_indices,
761            "optimization": opt_results,
762        }

Generic framework for model generalization diagnostics

This class is agnostic to:

  • The base model being tuned (LightGBM, XGBoost, neural nets, etc.)
  • The surrogate model used (GP, RF, neural nets, etc.)
  • The hyperparameter space structure

Example usage:

>>> from sklearn.gaussian_process import GaussianProcessRegressor
>>> from sklearn.ensemble import RandomForestRegressor
>>>
>>> # Define hyperparameter space
>>> hyperparams = {
>>>     'learning_rate': (0.01, 0.3, True),  # (min, max, log_scale)
>>>     'max_depth': (3, 10, False)
>>> }
>>>
>>> # Initialize framework
>>> diag = GeneralizationOpt(hyperparams, random_state=42)
>>>
>>> # Use GP surrogate with Bayesian uncertainty
>>> gp = GaussianProcessRegressor()
>>> surrogate = GenericSurrogate(gp, conformal=False, bayesian=True)
>>>
>>> # Or use RF with conformal prediction
>>> rf = RandomForestRegressor()
>>> surrogate = GenericSurrogate(rf, conformal=True, bayesian=False)
>>>
>>> # Run diagnostics
>>> results = diag.run_diagnostics(
>>>     X_train, X_test, y_train, y_test,
>>>     model_class=LGBMClassifier,
>>>     surrogate=surrogate,
>>>     n_samples=100
>>> )
class GenericSurrogate(sklearn.base.BaseEstimator, sklearn.base.RegressorMixin):
 17class GenericSurrogate(BaseEstimator, RegressorMixin):
 18    """
 19    Generic wrapper for any surrogate model with conformal prediction support
 20
 21    This class wraps any sklearn-compatible model and adds:
 22    - Conformal prediction intervals (distribution-free uncertainty)
 23    - Optional Bayesian uncertainty (if model supports it)
 24    - Unified prediction interface
 25
 26    Parameters:
 27    -----------
 28    base_model : object
 29        Any sklearn-compatible model with fit() and predict() methods
 30    conformal : bool
 31        Whether to use conformal prediction for uncertainty
 32    bayesian : bool
 33        Whether model provides native uncertainty (e.g., GP, Bayesian NN)
 34    """
 35
 36    def __init__(self, base_model, conformal=True, bayesian=False):
 37        self.base_model = base_model
 38        self.conformal = conformal
 39        self.bayesian = bayesian
 40        self.nonconformity_scores_ = None
 41        self.is_fitted_ = False
 42
 43    def fit(self, X, y, cal_ratio=0.2):
 44        """
 45        Fit surrogate with optional conformal calibration
 46
 47        Parameters:
 48        -----------
 49        X : array-like, shape (n_samples, n_features)
 50            Training features
 51        y : array-like, shape (n_samples,)
 52            Training targets
 53        cal_ratio : float
 54            Proportion of data for conformal calibration (if conformal=True)
 55
 56        Returns:
 57        --------
 58        self : object
 59        """
 60        if self.conformal and cal_ratio > 0:
 61            # Split conformal prediction: reserve data for calibration
 62            n_cal = int(len(X) * cal_ratio)
 63            X_train, X_cal = X[:-n_cal], X[-n_cal:]
 64            y_train, y_cal = y[:-n_cal], y[-n_cal:]
 65
 66            self.base_model.fit(X_train, y_train)
 67
 68            # Compute nonconformity scores on calibration set
 69            y_cal_pred = self.base_model.predict(X_cal)
 70            self.nonconformity_scores_ = np.abs(y_cal - y_cal_pred)
 71        else:
 72            # No conformal prediction: use all data
 73            self.base_model.fit(X, y)
 74
 75        self.is_fitted_ = True
 76        return self
 77
 78    def predict(self, X, return_std=False, confidence=0.95):
 79        """
 80        Unified prediction interface with optional uncertainty
 81
 82        Parameters:
 83        -----------
 84        X : array-like, shape (n_samples, n_features)
 85            Test features
 86        return_std : bool
 87            Whether to return uncertainty estimates
 88        confidence : float
 89            Confidence level for prediction intervals (if conformal=True)
 90
 91        Returns:
 92        --------
 93        predictions : array or tuple
 94            If return_std=False: predictions only
 95            If return_std=True: (predictions, lower_bound, upper_bound)
 96
 97        Notes:
 98        ------
 99        - Conformal: Distribution-free intervals with finite-sample guarantees
100        - Bayesian: Model-based uncertainty (e.g., GP posterior std)
101        """
102        if not self.is_fitted_:
103            raise ValueError("Model must be fitted before prediction")
104
105        # Get point predictions
106        y_pred = self.base_model.predict(X)
107
108        if not return_std:
109            return y_pred
110
111        # Compute uncertainty estimates
112        if self.conformal and self.nonconformity_scores_ is not None:
113            # Conformal prediction intervals
114            alpha = 1 - confidence
115            n_scores = len(self.nonconformity_scores_)
116
117            # Calculate the effective quantile level based on (n+1) rule
118            # Cap the quantile level at 1.0 to prevent ValueError from np.quantile
119            quantile_level = min(
120                1.0, np.ceil((1 - alpha) * (n_scores + 1)) / n_scores
121            )
122
123            q = np.quantile(self.nonconformity_scores_, quantile_level)
124            y_lower = y_pred - q
125            y_upper = y_pred + q
126
127        elif self.bayesian and hasattr(self.base_model, "predict"):
128            # Try to get Bayesian uncertainty from model
129            try:
130                # For GP-like models
131                if (
132                    hasattr(self.base_model, "predict")
133                    and "return_std"
134                    in self.base_model.predict.__code__.co_varnames
135                ):
136                    _, y_std = self.base_model.predict(X, return_std=True)
137                    y_lower = y_pred - 2 * y_std  # 95% CI for Gaussian
138                    y_upper = y_pred + 2 * y_std
139                else:
140                    # Fallback: assume constant uncertainty
141                    y_std = np.std(y_pred) * np.ones_like(y_pred)
142                    y_lower = y_pred - 2 * y_std
143                    y_upper = y_pred + 2 * y_std
144            except:
145                # No uncertainty available
146                y_lower = y_pred
147                y_upper = y_pred
148        else:
149            # No uncertainty quantification available
150            y_lower = y_pred
151            y_upper = y_pred
152
153        return y_pred, y_lower, y_upper

Generic wrapper for any surrogate model with conformal prediction support

This class wraps any sklearn-compatible model and adds:

  • Conformal prediction intervals (distribution-free uncertainty)
  • Optional Bayesian uncertainty (if model supports it)
  • Unified prediction interface

Parameters:

base_model : object Any sklearn-compatible model with fit() and predict() methods conformal : bool Whether to use conformal prediction for uncertainty bayesian : bool Whether model provides native uncertainty (e.g., GP, Bayesian NN)

def fit(self, X, y, cal_ratio=0.2):
43    def fit(self, X, y, cal_ratio=0.2):
44        """
45        Fit surrogate with optional conformal calibration
46
47        Parameters:
48        -----------
49        X : array-like, shape (n_samples, n_features)
50            Training features
51        y : array-like, shape (n_samples,)
52            Training targets
53        cal_ratio : float
54            Proportion of data for conformal calibration (if conformal=True)
55
56        Returns:
57        --------
58        self : object
59        """
60        if self.conformal and cal_ratio > 0:
61            # Split conformal prediction: reserve data for calibration
62            n_cal = int(len(X) * cal_ratio)
63            X_train, X_cal = X[:-n_cal], X[-n_cal:]
64            y_train, y_cal = y[:-n_cal], y[-n_cal:]
65
66            self.base_model.fit(X_train, y_train)
67
68            # Compute nonconformity scores on calibration set
69            y_cal_pred = self.base_model.predict(X_cal)
70            self.nonconformity_scores_ = np.abs(y_cal - y_cal_pred)
71        else:
72            # No conformal prediction: use all data
73            self.base_model.fit(X, y)
74
75        self.is_fitted_ = True
76        return self

Fit surrogate with optional conformal calibration

Parameters:

X : array-like, shape (n_samples, n_features) Training features y : array-like, shape (n_samples,) Training targets cal_ratio : float Proportion of data for conformal calibration (if conformal=True)

Returns:

self : object

def predict(self, X, return_std=False, confidence=0.95):
 78    def predict(self, X, return_std=False, confidence=0.95):
 79        """
 80        Unified prediction interface with optional uncertainty
 81
 82        Parameters:
 83        -----------
 84        X : array-like, shape (n_samples, n_features)
 85            Test features
 86        return_std : bool
 87            Whether to return uncertainty estimates
 88        confidence : float
 89            Confidence level for prediction intervals (if conformal=True)
 90
 91        Returns:
 92        --------
 93        predictions : array or tuple
 94            If return_std=False: predictions only
 95            If return_std=True: (predictions, lower_bound, upper_bound)
 96
 97        Notes:
 98        ------
 99        - Conformal: Distribution-free intervals with finite-sample guarantees
100        - Bayesian: Model-based uncertainty (e.g., GP posterior std)
101        """
102        if not self.is_fitted_:
103            raise ValueError("Model must be fitted before prediction")
104
105        # Get point predictions
106        y_pred = self.base_model.predict(X)
107
108        if not return_std:
109            return y_pred
110
111        # Compute uncertainty estimates
112        if self.conformal and self.nonconformity_scores_ is not None:
113            # Conformal prediction intervals
114            alpha = 1 - confidence
115            n_scores = len(self.nonconformity_scores_)
116
117            # Calculate the effective quantile level based on (n+1) rule
118            # Cap the quantile level at 1.0 to prevent ValueError from np.quantile
119            quantile_level = min(
120                1.0, np.ceil((1 - alpha) * (n_scores + 1)) / n_scores
121            )
122
123            q = np.quantile(self.nonconformity_scores_, quantile_level)
124            y_lower = y_pred - q
125            y_upper = y_pred + q
126
127        elif self.bayesian and hasattr(self.base_model, "predict"):
128            # Try to get Bayesian uncertainty from model
129            try:
130                # For GP-like models
131                if (
132                    hasattr(self.base_model, "predict")
133                    and "return_std"
134                    in self.base_model.predict.__code__.co_varnames
135                ):
136                    _, y_std = self.base_model.predict(X, return_std=True)
137                    y_lower = y_pred - 2 * y_std  # 95% CI for Gaussian
138                    y_upper = y_pred + 2 * y_std
139                else:
140                    # Fallback: assume constant uncertainty
141                    y_std = np.std(y_pred) * np.ones_like(y_pred)
142                    y_lower = y_pred - 2 * y_std
143                    y_upper = y_pred + 2 * y_std
144            except:
145                # No uncertainty available
146                y_lower = y_pred
147                y_upper = y_pred
148        else:
149            # No uncertainty quantification available
150            y_lower = y_pred
151            y_upper = y_pred
152
153        return y_pred, y_lower, y_upper

Unified prediction interface with optional uncertainty

Parameters:

X : array-like, shape (n_samples, n_features) Test features return_std : bool Whether to return uncertainty estimates confidence : float Confidence level for prediction intervals (if conformal=True)

Returns:

predictions : array or tuple If return_std=False: predictions only If return_std=True: (predictions, lower_bound, upper_bound)

Notes:

  • Conformal: Distribution-free intervals with finite-sample guarantees
  • Bayesian: Model-based uncertainty (e.g., GP posterior std)