GPopt
1from .GPOpt import GPOpt 2from .BOstopping import BOstopping 3from .genericcv import MLOptimizer 4from .GeneralizationOpt import GeneralizationOpt 5from .GeneralizationOpt import GenericSurrogate 6 7__all__ = [ 8 "GPOpt", 9 "BOstopping", 10 "MLOptimizer", 11 "GeneralizationOpt", 12 "GenericSurrogate", 13]
36class GPOpt: 37 """Class GPOpt. 38 39 # Arguments: 40 41 lower_bound: a numpy array; 42 lower bound for researched minimum 43 44 upper_bound: a numpy array; 45 upper bound for researched minimum 46 47 objective_func: a function; 48 the objective function to be minimized 49 50 params_names: a list; 51 names of the parameters of the objective function (optional) 52 53 surrogate_obj: an object; 54 An ML model for estimating the uncertainty around the objective function 55 Must be nnetsauce.CustomRegressor or nnetsauce.PredictionInterval 56 57 x_init: 58 initial setting of points where `objective_func` is evaluated (optional) 59 60 y_init: 61 initial setting values at points where `objective_func` is evaluated (optional) 62 63 n_init: an integer; 64 number of points in the initial setting, when `x_init` and `y_init` are not provided 65 66 n_choices: an integer; 67 number of points for the calculation of expected improvement 68 69 n_iter: an integer; 70 number of iterations of the minimization algorithm 71 72 alpha: a float; 73 Value added to the diagonal of the kernel matrix during fitting (for Matern 5/2 kernel) 74 75 n_restarts_optimizer: an integer; 76 The number of restarts of the optimizer for finding the kernel’s parameters which maximize the log-marginal likelihood. 77 78 seed: an integer; 79 reproducibility seed 80 81 save: a string; 82 Specifies where to save the optimizer in its current state 83 84 n_jobs: an integer; 85 number of jobs for parallel computing on initial setting or method `lazyoptimize` (can be -1) 86 87 acquisition: a string; 88 acquisition function: "ei" (expected improvement) or "ucb" (upper confidence bound) 89 90 method: an str; 91 "bayesian" (default) for Gaussian posteriors, "mc" for Monte Carlo posteriors, 92 "splitconformal" (only for acquisition = "ucb") for conformalized surrogates 93 94 min_value: a float; 95 minimum value of the objective function (default is None). For example, 96 if objective function is accuracy, will be 1, and the algorithm will stop 97 98 per_second: a boolean; 99 __experimental__, default is False (leave to default for now) 100 101 log_scale: a boolean; 102 __experimental__, default is False (leave to default for now) 103 104 see also [Bayesian Optimization with GPopt](https://thierrymoudiki.github.io/blog/2021/04/16/python/misc/gpopt) 105 or [Hyperparameters tuning with GPopt](https://thierrymoudiki.github.io/blog/2021/06/11/python/misc/hyperparam-tuning-gpopt) [Agnostic BayesOpt](https://thierrymoudiki.github.io/blog/2024/12/09/python/bayesconfoptim) 106 107 """ 108 109 def __init__( 110 self, 111 lower_bound, 112 upper_bound, 113 objective_func=None, 114 params_names=None, 115 surrogate_obj=None, 116 x_init=None, 117 y_init=None, 118 n_init=10, 119 n_choices=100000, 120 n_iter=190, 121 alpha=1e-6, 122 n_restarts_optimizer=25, 123 seed=123, 124 save=None, 125 n_jobs=1, 126 acquisition="ei", 127 method="bayesian", 128 min_value=None, 129 per_second=False, # /!\ very experimental 130 log_scale=False, # /!\ experimental 131 ): 132 n_dims = len(lower_bound) 133 134 assert n_dims == len( 135 upper_bound 136 ), "'upper_bound' and 'lower_bound' must have the same dimensions" 137 138 self.objective_func = objective_func 139 self.params_names = params_names 140 self.lower_bound = lower_bound 141 self.upper_bound = upper_bound 142 self.y_init = y_init 143 self.log_scale = log_scale 144 self.n_dims = n_dims 145 self.n_init = n_init 146 self.n_choices = n_choices 147 self.n_iter = n_iter 148 self.alpha = alpha 149 self.n_restarts_optimizer = n_restarts_optimizer 150 self.seed = seed 151 self.save = save 152 self.n_jobs = n_jobs # for parallel case on initial design 153 self.per_second = per_second 154 self.x_min = None 155 self.y_min = None 156 self.y_mean = None 157 self.y_std = None 158 self.y_lower = None 159 self.y_upper = None 160 self.best_surrogate = None 161 self.acquisition = acquisition 162 self.min_value = min_value 163 self.acq = np.array([]) 164 self.max_acq = [] 165 if surrogate_obj is None: 166 self.surrogate_obj = GaussianProcessRegressor( 167 kernel=Matern(nu=2.5), 168 alpha=self.alpha, 169 normalize_y=True, 170 n_restarts_optimizer=self.n_restarts_optimizer, 171 random_state=self.seed, 172 ) 173 else: 174 self.surrogate_obj = surrogate_obj 175 assert method in ( 176 "bayesian", 177 "mc", 178 "splitconformal", 179 ), "method must be in ('bayesian', 'mc', 'splitconformal')" 180 self.method = method 181 self.posterior_ = None 182 183 # Sobol seqs for initial design and choices 184 sobol_seq_init = np.transpose( 185 generate_sobol2( 186 n_dims=self.n_dims, 187 n_points=self.n_init, 188 skip=2, 189 ) 190 ) 191 sobol_seq_choices = np.transpose( 192 generate_sobol2( 193 n_dims=self.n_dims, 194 n_points=self.n_choices, 195 skip=self.n_init + 2, 196 ) 197 ) 198 199 # Sobol seqs for initial design and choices with bounds 200 if self.log_scale == False: 201 bounds_range = upper_bound - lower_bound 202 self.x_init = ( 203 bounds_range * sobol_seq_init + lower_bound 204 if x_init is None 205 else x_init 206 ) 207 self.x_choices = bounds_range * sobol_seq_choices + lower_bound 208 209 else: # (!) experimental 210 assert ( 211 lower_bound > 0 212 ).all(), "all elements of `lower_bound` must be > 0" 213 assert ( 214 upper_bound > 0 215 ).all(), "all elements of `upper_bound` must be > 0" 216 217 log_lower_bound = np.log(lower_bound) 218 log_upper_bound = np.log(upper_bound) 219 log_bounds_range = log_upper_bound - log_lower_bound 220 self.x_init = ( 221 np.minimum( 222 np.exp(log_bounds_range * sobol_seq_init + log_lower_bound), 223 1.7976931348623157e308, 224 ) 225 if x_init is None 226 else x_init 227 ) 228 self.x_choices = np.minimum( 229 np.exp(log_bounds_range * sobol_seq_choices + log_lower_bound), 230 1.7976931348623157e308, 231 ) 232 233 # shelve for saving (not for loading) 234 if self.save is not None: 235 self.sh = shelve.open(filename=save, flag="c", writeback=True) 236 237 if self.per_second: 238 self.timings = [] 239 self.rf_obj = RandomForestRegressor( 240 n_estimators=250, random_state=self.seed 241 ) 242 243 self.n_jobs = n_jobs 244 245 # from sklearn.base 246 def get_params(self): 247 """Get object attributes. 248 249 Returns 250 ------- 251 params : mapping of string to any 252 Parameter names mapped to their values. 253 """ 254 out = dict() 255 param_names = dir(self) 256 for key in param_names: 257 if key.startswith("_") is False: 258 out[key] = getattr(self, key, None) 259 260 return out 261 262 # for parallel case on initial design 263 def eval_objective(self, arg): 264 try: 265 return self.objective_func(self.x_init[arg, :]) 266 except: 267 return 1e06 268 269 # load data from stored shelve 270 def load(self, path): 271 """load data from stored shelve. 272 273 # Arguments 274 275 path : a string; path to stored shelve. 276 277 See also: [Bayesian Optimization with GPopt Part 2 (save and resume)](https://thierrymoudiki.github.io/blog/2021/04/30/python/misc/gpopt) 278 """ 279 280 self.sh = shelve.open(filename=path) 281 for key, value in self.sh.items(): 282 setattr(self, key, value) 283 284 # update shelve in optimization loop 285 def update_shelve(self): 286 for key, value in self.get_params().items(): 287 if (callable(value) is False) & (key != "sh"): 288 self.sh[key] = value 289 self.sh.sync() 290 291 # closing shelve (can't be modified after) 292 def close_shelve(self): 293 """Close shelve. 294 295 # Arguments 296 297 No argument. 298 299 See also: [Bayesian Optimization with GPopt Part 2 (save and resume)](https://thierrymoudiki.github.io/blog/2021/04/30/python/misc/gpopt) 300 """ 301 302 self.sh.close() 303 304 # fit predict 305 def surrogate_fit_predict( 306 self, 307 X_train, 308 y_train, 309 X_test, 310 return_std=False, 311 return_pi=False, 312 param_search_init_design=False, 313 param_distributions=None, 314 **kwargs, 315 ): 316 if len(X_train.shape) == 1: 317 X_train = X_train.reshape((-1, 1)) 318 X_test = X_test.reshape((-1, 1)) 319 320 if ( 321 X_train.shape[0] <= self.n_init and param_search_init_design == True 322 ): # on initial design 323 try: 324 rs_obj = RandomizedSearchCV( 325 self.surrogate_obj, 326 param_distributions=param_distributions, 327 random_state=42, 328 cv=3, 329 **kwargs, 330 ) 331 rs_obj.fit(X_train, y_train) 332 self.surrogate_obj = rs_obj.best_estimator_ 333 except Exception as e: 334 print(str(e)) 335 336 # Get mean and standard deviation (+ lower and upper for not GPs) 337 assert ( 338 return_std == True and return_pi == True 339 ) == False, "must have either return_std == True or return_pi == True" 340 341 if return_std == True: 342 self.posterior_ = "gaussian" 343 self.surrogate_obj.fit(X_train, y_train) 344 return self.surrogate_obj.predict(X_test, return_std=True) 345 346 elif ( 347 return_pi == True 348 ): # here, self.surrogate_obj must have `replications` not None 349 if self.surrogate_obj.replications is not None: 350 self.posterior_ = "mc" 351 self.surrogate_obj.fit(X_train, y_train) 352 try: # it's a nnetsauce.CustomRegressor 353 res = self.surrogate_obj.predict( 354 X_test, return_pi=True, method="splitconformal" 355 ) 356 except Exception: # it's a nnetsauce.PredictionInterval 357 try: 358 res = self.surrogate_obj.predict(X_test, return_pi=True) 359 except Exception as e: 360 print( 361 str(e) 362 + "Sureproof way is to encapsulate your surrogate in nnetsauce.PredictionInterval model" 363 ) 364 365 self.y_sims = res.sims 366 self.y_mean, self.y_std = ( 367 np.mean(self.y_sims, axis=1), 368 np.std(self.y_sims, axis=1), 369 ) 370 return self.y_mean, self.y_std, self.y_sims 371 372 else: # self.surrogate_obj is conformalized (uses nnetsauce.PredictionInterval) 373 assert ( 374 self.acquisition == "ucb" 375 ), "'acquisition' must be 'ucb' for conformalized surrogates" 376 self.posterior_ = None 377 self.surrogate_obj.fit(X_train, y_train) 378 try: 379 res = self.surrogate_obj.predict( 380 X_test, return_pi=True, method="splitconformal" 381 ) 382 except Exception: 383 res = self.surrogate_obj.predict(X_test, return_pi=True) 384 self.y_mean = res.mean 385 self.y_lower = res.lower 386 self.y_upper = res.upper 387 return self.y_mean, self.y_lower, self.y_upper 388 389 else: 390 raise NotImplementedError 391 392 # fit predict timings 393 def timings_fit_predict(self, X_train, y_train, X_test): 394 if len(X_train.shape) == 1: 395 X_train = X_train.reshape((-1, 1)) 396 X_test = X_test.reshape((-1, 1)) 397 398 # Get mean preds for timings 399 return self.rf_obj.fit(X_train, y_train).predict(X_test) 400 401 # find next parameter by using expected improvement (ei) 402 def next_parameter_by_acq(self, i, acq="ei"): 403 if acq == "ei": 404 if self.posterior_ == "gaussian": 405 gamma_hat = (self.y_min - self.y_mean) / self.y_std 406 self.acq = -self.y_std * ( 407 gamma_hat * st.norm.cdf(gamma_hat) + st.norm.pdf(gamma_hat) 408 ) 409 elif self.posterior_ == "mc": 410 self.acq = -np.mean( 411 np.maximum(self.y_min - self.y_sims, 0), axis=1 412 ) 413 414 if acq == "ucb": 415 if self.posterior_ == "gaussian": 416 self.acq = self.y_mean - 1.96 * self.y_std 417 self.ucb = self.y_mean + 1.96 * self.y_std 418 419 elif self.posterior_ is None: # split conformal(ized) estimator 420 self.acq = self.y_lower 421 self.ucb = self.y_upper 422 423 # find max index ----- 424 425 if self.per_second is False: 426 # find index for max. ei 427 max_index = self.acq.argmin() 428 429 else: # self.per_second is True 430 # predict timings on self.x_choices 431 # train on X = self.parameters and y = self.timings 432 # (must have same shape[0]) 433 timing_preds = self.timings_fit_predict( 434 X_train=np.asarray(self.parameters), 435 y_train=np.asarray(self.timings), 436 X_test=self.x_choices, 437 ) 438 439 # find index for max. ei (and min. timings) 440 max_index = (-self.acq / timing_preds).argmax() 441 442 self.max_acq.append(np.abs(self.acq[max_index])) 443 444 # Select next choice 445 next_param = self.x_choices[max_index, :] 446 447 if next_param in np.asarray(self.parameters): 448 if self.log_scale == False: 449 np.random.seed(self.seed * i + 1000) 450 next_param = ( 451 self.upper_bound - self.lower_bound 452 ) * np.random.rand(self.n_dims) + self.lower_bound 453 454 else: # /!\ very... experimental 455 np.random.seed(self.seed) 456 log_upper_bound = np.log(self.upper_bound) 457 log_lower_bound = np.log(self.lower_bound) 458 log_bounds_range = log_upper_bound - log_lower_bound 459 460 next_param = np.minimum( 461 np.exp( 462 log_bounds_range * np.random.rand(self.n_dims) 463 + log_lower_bound 464 ), 465 1.7976931348623157e308, 466 ) 467 468 return next_param 469 470 # optimize the objective 471 def optimize( 472 self, 473 verbose=1, 474 n_more_iter=None, 475 abs_tol=None, # suggested 1e-4, for n_iter = 200 476 ucb_tol=None, 477 min_budget=50, # minimum budget for early stopping 478 func_args=None, 479 param_search_init_design=False, 480 param_distributions=None, 481 ): 482 """Launch optimization loop. 483 484 # Arguments: 485 486 verbose: an integer; 487 verbose = 0: nothing is printed, 488 verbose = 1: a progress bar is printed (longer than 0), 489 verbose = 2: information about each iteration is printed (longer than 1) 490 491 n_more_iter: an integer; 492 additional number of iterations for the optimizer (which has been run once) 493 494 abs_tol: a float; 495 tolerance for convergence of the optimizer (early stopping based on acquisition function) 496 497 ucb_tol: a float; 498 tolerance for convergence of the optimizer (early stopping based on length of prediction intervals) 499 for UCB criterion 500 501 min_budget: an integer (default is 50); 502 minimum number of iterations before early stopping controlled by `abs_tol` 503 504 func_args: a list; 505 additional parameters for the objective function (if necessary) 506 507 param_search_init_design: a boolean; 508 whether random search tuning must occur on the initial design or not 509 510 param_distributions: dict or list of dicts; 511 Dictionary with parameters names (str) as keys and distributions or lists of 512 parameters to try. Distributions must provide a rvs method for sampling 513 (such as those from scipy.stats.distributions). If a list is given, it 514 is sampled uniformly. If a list of dicts is given, first a dict is sampled 515 uniformly, and then a parameter is sampled using that dict as above. 516 517 see also [Bayesian Optimization with GPopt](https://thierrymoudiki.github.io/blog/2021/04/16/python/misc/gpopt) 518 and [Hyperparameters tuning with GPopt](https://thierrymoudiki.github.io/blog/2021/06/11/python/misc/hyperparam-tuning-gpopt) 519 520 """ 521 522 # verbose = 0: nothing is printed 523 # verbose = 1: a progress bar is printed (longer than 0) 524 # verbose = 2: information about each iteration is printed (longer than 1) 525 if func_args is None: 526 func_args = [] 527 528 if ( 529 n_more_iter is None 530 ): # initial optimization, before more iters are requested 531 n_iter = self.n_iter 532 # stopping iter for early stopping (default is total budget) 533 iter_stop = n_iter # potentially # got to check this 534 535 # initial design ---------- 536 537 if (verbose == 1) | (verbose == 2): 538 print(f"\n Creating initial design... \n") 539 540 if verbose == 1: 541 progbar = Progbar(target=self.n_init) 542 543 self.parameters = self.x_init.tolist() 544 self.scores = [] 545 546 if self.save is not None: 547 self.update_shelve() 548 549 if self.y_init is None: # calculate scores on initial design 550 assert ( 551 self.objective_func is not None 552 ), "self.y_init is None: must have 'objective_func' not None" 553 554 if self.n_jobs == 1: 555 for i in range(self.n_init): 556 x_next = self.x_init[i, :] 557 558 try: 559 if self.per_second is True: 560 start = time() 561 score = self.objective_func(x_next, *func_args) 562 if (np.isfinite(score) == False) or ( 563 np.isnan(score) == True 564 ): 565 continue 566 self.timings.append(np.log(time() - start)) 567 568 else: # self.per_second is False 569 score = self.objective_func(x_next, *func_args) 570 if (np.isfinite(score) == False) or ( 571 np.isnan(score) == True 572 ): 573 continue 574 575 self.scores.append(score) 576 577 if self.save is not None: 578 self.update_shelve() 579 580 except: 581 continue 582 583 if verbose == 1: 584 progbar.update(i) # update progress bar 585 586 if verbose == 2: 587 print(f"point: {x_next}; score: {score}") 588 # end loop # calculate scores on initial design 589 590 if verbose == 1: 591 progbar.update(self.n_init) 592 593 else: # self.n_jobs != 1 594 assert ( 595 self.per_second is False 596 ), "timings not calculated here" 597 598 scores = Parallel(n_jobs=self.n_jobs, prefer="threads")( 599 delayed(self.objective_func)(self.x_init[i, :]) 600 for i in range(self.n_init) 601 ) 602 603 self.scores = scores 604 605 if self.save is not None: 606 self.update_shelve() 607 608 else: # if self.y_init is not None: 609 assert self.x_init.shape[0] == len( 610 self.y_init 611 ), "must have: self.x_init.shape[0] == len(self.y_init)" 612 613 self.scores = pickle.loads( 614 pickle.dumps(self.y_init.tolist(), -1) 615 ) 616 617 # current best score on initial design 618 min_index = (np.asarray(self.scores)).argmin() 619 self.y_min = self.scores[min_index] 620 self.x_min = self.x_init[min_index, :] 621 622 # current gp mean and std on initial design 623 # /!\ if GP 624 if param_search_init_design == False: 625 if self.method == "bayesian": 626 self.posterior_ = "gaussian" 627 try: 628 y_mean, y_std = self.surrogate_fit_predict( 629 np.asarray(self.parameters), 630 np.asarray(self.scores), 631 self.x_choices, 632 return_std=True, 633 return_pi=False, 634 ) 635 except ValueError: # do not remove this 636 preds_with_std = self.surrogate_fit_predict( 637 np.asarray(self.parameters), 638 np.asarray(self.scores), 639 self.x_choices, 640 return_std=True, 641 return_pi=False, 642 ) 643 y_mean, y_std = preds_with_std[0], preds_with_std[1] 644 self.y_mean = y_mean 645 self.y_std = np.maximum(2.220446049250313e-16, y_std) 646 647 elif self.method == "mc": 648 self.posterior_ = "mc" 649 assert self.surrogate_obj.__class__.__name__.startswith( 650 "CustomRegressor" 651 ) or self.surrogate_obj.__class__.__name__.startswith( 652 "PredictionInterval" 653 ), "for `method = 'mc'`, the surrogate must be a nnetsauce.CustomRegressor() or nnetsauce.PredictionInterval()" 654 assert ( 655 self.surrogate_obj.replications is not None 656 ), "for `method = 'mc'`, the surrogate must be a nnetsauce.CustomRegressor() with a number of 'replications' provided" 657 preds_with_std = self.surrogate_fit_predict( 658 np.asarray(self.parameters), 659 np.asarray(self.scores), 660 self.x_choices, 661 return_std=False, 662 return_pi=True, 663 ) 664 y_mean, y_std = preds_with_std[0], preds_with_std[1] 665 self.y_mean = y_mean 666 self.y_std = np.maximum(2.220446049250313e-16, y_std) 667 668 elif self.method == "splitconformal": 669 self.posterior_ = None 670 # assert self.surrogate_obj.__class__.__name__.startswith( 671 # "PredictionInterval" 672 # ), "for `method = 'splitconformal'`, the surrogate must be a nnetsauce.PredictionInterval()" 673 preds_with_pi = self.surrogate_fit_predict( 674 np.asarray(self.parameters), 675 np.asarray(self.scores), 676 self.x_choices, 677 return_std=False, 678 return_pi=True, 679 ) 680 y_lower = preds_with_pi[1] 681 self.lower = y_lower 682 683 else: 684 assert ( 685 param_distributions is not None 686 ), "When 'param_search_init_design == False', 'param_distributions' must be provided" 687 688 if self.method == "bayesian": 689 self.posterior_ = "gaussian" 690 try: 691 y_mean, y_std = self.surrogate_fit_predict( 692 np.asarray(self.parameters), 693 np.asarray(self.scores), 694 self.x_choices, 695 return_std=True, 696 return_pi=False, 697 param_search_init_design=True, 698 param_distributions=param_distributions, 699 ) 700 except ValueError: # do not remove this 701 preds_with_std = self.surrogate_fit_predict( 702 np.asarray(self.parameters), 703 np.asarray(self.scores), 704 self.x_choices, 705 return_std=True, 706 return_pi=False, 707 param_search_init_design=True, 708 param_distributions=param_distributions, 709 ) 710 y_mean, y_std = preds_with_std[0], preds_with_std[1] 711 self.y_mean = y_mean 712 self.y_std = np.maximum(2.220446049250313e-16, y_std) 713 714 elif self.method == "mc": 715 self.posterior_ = "mc" 716 assert self.surrogate_obj.__class__.__name__.startswith( 717 "CustomRegressor" 718 ) or self.surrogate_obj.__class__.__name__.startswith( 719 "PredictionInterval" 720 ), "for `method = 'mc'`, the surrogate must be a nnetsauce.CustomRegressor() or nnetsauce.PredictionInterval()" 721 assert ( 722 self.surrogate_obj.replications is not None 723 ), "for `method = 'mc'`, the surrogate must be a nnetsauce.CustomRegressor() with a number of 'replications' provided" 724 preds_with_std = self.surrogate_fit_predict( 725 np.asarray(self.parameters), 726 np.asarray(self.scores), 727 self.x_choices, 728 return_std=False, 729 return_pi=True, 730 param_search_init_design=True, 731 param_distributions=param_distributions, 732 ) 733 y_mean, y_std = preds_with_std[0], preds_with_std[1] 734 self.y_mean = y_mean 735 self.y_std = np.maximum(2.220446049250313e-16, y_std) 736 737 elif self.method == "splitconformal": 738 self.posterior_ = None 739 # assert self.surrogate_obj.__class__.__name__.startswith( 740 # "PredictionInterval" 741 # ), "for `method = 'splitconformal'`, the surrogate must be a nnetsauce.PredictionInterval()" 742 preds_with_pi = self.surrogate_fit_predict( 743 np.asarray(self.parameters), 744 np.asarray(self.scores), 745 self.x_choices, 746 return_std=False, 747 return_pi=True, 748 param_search_init_design=True, 749 param_distributions=param_distributions, 750 ) 751 y_lower = preds_with_pi[1] 752 self.lower = y_lower 753 754 # saving after initial design computation 755 if self.save is not None: 756 self.update_shelve() 757 758 else: # if n_more_iter is not None 759 assert self.n_iter > 5, "you must have n_iter > 5" 760 n_iter = n_more_iter 761 iter_stop = len(self.max_acq) + n_more_iter # potentially 762 763 if (verbose == 1) | (verbose == 2): 764 print(f"\n ...Done. \n") 765 try: 766 print(np.hstack((self.x_init, self.y_init.reshape(-1, 1)))) 767 except: 768 pass 769 770 # end init design ---------- 771 772 # if n_more_iter is None: # initial optimization, before more iters are requested 773 774 if (verbose == 1) | (verbose == 2): 775 print(f"\n Optimization loop... \n") 776 777 # early stopping? 778 if abs_tol is not None: 779 assert ( 780 min_budget > 20 781 ), "With 'abs_tol' provided, you must have 'min_budget' > 20" 782 self.abs_tol = abs_tol 783 784 if ucb_tol is not None: 785 assert ( 786 min_budget > 20 787 ), "With 'ucb_tol' provided, you must have 'min_budget' > 20" 788 assert ( 789 self.acquisition == "ucb" 790 ), "With 'ucb_tol' provided, you must have 'acquisition' == 'ucb'" 791 self.ucb_tol = ucb_tol 792 793 if verbose == 1: 794 progbar = Progbar(target=n_iter) 795 796 # main loop ---------- 797 798 for i in range(n_iter): 799 # find next set of parameters (vector), maximizing acquisition function 800 next_param = self.next_parameter_by_acq(i=i, acq=self.acquisition) 801 802 try: 803 if self.per_second is True: 804 start = time() 805 806 if self.objective_func is not None: 807 score_next_param = self.objective_func( 808 next_param, *func_args 809 ) 810 811 if (np.isfinite(score_next_param) == False) or ( 812 np.isnan(score_next_param) == True 813 ): 814 continue 815 816 else: 817 assert (self.x_init is not None) and ( 818 self.y_init is not None 819 ), "self.objective_func is not None: must have (self.x_init is not None) and (self.y_init is not None)" 820 821 print(f"\n next param: {next_param} \n") 822 score_next_param = float( 823 input("get new score: \n") 824 ) # or an API response 825 826 if (np.isfinite(score_next_param) == False) or ( 827 np.isnan(score_next_param) == True 828 ): 829 continue 830 831 self.timings.append(np.log(time() - start)) 832 833 else: # self.per_second is False: 834 if self.objective_func is not None: 835 score_next_param = self.objective_func( 836 next_param, *func_args 837 ) 838 839 if (np.isfinite(score_next_param) == False) or ( 840 np.isnan(score_next_param) == True 841 ): 842 continue 843 844 else: 845 assert (self.x_init is not None) and ( 846 self.y_init is not None 847 ), "self.objective_func is not None: must have (self.x_init is not None) and (self.y_init is not None)" 848 849 print(f"\n next param: {next_param} \n") 850 score_next_param = float( 851 input("get new score: \n") 852 ) # or an API response 853 854 if (np.isfinite(score_next_param) == False) or ( 855 np.isnan(score_next_param) == True 856 ): 857 continue 858 859 except: 860 continue 861 862 self.parameters.append(next_param.tolist()) 863 864 self.scores.append(score_next_param) 865 866 if self.save is not None: 867 self.update_shelve() 868 869 if verbose == 2: 870 print(f"iteration {i + 1} -----") 871 print(f"current minimum: {self.x_min}") 872 print(f"current minimum score: {self.y_min}") 873 print(f"next parameter: {next_param}") 874 print(f"score for next parameter: {score_next_param} \n") 875 876 if score_next_param < self.y_min: 877 self.x_min = next_param 878 self.y_min = score_next_param 879 if self.save is not None: 880 self.update_shelve() 881 if self.y_min == self.min_value: 882 break 883 884 if self.posterior_ == "gaussian" and self.method == "bayesian": 885 try: 886 self.y_mean, self.y_std = self.surrogate_fit_predict( 887 np.asarray(self.parameters), 888 np.asarray(self.scores), 889 self.x_choices, 890 return_std=True, 891 return_pi=False, 892 ) 893 except: 894 ( 895 self.y_mean, 896 self.y_std, 897 lower, 898 upper, 899 ) = self.surrogate_fit_predict( 900 np.asarray(self.parameters), 901 np.asarray(self.scores), 902 self.x_choices, 903 return_std=True, 904 return_pi=False, 905 ) 906 907 elif self.posterior_ in (None, "mc") and self.method in ( 908 "mc", 909 "splitconformal", 910 ): 911 ( 912 self.y_mean, 913 self.y_lower, 914 self.y_upper, 915 ) = self.surrogate_fit_predict( 916 np.asarray(self.parameters), 917 np.asarray(self.scores), 918 self.x_choices, 919 return_std=False, 920 return_pi=True, 921 ) 922 923 else: 924 return NotImplementedError 925 926 if self.save is not None: 927 self.update_shelve() 928 929 if verbose == 1: 930 progbar.update(i + 1) # update progress bar 931 932 # early stopping 933 934 if abs_tol is not None: 935 # if self.max_acq.size > (self.n_init + self.n_iter * min_budget_pct): 936 if len(self.max_acq) > min_budget: 937 diff_max_acq = np.abs(np.diff(np.asarray(self.max_acq))) 938 939 if diff_max_acq[-1] <= abs_tol: 940 iter_stop = len(self.max_acq) # index i starts at 0 941 942 break 943 944 if ucb_tol is not None: 945 if len(self.max_acq) > min_budget: 946 # print(f"self.ucb: {self.ucb}") 947 # print(f"self.acq: {self.acq}") 948 # print(f"mean(self.ucb/self.acq): {np.mean(self.ucb/self.acq)/100}") 949 950 if ( 951 np.abs(np.mean(self.ucb / self.acq) / 100) <= ucb_tol 952 ): # self.ucb is the upper confidence bound for UCB criterion 953 iter_stop = len(self.max_acq) 954 955 break 956 957 # end main loop ---------- 958 959 if (verbose == 1) & (i < (n_iter - 1)): 960 progbar.update(n_iter) 961 962 self.n_iter = iter_stop 963 if self.save is not None: 964 self.update_shelve() 965 966 DescribeResult = namedtuple( 967 "DescribeResult", ("best_params", "best_score") 968 ) 969 970 if self.params_names is None: 971 return DescribeResult(self.x_min, self.y_min) 972 973 else: 974 return DescribeResult( 975 dict(zip(self.params_names, self.x_min)), self.y_min 976 ) 977 978 # optimize the objective 979 def lazyoptimize( 980 self, 981 verbose=1, 982 abs_tol=None, # suggested 1e-4, for n_iter = 200 983 ucb_tol=None, 984 min_budget=50, # minimum budget for early stopping 985 func_args=None, 986 estimators="all", 987 type_pi="kde", # for now, 'kde', 'bootstrap', 'splitconformal' 988 type_exec="independent", # "queue" or "independent" (default) 989 ): 990 """Launch optimization loop. 991 992 # Arguments: 993 994 verbose: an integer; 995 verbose = 0: nothing is printed, 996 verbose = 1: a progress bar is printed (longer than 0), 997 verbose = 2: information about each iteration is printed (longer than 1) 998 999 n_more_iter: an integer; 1000 additional number of iterations for the optimizer (which has been run once) 1001 1002 abs_tol: a float; 1003 tolerance for convergence of the optimizer (early stopping based on expected improvement) 1004 1005 ucb_tol: a float; 1006 tolerance for convergence of the optimizer (early stopping based on length of prediction intervals) 1007 1008 min_budget: an integer (default is 50); 1009 minimum number of iterations before early stopping controlled by `abs_tol` 1010 1011 func_args: a list; 1012 additional parameters for the objective function (if necessary) 1013 1014 estimators: an str or a list of strs (estimators names) 1015 if "all", then 30 models are fitted. Otherwise, only those provided in the list 1016 are adjusted; for example ["RandomForestRegressor", "Ridge"] 1017 1018 type_pi: an str; 1019 "kde" (default) or, "splitconformal"; type of prediction intervals for the surrogate 1020 model 1021 1022 type_exec: an str; 1023 "independent" (default) is when surrogate models are adjusted independently on 1024 the same design set and the best model is chosen eventually; "queue" is when 1025 surrogate models are adjusted one after the other, on a design set with 1026 increasing size; 1027 1028 """ 1029 1030 # Base case: Gaussian Process 1031 gp_opt_obj = GPOpt( 1032 objective_func=self.objective_func, 1033 lower_bound=self.lower_bound, 1034 upper_bound=self.upper_bound, 1035 n_init=self.n_init, 1036 n_iter=self.n_iter, 1037 alpha=self.alpha, 1038 n_restarts_optimizer=self.n_restarts_optimizer, 1039 seed=self.seed, 1040 n_jobs=self.n_jobs, 1041 acquisition="ei", 1042 surrogate_obj=GaussianProcessRegressor( 1043 kernel=Matern(nu=2.5), 1044 alpha=self.alpha, 1045 normalize_y=True, 1046 n_restarts_optimizer=self.n_restarts_optimizer, 1047 random_state=self.seed, 1048 ), 1049 ) 1050 if verbose == 2: 1051 print( 1052 f"\n adjusting surrogate model # {0} (GaussianProcessRegressor(Matern(5/2)))... \n" 1053 ) 1054 res_base = gp_opt_obj.optimize( 1055 verbose=verbose, 1056 abs_tol=abs_tol, # suggested 1e-4, for n_iter = 200 1057 min_budget=min_budget, # minimum budget for early stopping 1058 func_args=func_args, 1059 ) 1060 1061 if estimators == "all": 1062 if type_pi == "kde": 1063 self.regressors = REGRESSORS 1064 1065 else: 1066 self.regressors = [ 1067 ( 1068 est[0], 1069 ns.PredictionInterval( 1070 est[1](), type_pi="splitconformal" 1071 ), 1072 ) 1073 for est in all_estimators() 1074 if ( 1075 issubclass(est[1], RegressorMixin) 1076 and (est[0] not in REMOVED_REGRESSORS) 1077 ) 1078 ] 1079 1080 else: 1081 if type_pi == "kde": 1082 self.regressors = [ 1083 ( 1084 "CustomRegressor(" + est[0] + ")", 1085 ns.CustomRegressor( 1086 est[1](), replications=150, type_pi=type_pi 1087 ), 1088 ) 1089 for est in all_estimators() 1090 if ( 1091 issubclass(est[1], RegressorMixin) 1092 and (est[0] not in REMOVED_REGRESSORS) 1093 and (est[0] in estimators) 1094 ) 1095 ] 1096 1097 elif type_pi == "splitconformal": 1098 self.regressors = [ 1099 ( 1100 est[0], 1101 ns.PredictionInterval( 1102 est[1](), type_pi="splitconformal" 1103 ), 1104 ) 1105 for est in all_estimators() 1106 if ( 1107 issubclass(est[1], RegressorMixin) 1108 and (est[0] not in REMOVED_REGRESSORS) 1109 and (est[0] in estimators) 1110 ) 1111 ] 1112 1113 df_res = pd.DataFrame( 1114 np.empty((len(self.regressors) + 1, 2)), columns=["Model", "Score"] 1115 ) 1116 df_res.iloc[0, 0] = "GaussianProcessRegressor" 1117 df_res.iloc[0, 1] = res_base.best_score 1118 1119 self.surrogate_fit_predict = partial( 1120 self.surrogate_fit_predict, return_pi=True 1121 ) 1122 1123 if ( 1124 type_exec == "queue" 1125 ): # when models are adjusted one after the other on a design set with increasing size 1126 self.x_min = None 1127 1128 self.y_min = np.inf 1129 1130 score_next_param = np.inf 1131 1132 DescribeResult = namedtuple( 1133 "DescribeResult", ("best_params", "best_score", "scores") 1134 ) 1135 1136 if verbose == 2: 1137 print( 1138 f"\n adjusting surrogate model # {1} ({self.regressors[0][0]})... \n" 1139 ) 1140 1141 df_res.iloc[0, 0] = self.regressors[0][0] 1142 1143 gp_opt_obj_prev = GPOpt( 1144 objective_func=self.objective_func, 1145 lower_bound=self.lower_bound, 1146 upper_bound=self.upper_bound, 1147 n_init=self.n_init, 1148 n_iter=self.n_iter, 1149 alpha=self.alpha, 1150 n_restarts_optimizer=self.n_restarts_optimizer, 1151 seed=self.seed, 1152 n_jobs=self.n_jobs, 1153 acquisition=self.acquisition, 1154 method=self.method, 1155 min_value=self.min_value, 1156 surrogate_obj=copy.deepcopy(self.regressors[0][1]), 1157 ) 1158 1159 gp_opt_obj_prev.optimize( 1160 verbose=verbose, 1161 abs_tol=abs_tol, # suggested 1e-4, for n_iter = 200 1162 ucb_tol=ucb_tol, 1163 min_budget=min_budget, # minimum budget for early stopping 1164 func_args=func_args, 1165 ) 1166 1167 score_next_param = gp_opt_obj_prev.y_min 1168 1169 df_res.iloc[0, 1] = score_next_param 1170 1171 if self.n_jobs is None: # sequential optimization 1172 for i in range(len(self.regressors)): 1173 try: 1174 if verbose == 2: 1175 print( 1176 f"\n adjusting surrogate model # {i + 2} ({self.regressors[i][0]})... \n" 1177 ) 1178 1179 df_res["Model"][i + 1] = self.regressors[i][0] 1180 1181 gp_opt_obj = GPOpt( 1182 objective_func=self.objective_func, 1183 lower_bound=self.lower_bound, 1184 upper_bound=self.upper_bound, 1185 n_init=self.n_init, 1186 n_iter=self.n_iter, 1187 alpha=self.alpha, 1188 n_restarts_optimizer=self.n_restarts_optimizer, 1189 seed=self.seed, 1190 n_jobs=self.n_jobs, 1191 acquisition=self.acquisition, 1192 method=self.method, 1193 min_value=self.min_value, 1194 surrogate_obj=copy.deepcopy(self.regressors[i][1]), 1195 x_init=np.asarray(gp_opt_obj_prev.parameters), 1196 y_init=np.asarray(gp_opt_obj_prev.scores), 1197 ) 1198 1199 gp_opt_obj.optimize( 1200 verbose=verbose, 1201 abs_tol=abs_tol, # suggested 1e-4, for n_iter = 200 1202 ucb_tol=ucb_tol, 1203 min_budget=min_budget, # minimum budget for early stopping 1204 func_args=func_args, 1205 ) 1206 1207 score_next_param = gp_opt_obj.y_min 1208 1209 df_res.iloc[i, 1] = score_next_param 1210 1211 if score_next_param < self.y_min: 1212 self.x_min = gp_opt_obj.x_min 1213 self.y_min = score_next_param 1214 if self.y_min == self.min_value: 1215 break 1216 1217 if verbose == 2: 1218 print(f"Global iteration #{i + 1} -----") 1219 print(f"current minimum: {self.x_min}") 1220 print(f"current minimum score: {self.y_min}") 1221 print( 1222 f"score for next parameter: {score_next_param} \n" 1223 ) 1224 1225 gp_opt_obj_prev = copy.deepcopy(gp_opt_obj) 1226 1227 except ValueError: 1228 continue 1229 1230 elif self.n_jobs >= 2 or self.n_jobs == -1: # parallel optimization 1231 pass 1232 else: 1233 raise ValueError( 1234 "n_jobs must be either None or >= 2 or equal to -1" 1235 ) 1236 return DescribeResult( 1237 self.x_min, self.y_min, df_res.sort_values(by="Score") 1238 ) 1239 1240 elif ( 1241 type_exec == "independent" 1242 ): # when models are adjusted independently on the same design set and the best model is chosen eventually 1243 self.x_min = None 1244 1245 self.y_min = np.inf 1246 1247 score_next_param = np.inf 1248 1249 DescribeResult = namedtuple( 1250 "DescribeResult", 1251 ("best_params", "best_score", "best_surrogate", "scores"), 1252 ) 1253 1254 if verbose == 2: 1255 print( 1256 f"\n adjusting surrogate model # {1} ({self.regressors[0][0]})... \n" 1257 ) 1258 1259 if self.n_jobs is None: # sequential optimization 1260 for i in range(len(self.regressors)): 1261 # try: 1262 1263 if verbose == 2: 1264 print( 1265 f"\n adjusting surrogate model # {i + 1} ({self.regressors[i][0]})... \n" 1266 ) 1267 1268 df_res["Model"][i + 1] = self.regressors[i][0] 1269 1270 gp_opt_obj = GPOpt( 1271 objective_func=self.objective_func, 1272 lower_bound=self.lower_bound, 1273 upper_bound=self.upper_bound, 1274 n_init=self.n_init, 1275 n_iter=self.n_iter, 1276 alpha=self.alpha, 1277 n_restarts_optimizer=self.n_restarts_optimizer, 1278 seed=self.seed, 1279 n_jobs=self.n_jobs, 1280 acquisition=self.acquisition, 1281 method=self.method, 1282 min_value=self.min_value, 1283 surrogate_obj=copy.deepcopy(self.regressors[i][1]), 1284 ) 1285 1286 gp_opt_obj.optimize( 1287 verbose=verbose, 1288 abs_tol=abs_tol, # suggested 1e-4, for n_iter = 200 1289 ucb_tol=ucb_tol, 1290 min_budget=min_budget, # minimum budget for early stopping 1291 func_args=func_args, 1292 ) 1293 1294 score_next_param = gp_opt_obj.y_min 1295 1296 df_res.iloc[i, 1] = score_next_param 1297 1298 if score_next_param < self.y_min: 1299 self.x_min = gp_opt_obj.x_min 1300 self.y_min = score_next_param 1301 self.best_surrogate = copy.deepcopy( 1302 gp_opt_obj.surrogate_obj 1303 ) 1304 if self.y_min == self.min_value: 1305 break 1306 1307 if verbose == 2: 1308 print(f"Global iteration #{i + 1} -----") 1309 print(f"current minimum: {self.x_min}") 1310 print(f"current minimum score: {self.y_min}") 1311 print( 1312 f"score for next parameter: {score_next_param} \n" 1313 ) 1314 1315 # except ValueError: 1316 1317 # continue 1318 1319 elif self.n_jobs >= 2 or self.n_jobs == -1: # parallel optimization 1320 1321 def foo(i): 1322 df_res["Model"][i + 1] = self.regressors[i][0] 1323 1324 gp_opt_obj = GPOpt( 1325 objective_func=self.objective_func, 1326 lower_bound=self.lower_bound, 1327 upper_bound=self.upper_bound, 1328 n_init=self.n_init, 1329 n_iter=self.n_iter, 1330 alpha=self.alpha, 1331 n_restarts_optimizer=self.n_restarts_optimizer, 1332 seed=self.seed, 1333 n_jobs=self.n_jobs, 1334 acquisition=self.acquisition, 1335 method=self.method, 1336 min_value=self.min_value, 1337 surrogate_obj=copy.deepcopy(self.regressors[i][1]), 1338 ) 1339 1340 try: 1341 gp_opt_obj.optimize( 1342 verbose=0, # important 1343 abs_tol=abs_tol, # suggested 1e-4, for n_iter = 200 1344 ucb_tol=ucb_tol, 1345 min_budget=min_budget, # minimum budget for early stopping 1346 func_args=func_args, 1347 ) 1348 1349 return gp_opt_obj 1350 except ValueError: 1351 return None 1352 1353 tmp_results = Parallel(n_jobs=self.n_jobs)( 1354 delayed(foo)(i) for i in tqdm(range(len(self.regressors))) 1355 ) 1356 1357 for i in tqdm(range(len(tmp_results))): 1358 if tmp_results[i] is not None: 1359 gp_opt_obj = copy.deepcopy(tmp_results[i]) 1360 1361 score_next_param = gp_opt_obj.y_min 1362 1363 df_res.iloc[i, 1] = score_next_param 1364 1365 if score_next_param < self.y_min: 1366 self.x_min = gp_opt_obj.x_min 1367 self.y_min = score_next_param 1368 self.best_surrogate = copy.deepcopy( 1369 gp_opt_obj.surrogate_obj 1370 ) 1371 if self.y_min == self.min_value: 1372 break 1373 1374 if verbose == 2: 1375 print(f"Global iteration #{i + 1} -----") 1376 print(f"current minimum: {self.x_min}") 1377 print(f"current minimum score: {self.y_min}") 1378 print( 1379 f"score for next parameter: {score_next_param} \n" 1380 ) 1381 1382 else: 1383 continue 1384 1385 else: 1386 raise ValueError( 1387 "n_jobs must be either None or >= 2 or equal to -1" 1388 ) 1389 return DescribeResult( 1390 self.x_min, 1391 self.y_min, 1392 self.best_surrogate, 1393 df_res.sort_values(by="Score"), 1394 ) 1395 1396 else: 1397 NotImplementedError( 1398 "type_exec must be either 'queue' or 'independent'" 1399 )
Class GPOpt.
Arguments:
lower_bound: a numpy array;
lower bound for researched minimum
upper_bound: a numpy array;
upper bound for researched minimum
objective_func: a function;
the objective function to be minimized
params_names: a list;
names of the parameters of the objective function (optional)
surrogate_obj: an object;
An ML model for estimating the uncertainty around the objective function
Must be nnetsauce.CustomRegressor or nnetsauce.PredictionInterval
x_init:
initial setting of points where `objective_func` is evaluated (optional)
y_init:
initial setting values at points where `objective_func` is evaluated (optional)
n_init: an integer;
number of points in the initial setting, when `x_init` and `y_init` are not provided
n_choices: an integer;
number of points for the calculation of expected improvement
n_iter: an integer;
number of iterations of the minimization algorithm
alpha: a float;
Value added to the diagonal of the kernel matrix during fitting (for Matern 5/2 kernel)
n_restarts_optimizer: an integer;
The number of restarts of the optimizer for finding the kernel’s parameters which maximize the log-marginal likelihood.
seed: an integer;
reproducibility seed
save: a string;
Specifies where to save the optimizer in its current state
n_jobs: an integer;
number of jobs for parallel computing on initial setting or method `lazyoptimize` (can be -1)
acquisition: a string;
acquisition function: "ei" (expected improvement) or "ucb" (upper confidence bound)
method: an str;
"bayesian" (default) for Gaussian posteriors, "mc" for Monte Carlo posteriors,
"splitconformal" (only for acquisition = "ucb") for conformalized surrogates
min_value: a float;
minimum value of the objective function (default is None). For example,
if objective function is accuracy, will be 1, and the algorithm will stop
per_second: a boolean;
__experimental__, default is False (leave to default for now)
log_scale: a boolean;
__experimental__, default is False (leave to default for now)
see also Bayesian Optimization with GPopt or Hyperparameters tuning with GPopt Agnostic BayesOpt
22class BOstopping: 23 """Bayesian Optimization with robust early stopping criteria.""" 24 25 def __init__( 26 self, 27 f, 28 bounds, 29 n_init=5, 30 kappa=1.96, 31 early_stopping=True, 32 stop_patience=20, 33 stop_threshold=0.01, 34 n_test_points=100, 35 alpha=1e-6, 36 n_restarts_optimizer=25, 37 seed=123, 38 ): 39 self.f = f 40 self.bounds = np.array(bounds) 41 self.n_init = n_init 42 self.kappa = kappa 43 self.early_stopping = early_stopping 44 self.stop_patience = stop_patience 45 self.stop_threshold = stop_threshold 46 self.n_test_points = n_test_points 47 self.alpha = alpha 48 self.n_restarts_optimizer = n_restarts_optimizer 49 self.seed = seed 50 51 np.random.seed(self.seed) 52 self.test_points = self._sample_random(n_test_points) 53 54 # History tracking 55 self.wasserstein_history = [] 56 self.X = [] 57 self.y = [] 58 self.best_values = [] 59 self.acquisition_values = [] 60 self.gp_variance = [] 61 self.phase = [] 62 63 # GP setup 64 self.gp = GaussianProcessRegressor( 65 kernel=Matern(nu=2.5), 66 alpha=self.alpha, 67 normalize_y=True, 68 n_restarts_optimizer=self.n_restarts_optimizer, 69 random_state=self.seed, 70 ) 71 72 def _sample_random(self, n_samples): 73 """Uniform sampling within bounds.""" 74 return np.random.uniform( 75 self.bounds[:, 0], 76 self.bounds[:, 1], 77 size=(n_samples, len(self.bounds)), 78 ) 79 80 def _acquisition(self, X_candidate): 81 """Expected Improvement acquisition function.""" 82 mu, sigma = self.gp.predict(X_candidate, return_std=True) 83 mu_sample = np.min(self.y) # Use actual observed minimum 84 85 sigma = np.maximum(sigma, 1e-6) 86 gamma = (mu_sample - mu) / sigma 87 ei = (mu_sample - mu) * norm.cdf(gamma) + sigma * norm.pdf(gamma) 88 return ei 89 90 def _get_posterior_samples(self, gp, n_samples=100): 91 """Sample from GP posterior at test points.""" 92 mu, sigma = gp.predict(self.test_points, return_std=True) 93 return np.random.normal( 94 mu, sigma, size=(n_samples, len(self.test_points)) 95 ) 96 97 def _compute_wasserstein(self, gp_prev, gp_current): 98 """Compute approximate Wasserstein distance between posteriors.""" 99 mu_prev, std_prev = gp_prev.predict(self.test_points, return_std=True) 100 mu_curr, std_curr = gp_current.predict( 101 self.test_points, return_std=True 102 ) 103 104 # 2-Wasserstein distance for independent 1D Gaussians, averaged 105 w2_per_point = (mu_prev - mu_curr) ** 2 + (std_prev - std_curr) ** 2 106 return np.sqrt(np.mean(w2_per_point)) 107 108 def _should_stop(self, gp_prev): 109 """Early stopping based on improvement and posterior stability.""" 110 if len(self.best_values) < self.stop_patience + 1: 111 return False 112 113 # 1. Improvement check 114 recent_improvements = np.diff(self.best_values[-self.stop_patience :]) 115 improvement_stop = np.all( 116 np.abs(recent_improvements) < self.stop_threshold 117 ) 118 119 # 2. Posterior stability 120 current_w = self._compute_wasserstein(gp_prev, self.gp) 121 self.wasserstein_history.append(current_w) 122 123 if len(self.wasserstein_history) >= 2 * self.stop_patience: 124 recent_w = self.wasserstein_history[-self.stop_patience :] 125 older_w = self.wasserstein_history[ 126 -2 * self.stop_patience : -self.stop_patience 127 ] 128 _, p_value = mannwhitneyu(recent_w, older_w, alternative="greater") 129 mwu_stable = p_value > 0.1 130 var_stable = np.var(recent_w) < 1e-6 131 posterior_stable = mwu_stable or var_stable 132 else: 133 posterior_stable = False 134 135 return improvement_stop or posterior_stable 136 137 def optimize(self, n_iter=100): 138 """Run Bayesian optimization loop.""" 139 print("Starting Initial Design Phase...") 140 self.X = self._sample_random(self.n_init) 141 self.y = [] 142 143 for i, x in enumerate(self.X): 144 y_val = self.f(x) 145 self.y.append(y_val) 146 current_best = np.min(self.y) 147 self.best_values.append(current_best) 148 self.acquisition_values.append(0) 149 self.gp_variance.append(0) 150 self.phase.append("initial") 151 print( 152 f" Initial sample {i+1}/{self.n_init}: f(x) = {y_val:.6f}, best = {current_best:.6f}" 153 ) 154 155 print(f"Initial Design Complete. Best value: {np.min(self.y):.6f}") 156 print("\nStarting Bayesian Optimization Phase...") 157 158 gp_prev = None 159 for i in tqdm(range(n_iter), desc="Bayesian Optimization"): 160 if i > 0: 161 gp_prev = deepcopy(self.gp) 162 163 self.gp.fit(self.X, self.y) 164 165 X_candidate = self._sample_random(1000) 166 acq = self._acquisition(X_candidate) 167 best_acq_idx = np.argmax(acq) 168 x_next = X_candidate[best_acq_idx] 169 max_acq_value = acq[best_acq_idx] 170 171 _, gp_std = self.gp.predict([x_next], return_std=True) 172 173 y_next = self.f(x_next) 174 self.X = np.vstack((self.X, x_next)) 175 self.y.append(y_next) 176 current_best = np.min(self.y) 177 self.best_values.append(current_best) 178 self.acquisition_values.append(max_acq_value) 179 self.gp_variance.append(gp_std[0]) 180 self.phase.append("bayesian") 181 182 print( 183 f" Iteration {i+1}: f(x) = {y_next:.6f}, best = {current_best:.6f}, " 184 f"EI = {max_acq_value:.6f}, σ = {gp_std[0]:.6f}" 185 ) 186 187 if gp_prev is not None: 188 w_dist = self._compute_wasserstein(gp_prev, self.gp) 189 self.wasserstein_history.append(w_dist) 190 print(f" Wasserstein distance: {w_dist:.8f}") 191 192 if self.early_stopping and i > self.n_init and gp_prev is not None: 193 if self._should_stop(gp_prev): 194 print(f"Early stopping at iteration {i+1}") 195 break 196 197 best_idx = np.argmin(self.y) 198 return self.X[best_idx], self.y[best_idx]
Bayesian Optimization with robust early stopping criteria.
9class MLOptimizer: 10 """ 11 A class for hyperparameter optimization of any ML model using Gaussian Process optimization. 12 """ 13 14 def __init__( 15 self, 16 scoring="accuracy", 17 cv=5, 18 n_jobs=None, 19 n_init=10, 20 n_iter=190, 21 seed=3137, 22 ): 23 """ 24 Initialize the hyperparameter optimizer. 25 26 Parameters: 27 ----------- 28 scoring : str or callable 29 Scoring metric for cross-validation 30 cv : int 31 Number of cross-validation folds 32 n_jobs : int 33 Number of parallel jobs 34 n_init : int 35 Number of initial random evaluations 36 n_iter : int 37 Number of optimization iterations 38 seed : int 39 Random seed 40 """ 41 self.scoring = scoring 42 self.cv = cv 43 self.n_jobs = n_jobs 44 self.n_init = n_init 45 self.n_iter = n_iter 46 self.seed = seed 47 48 # Store optimization state 49 self.optimization_result = None 50 self.estimator_class = None 51 self.param_config = None 52 self.X_train = None 53 self.y_train = None 54 55 def _generic_cv(self, estimator_class, param_dict): 56 """ 57 Generic cross-validation function for any ML model. 58 59 Parameters: 60 ----------- 61 estimator_class : class 62 The ML model class (e.g., RandomForestClassifier, SVR, etc.) 63 param_dict : dict 64 Dictionary of parameters to pass to the estimator 65 66 Returns: 67 -------- 68 float : negative mean CV score (for minimization) 69 """ 70 try: 71 estimator = estimator_class.set_params(**param_dict) 72 scores = cross_val_score( 73 estimator, 74 self.X_train, 75 self.y_train, 76 scoring=self.scoring, 77 cv=self.cv, 78 n_jobs=self.n_jobs, 79 error_score="raise", # This will raise an error if CV fails 80 ) 81 return -scores.mean() 82 except Exception as e: 83 # Return a large positive value if CV fails 84 print(f"CV failed with parameters {param_dict}: {e}") 85 return 1e6 # Large positive value for minimization 86 87 def optimize( 88 self, X_train, y_train, estimator_class, param_config, verbose=2 89 ): 90 """ 91 Optimize hyperparameters for an ML model. 92 93 Parameters: 94 ----------- 95 X_train : array-like 96 Training features 97 y_train : array-like 98 Training targets 99 estimator_class : class 100 The ML model class (not an instance!) 101 param_config : dict 102 Configuration for each parameter 103 verbose : int 104 Verbosity level 105 106 Returns: 107 -------- 108 GPOpt result object 109 """ 110 # Store optimization context 111 self.X_train = X_train 112 self.y_train = y_train 113 self.estimator_class = estimator_class 114 self.param_config = param_config 115 116 # Extract parameter names, bounds, and transformations 117 param_names = list(param_config.keys()) 118 lower_bounds = [] 119 upper_bounds = [] 120 transforms = {} 121 dtypes = {} 122 defaults = {} 123 124 for param_name, config in param_config.items(): 125 lower_bounds.append(config["bounds"][0]) 126 upper_bounds.append(config["bounds"][1]) 127 if "transform" in config: 128 transforms[param_name] = config["transform"] 129 dtypes[param_name] = config.get("dtype", "float") 130 if "default" in config: 131 defaults[param_name] = config["default"] 132 133 def crossval_objective(x): 134 """Objective function for hyperparameter tuning""" 135 param_dict = {} 136 137 for i, param_name in enumerate(param_names): 138 value = x[i] 139 140 # Apply transformation if specified 141 if param_name in transforms: 142 value = transforms[param_name](value) 143 144 # Apply dtype conversion 145 if dtypes[param_name] == "int": 146 value = int( 147 np.round(value) 148 ) # Use round instead of ceil for better behavior 149 150 param_dict[param_name] = value 151 152 # Add any default parameters not being optimized 153 for param_name, default_value in defaults.items(): 154 if param_name not in param_dict: 155 param_dict[param_name] = default_value 156 157 # Remove None values as they can cause issues 158 param_dict = {k: v for k, v in param_dict.items() if v is not None} 159 160 return self._generic_cv(estimator_class, param_dict) 161 162 # Set up Gaussian Process optimization 163 gp_opt = GPOpt( 164 objective_func=crossval_objective, 165 lower_bound=np.array(lower_bounds), 166 upper_bound=np.array(upper_bounds), 167 params_names=param_names, 168 n_init=self.n_init, 169 n_iter=self.n_iter, 170 seed=self.seed, 171 ) 172 173 try: 174 self.optimization_result = gp_opt.optimize( 175 verbose=verbose, abs_tol=1e-4 176 ) 177 return self.optimization_result 178 except Exception as e: 179 print(f"Optimization failed: {e}") 180 # Try to debug by running a few evaluations manually 181 print( 182 "Debugging: Testing objective function with initial points..." 183 ) 184 for i in range(min(3, self.n_init)): 185 test_point = np.random.uniform(lower_bounds, upper_bounds) 186 score = crossval_objective(test_point) 187 print(f"Test point {i}: {test_point} -> score: {score}") 188 raise 189 190 def get_best_parameters(self, apply_transforms=True): 191 """ 192 Get the best parameters found during optimization. 193 """ 194 if self.optimization_result is None: 195 raise ValueError( 196 "No optimization has been run yet. Call optimize() first." 197 ) 198 199 if apply_transforms: 200 return self.optimization_result.best_params 201 else: 202 return self.optimization_result.x 203 204 def get_best_score(self): 205 """ 206 Get the best score found during optimization. 207 """ 208 if self.optimization_result is None: 209 raise ValueError( 210 "No optimization has been run yet. Call optimize() first." 211 ) 212 213 return ( 214 -self.optimization_result.best_score 215 ) # Convert back to positive score 216 217 def create_optimized_estimator(self): 218 """ 219 Create an estimator instance with the optimized parameters. 220 """ 221 if self.optimization_result is None: 222 raise ValueError( 223 "No optimization has been run yet. Call optimize() first." 224 ) 225 226 if self.estimator_class is None: 227 raise ValueError( 228 "No estimator class stored. Call optimize() first." 229 ) 230 231 if self.param_config is None: 232 raise ValueError( 233 "No parameter configuration stored. Call optimize() first." 234 ) 235 236 best_params = self.get_best_parameters() 237 238 # Apply transformations and type conversions using stored param_config 239 final_params = {} 240 for param_name, value in best_params.items(): 241 if param_name in self.param_config: 242 config = self.param_config[param_name] 243 244 # Apply transformation if specified 245 if "transform" in config: 246 value = config["transform"](value) 247 248 # Apply dtype conversion 249 if config.get("dtype") == "int": 250 value = int(np.round(value)) 251 252 final_params[param_name] = value 253 254 # Remove None values 255 final_params = {k: v for k, v in final_params.items() if v is not None} 256 257 return self.estimator_class.set_params(**final_params) 258 259 def fit_optimized_estimator(self): 260 """ 261 Create and fit an optimized estimator on the training data. 262 """ 263 if self.X_train is None or self.y_train is None: 264 raise ValueError("No training data stored. Call optimize() first.") 265 266 estimator = self.create_optimized_estimator() 267 return estimator.fit(self.X_train, self.y_train)
A class for hyperparameter optimization of any ML model using Gaussian Process optimization.
156class GeneralizationOpt: 157 """ 158 Generic framework for model generalization diagnostics 159 160 This class is agnostic to: 161 - The base model being tuned (LightGBM, XGBoost, neural nets, etc.) 162 - The surrogate model used (GP, RF, neural nets, etc.) 163 - The hyperparameter space structure 164 165 Example usage: 166 -------------- 167 >>> from sklearn.gaussian_process import GaussianProcessRegressor 168 >>> from sklearn.ensemble import RandomForestRegressor 169 >>> 170 >>> # Define hyperparameter space 171 >>> hyperparams = { 172 >>> 'learning_rate': (0.01, 0.3, True), # (min, max, log_scale) 173 >>> 'max_depth': (3, 10, False) 174 >>> } 175 >>> 176 >>> # Initialize framework 177 >>> diag = GeneralizationOpt(hyperparams, random_state=42) 178 >>> 179 >>> # Use GP surrogate with Bayesian uncertainty 180 >>> gp = GaussianProcessRegressor() 181 >>> surrogate = GenericSurrogate(gp, conformal=False, bayesian=True) 182 >>> 183 >>> # Or use RF with conformal prediction 184 >>> rf = RandomForestRegressor() 185 >>> surrogate = GenericSurrogate(rf, conformal=True, bayesian=False) 186 >>> 187 >>> # Run diagnostics 188 >>> results = diag.run_diagnostics( 189 >>> X_train, X_test, y_train, y_test, 190 >>> model_class=LGBMClassifier, 191 >>> surrogate=surrogate, 192 >>> n_samples=100 193 >>> ) 194 """ 195 196 def __init__(self, hyperparams, random_state=42): 197 """ 198 Initialize diagnostics framework 199 200 Parameters: 201 ----------- 202 hyperparams : dict 203 Hyperparameter space definition 204 Format: {param_name: (min_val, max_val, log_scale), ...} 205 random_state : int 206 Random seed for reproducibility 207 """ 208 self.hyperparams = hyperparams 209 self.hyperparam_names = list(hyperparams.keys()) 210 self.n_dims = len(self.hyperparam_names) 211 self.random_state = random_state 212 213 # Storage 214 self.sobol_samples = None 215 self.evaluation_results = None 216 self.surrogate = None 217 self.sobol_indices = None 218 219 def _denormalize_config(self, normalized_config): 220 """Convert normalized [0,1] to actual hyperparameter values""" 221 config = {} 222 for i, param_name in enumerate(self.hyperparam_names): 223 min_val, max_val, log_scale = self.hyperparams[param_name] 224 norm_val = normalized_config[i] 225 226 if log_scale: 227 log_min, log_max = np.log(min_val), np.log(max_val) 228 config[param_name] = np.exp( 229 log_min + norm_val * (log_max - log_min) 230 ) 231 else: 232 denormalized_value = min_val + norm_val * (max_val - min_val) 233 # Special handling for boolean parameters represented as (0, 1, False) 234 if ( 235 min_val == 0 236 and max_val == 1 237 and not log_scale 238 and isinstance(min_val, int) 239 ): 240 config[param_name] = bool(round(denormalized_value)) 241 # Convert to int if needed (if min and max are integers and not log-scale) 242 elif ( 243 isinstance(min_val, int) 244 and isinstance(max_val, int) 245 and not log_scale 246 ): 247 config[param_name] = int(round(denormalized_value)) 248 else: 249 config[param_name] = denormalized_value 250 251 return config 252 253 def generate_sobol_samples(self, n_samples=None): 254 """ 255 Generate Sobol quasi-random samples in hyperparameter space 256 257 Parameters: 258 ----------- 259 n_samples : int, optional 260 Number of samples (default: 75 * n_dims) 261 262 Returns: 263 -------- 264 samples : ndarray, shape (n_samples, n_dims) 265 Normalized samples in [0, 1]^n_dims 266 """ 267 if n_samples is None: 268 n_samples = 75 * self.n_dims 269 270 sobol = Sobol(d=self.n_dims, scramble=True, seed=self.random_state) 271 self.sobol_samples = sobol.random(n_samples) 272 273 return self.sobol_samples 274 275 def evaluate_configurations( 276 self, 277 X_train, 278 X_test, 279 y_train, 280 y_test, 281 model_class, 282 model_kwargs=None, 283 evaluation_fn=None, 284 n_folds=5, 285 ): 286 """ 287 Evaluate model configurations and compute generalization gaps 288 289 Parameters: 290 ----------- 291 X_train, X_test, y_train, y_test : arrays 292 Train/test data 293 model_class : class 294 Model class to instantiate (e.g., LGBMClassifier) 295 model_kwargs : dict, optional 296 Fixed kwargs for model (e.g., {'random_state': 42, 'verbose': -1}) 297 evaluation_fn : callable, optional 298 Function to evaluate model: evaluation_fn(y_true, y_pred) -> score 299 Default: balanced_accuracy_score for classification 300 n_folds : int 301 Number of CV folds for training score estimation 302 303 Returns: 304 -------- 305 results : DataFrame 306 Results with columns: hyperparams, cv_train, test_score, gap_abs, etc. 307 """ 308 from sklearn.model_selection import StratifiedKFold, KFold 309 from sklearn.metrics import balanced_accuracy_score, r2_score 310 311 if self.sobol_samples is None: 312 raise ValueError("Must generate Sobol samples first") 313 314 if model_kwargs is None: 315 model_kwargs = {} 316 317 if evaluation_fn is None: 318 # Heuristic: check if classification or regression 319 if len(np.unique(y_train)) < 20: 320 evaluation_fn = balanced_accuracy_score 321 kfold = StratifiedKFold( 322 n_splits=n_folds, 323 shuffle=True, 324 random_state=self.random_state, 325 ) 326 else: 327 evaluation_fn = r2_score 328 kfold = KFold( 329 n_splits=n_folds, 330 shuffle=True, 331 random_state=self.random_state, 332 ) 333 else: 334 kfold = KFold( 335 n_splits=n_folds, shuffle=True, random_state=self.random_state 336 ) 337 338 results = [] 339 n_samples = len(self.sobol_samples) 340 341 print( 342 f"Evaluating {n_samples} configurations ({n_folds}-fold CV + test)..." 343 ) 344 345 for idx, norm_config in tqdm(enumerate(self.sobol_samples)): 346 if (idx + 1) % 50 == 0: 347 print( 348 f" Progress: {idx + 1}/{n_samples} " 349 f"({100*(idx+1)/n_samples:.1f}%)" 350 ) 351 352 config = self._denormalize_config(norm_config) 353 354 # Cross-validation score 355 cv_scores = [] 356 for train_idx, val_idx in kfold.split(X_train, y_train): 357 if hasattr(X_train, "iloc"): # DataFrame 358 X_tr, X_val = X_train.iloc[train_idx], X_train.iloc[val_idx] 359 y_tr, y_val = y_train.iloc[train_idx], y_train.iloc[val_idx] 360 else: # ndarray 361 X_tr, X_val = X_train[train_idx], X_train[val_idx] 362 y_tr, y_val = y_train[train_idx], y_train[val_idx] 363 364 model = model_class(**config, **model_kwargs) 365 model.fit(X_tr, y_tr) 366 y_pred = model.predict(X_val) 367 cv_scores.append(evaluation_fn(y_val, y_pred)) 368 369 cv_train = np.mean(cv_scores) 370 371 # Test score 372 model = model_class(**config, **model_kwargs) 373 model.fit(X_train, y_train) 374 y_pred_test = model.predict(X_test) 375 test_score = evaluation_fn(y_test, y_pred_test) 376 377 # Generalization gaps 378 gap_abs = cv_train - test_score 379 gap_norm = ( 380 (cv_train - test_score) / (1 - cv_train) if cv_train < 1 else 0 381 ) 382 gap_mpe = ( 383 ((cv_train - test_score) / cv_train * 100) 384 if cv_train > 0 385 else 0 386 ) 387 388 results.append( 389 { 390 **config, 391 "cv_train": cv_train, 392 "test_score": test_score, 393 "gap_abs": gap_abs, 394 "gap_norm": gap_norm, 395 "gap_mpe": gap_mpe, 396 } 397 ) 398 399 self.evaluation_results = pd.DataFrame(results) 400 401 print(f"\n✓ Completed {n_samples} evaluations") 402 print( 403 f" CV Train Score: {self.evaluation_results['cv_train'].mean():.4f} \u00b1 " 404 f"{self.evaluation_results['cv_train'].std():.4f}" 405 ) 406 print( 407 f" Test Score: {self.evaluation_results['test_score'].mean():.4f} \u00b1 " 408 f"{self.evaluation_results['test_score'].std():.4f}" 409 ) 410 print( 411 f" Absolute Gap: {self.evaluation_results['gap_abs'].mean():.4f} \u00b1 " 412 f"{self.evaluation_results['gap_abs'].std():.4f}" 413 ) 414 415 return self.evaluation_results 416 417 def fit_surrogate(self, surrogate, target_gap="gap_abs"): 418 """ 419 Fit surrogate model to generalization gap data 420 421 Parameters: 422 ----------- 423 surrogate : GenericSurrogate 424 Surrogate model instance 425 target_gap : str 426 Target variable to model ('gap_abs', 'gap_norm', 'gap_mpe') 427 428 Returns: 429 -------- 430 surrogate : GenericSurrogate 431 Fitted surrogate model 432 metrics : dict 433 Performance metrics (R², MAE, coverage if applicable) 434 """ 435 if self.evaluation_results is None: 436 raise ValueError("Must evaluate configurations first") 437 438 X = self.sobol_samples 439 y = self.evaluation_results[target_gap].values 440 441 # Train/validation split 442 split_idx = int(0.8 * len(X)) 443 X_train, X_val = X[:split_idx], X[split_idx:] 444 y_train, y_val = y[:split_idx], y[split_idx:] 445 446 print( 447 f"Training surrogate on {len(X_train)} samples, " 448 f"validating on {len(X_val)} samples..." 449 ) 450 451 # Fit surrogate 452 surrogate.fit(X_train, y_train) 453 454 # Evaluate 455 y_pred, y_lower, y_upper = surrogate.predict(X_val, return_std=True) 456 457 from sklearn.metrics import r2_score, mean_absolute_error 458 459 r2 = r2_score(y_val, y_pred) 460 mae = mean_absolute_error(y_val, y_pred) 461 462 metrics = {"r2": r2, "mae": mae} 463 464 # Compute coverage if intervals available 465 if not np.array_equal(y_lower, y_upper): 466 coverage = np.mean((y_val >= y_lower) & (y_val <= y_upper)) 467 interval_width = np.mean(y_upper - y_lower) 468 metrics["coverage"] = coverage 469 metrics["interval_width"] = interval_width 470 471 print(f"\n✓ Surrogate Performance:") 472 print(f" R² Score: {r2:.4f}") 473 print(f" MAE: {mae:.6f}") 474 print(f" Coverage: {coverage:.2%}") 475 print(f" Mean Interval Width: {interval_width:.6f}") 476 else: 477 print(f"\n✓ Surrogate Performance:") 478 print(f" R² Score: {r2:.4f}") 479 print(f" MAE: {mae:.6f}") 480 481 self.surrogate = surrogate 482 return surrogate, metrics 483 484 def compute_sobol_indices(self, n_samples=10000): 485 """ 486 Compute Sobol sensitivity indices for global sensitivity analysis 487 488 Parameters: 489 ----------- 490 n_samples : int 491 Number of Monte Carlo samples for Sobol estimation 492 493 Returns: 494 -------- 495 indices : DataFrame 496 Sobol indices with columns: Parameter, S1, ST, Interaction 497 """ 498 if self.surrogate is None: 499 raise ValueError("Must fit surrogate first") 500 501 problem = { 502 "num_vars": self.n_dims, 503 "names": self.hyperparam_names, 504 "bounds": [[0, 1]] * self.n_dims, 505 } 506 507 param_values = sobol_sample.sample( 508 problem, n_samples, calc_second_order=False 509 ) 510 Y = self.surrogate.predict(param_values) 511 512 Si = sobol_analyze.analyze( 513 problem, Y, calc_second_order=False, print_to_console=False 514 ) 515 516 self.sobol_indices = pd.DataFrame( 517 { 518 "Parameter": self.hyperparam_names, 519 "S1": Si["S1"], 520 "ST": Si["ST"], 521 "Interaction": Si["ST"] - Si["S1"], 522 } 523 ).sort_values("S1", ascending=False) 524 525 print("\n" + "=" * 70) 526 print("Sobol Sensitivity Indices (Variance Decomposition)") 527 print("=" * 70) 528 print(self.sobol_indices.to_string(index=False)) 529 print("\nS1 = First-order effect (main effect)") 530 print("ST = Total-order effect (main + interactions)") 531 print("Interaction = ST - S1 (pure interaction effects)") 532 533 return self.sobol_indices 534 535 def optimize_acquisition( 536 self, 537 acquisition="ei", 538 n_starts=10, 539 kappa=2.0, 540 xi=0.01, 541 minimize_gap=True, 542 ): 543 """ 544 Optimize acquisition function to find best hyperparameter configuration 545 546 Parameters: 547 ----------- 548 acquisition : str 549 Acquisition function: 'ei' (Expected Improvement), 550 'ucb' (Upper Confidence Bound), 'pi' (Probability of Improvement) 551 n_starts : int 552 Number of random restarts for multi-start optimization 553 kappa : float 554 Exploration parameter for UCB (higher = more exploration) 555 xi : float 556 Exploration parameter for EI/PI (higher = more exploration) 557 minimize_gap : bool 558 If True, minimize gap; if False, maximize test performance 559 560 Returns: 561 -------- 562 results : dict 563 Optimization results including best config and expected improvement 564 """ 565 if self.surrogate is None: 566 raise ValueError("Must fit surrogate first") 567 568 # Get current best 569 if minimize_gap: 570 y_best = self.evaluation_results["gap_abs"].min() 571 print(f"Current best gap: {y_best:.6f}") 572 else: 573 y_best = self.evaluation_results["test_score"].max() 574 print(f"Current best test score: {y_best:.6f}") 575 576 # Define acquisition functions 577 def expected_improvement(x): 578 x = x.reshape(1, -1) 579 y_pred, y_lower, y_upper = self.surrogate.predict( 580 x, return_std=True 581 ) 582 mu, sigma = y_pred[0], (y_upper[0] - y_lower[0]) / 4 # Approx std 583 584 if sigma < 1e-10: 585 return 0.0 586 587 if minimize_gap: 588 imp = y_best - mu - xi 589 else: 590 imp = mu - y_best - xi 591 592 Z = imp / sigma 593 ei = imp * norm.cdf(Z) + sigma * norm.pdf(Z) 594 return -ei 595 596 def upper_confidence_bound(x): 597 x = x.reshape(1, -1) 598 y_pred, y_lower, y_upper = self.surrogate.predict( 599 x, return_std=True 600 ) 601 mu, sigma = y_pred[0], (y_upper[0] - y_lower[0]) / 4 602 603 if minimize_gap: 604 ucb = mu - kappa * sigma 605 else: 606 ucb = mu + kappa * sigma 607 608 return ucb if minimize_gap else -ucb 609 610 def probability_of_improvement(x): 611 x = x.reshape(1, -1) 612 y_pred, y_lower, y_upper = self.surrogate.predict( 613 x, return_std=True 614 ) 615 mu, sigma = y_pred[0], (y_upper[0] - y_lower[0]) / 4 616 617 if sigma < 1e-10: 618 return 0.0 619 620 if minimize_gap: 621 Z = (y_best - mu - xi) / sigma 622 else: 623 Z = (mu - y_best - xi) / sigma 624 625 pi = norm.cdf(Z) 626 return -pi 627 628 acq_funcs = { 629 "ei": expected_improvement, 630 "ucb": upper_confidence_bound, 631 "pi": probability_of_improvement, 632 } 633 acq_func = acq_funcs[acquisition] 634 635 # Multi-start optimization 636 print(f"\nRunning {n_starts}-start L-BFGS-B optimization...") 637 638 best_result = None 639 best_acq_value = np.inf 640 641 np.random.seed(self.random_state) 642 starting_points = np.random.rand(n_starts, self.n_dims) 643 644 # Add best observed point as starting point 645 if len(self.sobol_samples) > 0: 646 best_idx = ( 647 self.evaluation_results["gap_abs"].idxmin() 648 if minimize_gap 649 else self.evaluation_results["test_score"].idxmax() 650 ) 651 starting_points[0] = self.sobol_samples[best_idx] 652 653 for i, x0 in enumerate(starting_points): 654 try: 655 result = scipy_minimize( 656 acq_func, 657 x0, 658 method="L-BFGS-B", 659 bounds=[(0, 1)] * self.n_dims, 660 options={"maxiter": 1000, "ftol": 1e-9}, 661 ) 662 663 if result.success and result.fun < best_acq_value: 664 best_acq_value = result.fun 665 best_result = result 666 except Exception as e: 667 print(f" Warning: Optimization {i+1} failed: {str(e)}") 668 669 if best_result is None: 670 raise RuntimeError("All optimization attempts failed") 671 672 # Extract best configuration 673 best_x_normalized = best_result.x 674 best_config = self._denormalize_config(best_x_normalized) 675 676 # Predict gap at optimum 677 y_pred, y_lower, y_upper = self.surrogate.predict( 678 best_x_normalized.reshape(1, -1), return_std=True 679 ) 680 mu_best = y_pred[0] 681 682 print(f"\n✓ Optimization Complete") 683 print(f" Acquisition: {acquisition.upper()}") 684 print(f" Best acquisition value: {-best_acq_value:.6f}") 685 print(f" Predicted gap at optimum: {mu_best:.6f}") 686 print(f" 95% CI: [{y_lower[0]:.6f}, {y_upper[0]:.6f}]") 687 print(f"\n Recommended Configuration:") 688 for param, value in best_config.items(): 689 print( 690 f" {param}: {value if isinstance(value, int) else f'{value:.4f}'}" 691 ) 692 693 improvement = y_best - mu_best if minimize_gap else mu_best - y_best 694 print(f"\n Expected improvement: {improvement:.6f}") 695 696 return { 697 "best_config": best_config, 698 "best_config_normalized": best_x_normalized, 699 "predicted_gap_mean": mu_best, 700 "predicted_gap_lower": y_lower[0], 701 "predicted_gap_upper": y_upper[0], 702 "current_best": y_best, 703 "expected_improvement": improvement, 704 "acquisition_function": acquisition, 705 } 706 707 def run_diagnostics( 708 self, 709 X_train, 710 X_test, 711 y_train, 712 y_test, 713 model_class, 714 surrogate, 715 n_samples=None, 716 model_kwargs=None, 717 target_gap="gap_abs", 718 ): 719 """ 720 Run complete diagnostic pipeline 721 722 Parameters: 723 ----------- 724 X_train, X_test, y_train, y_test : arrays 725 Train/test data 726 model_class : class 727 Model class to tune 728 surrogate : GenericSurrogate 729 Surrogate model for gap modeling 730 n_samples : int, optional 731 Number of Sobol samples 732 model_kwargs : dict, optional 733 Fixed model kwargs 734 target_gap : str 735 Gap metric to model 736 737 Returns: 738 -------- 739 results : dict 740 Complete diagnostic results 741 """ 742 # Phase 1: Exploration 743 self.generate_sobol_samples(n_samples) 744 eval_results = self.evaluate_configurations( 745 X_train, X_test, y_train, y_test, model_class, model_kwargs 746 ) 747 748 # Phase 2: Surrogate fitting 749 surrogate, metrics = self.fit_surrogate(surrogate, target_gap) 750 751 # Phase 3: Analysis 752 sobol_indices = self.compute_sobol_indices() 753 754 # Phase 4: Optimization 755 opt_results = self.optimize_acquisition() 756 757 return { 758 "evaluation_results": eval_results, 759 "surrogate_metrics": metrics, 760 "sobol_indices": sobol_indices, 761 "optimization": opt_results, 762 }
Generic framework for model generalization diagnostics
This class is agnostic to:
- The base model being tuned (LightGBM, XGBoost, neural nets, etc.)
- The surrogate model used (GP, RF, neural nets, etc.)
- The hyperparameter space structure
Example usage:
>>> from sklearn.gaussian_process import GaussianProcessRegressor
>>> from sklearn.ensemble import RandomForestRegressor
>>>
>>> # Define hyperparameter space
>>> hyperparams = {
>>> 'learning_rate': (0.01, 0.3, True), # (min, max, log_scale)
>>> 'max_depth': (3, 10, False)
>>> }
>>>
>>> # Initialize framework
>>> diag = GeneralizationOpt(hyperparams, random_state=42)
>>>
>>> # Use GP surrogate with Bayesian uncertainty
>>> gp = GaussianProcessRegressor()
>>> surrogate = GenericSurrogate(gp, conformal=False, bayesian=True)
>>>
>>> # Or use RF with conformal prediction
>>> rf = RandomForestRegressor()
>>> surrogate = GenericSurrogate(rf, conformal=True, bayesian=False)
>>>
>>> # Run diagnostics
>>> results = diag.run_diagnostics(
>>> X_train, X_test, y_train, y_test,
>>> model_class=LGBMClassifier,
>>> surrogate=surrogate,
>>> n_samples=100
>>> )
17class GenericSurrogate(BaseEstimator, RegressorMixin): 18 """ 19 Generic wrapper for any surrogate model with conformal prediction support 20 21 This class wraps any sklearn-compatible model and adds: 22 - Conformal prediction intervals (distribution-free uncertainty) 23 - Optional Bayesian uncertainty (if model supports it) 24 - Unified prediction interface 25 26 Parameters: 27 ----------- 28 base_model : object 29 Any sklearn-compatible model with fit() and predict() methods 30 conformal : bool 31 Whether to use conformal prediction for uncertainty 32 bayesian : bool 33 Whether model provides native uncertainty (e.g., GP, Bayesian NN) 34 """ 35 36 def __init__(self, base_model, conformal=True, bayesian=False): 37 self.base_model = base_model 38 self.conformal = conformal 39 self.bayesian = bayesian 40 self.nonconformity_scores_ = None 41 self.is_fitted_ = False 42 43 def fit(self, X, y, cal_ratio=0.2): 44 """ 45 Fit surrogate with optional conformal calibration 46 47 Parameters: 48 ----------- 49 X : array-like, shape (n_samples, n_features) 50 Training features 51 y : array-like, shape (n_samples,) 52 Training targets 53 cal_ratio : float 54 Proportion of data for conformal calibration (if conformal=True) 55 56 Returns: 57 -------- 58 self : object 59 """ 60 if self.conformal and cal_ratio > 0: 61 # Split conformal prediction: reserve data for calibration 62 n_cal = int(len(X) * cal_ratio) 63 X_train, X_cal = X[:-n_cal], X[-n_cal:] 64 y_train, y_cal = y[:-n_cal], y[-n_cal:] 65 66 self.base_model.fit(X_train, y_train) 67 68 # Compute nonconformity scores on calibration set 69 y_cal_pred = self.base_model.predict(X_cal) 70 self.nonconformity_scores_ = np.abs(y_cal - y_cal_pred) 71 else: 72 # No conformal prediction: use all data 73 self.base_model.fit(X, y) 74 75 self.is_fitted_ = True 76 return self 77 78 def predict(self, X, return_std=False, confidence=0.95): 79 """ 80 Unified prediction interface with optional uncertainty 81 82 Parameters: 83 ----------- 84 X : array-like, shape (n_samples, n_features) 85 Test features 86 return_std : bool 87 Whether to return uncertainty estimates 88 confidence : float 89 Confidence level for prediction intervals (if conformal=True) 90 91 Returns: 92 -------- 93 predictions : array or tuple 94 If return_std=False: predictions only 95 If return_std=True: (predictions, lower_bound, upper_bound) 96 97 Notes: 98 ------ 99 - Conformal: Distribution-free intervals with finite-sample guarantees 100 - Bayesian: Model-based uncertainty (e.g., GP posterior std) 101 """ 102 if not self.is_fitted_: 103 raise ValueError("Model must be fitted before prediction") 104 105 # Get point predictions 106 y_pred = self.base_model.predict(X) 107 108 if not return_std: 109 return y_pred 110 111 # Compute uncertainty estimates 112 if self.conformal and self.nonconformity_scores_ is not None: 113 # Conformal prediction intervals 114 alpha = 1 - confidence 115 n_scores = len(self.nonconformity_scores_) 116 117 # Calculate the effective quantile level based on (n+1) rule 118 # Cap the quantile level at 1.0 to prevent ValueError from np.quantile 119 quantile_level = min( 120 1.0, np.ceil((1 - alpha) * (n_scores + 1)) / n_scores 121 ) 122 123 q = np.quantile(self.nonconformity_scores_, quantile_level) 124 y_lower = y_pred - q 125 y_upper = y_pred + q 126 127 elif self.bayesian and hasattr(self.base_model, "predict"): 128 # Try to get Bayesian uncertainty from model 129 try: 130 # For GP-like models 131 if ( 132 hasattr(self.base_model, "predict") 133 and "return_std" 134 in self.base_model.predict.__code__.co_varnames 135 ): 136 _, y_std = self.base_model.predict(X, return_std=True) 137 y_lower = y_pred - 2 * y_std # 95% CI for Gaussian 138 y_upper = y_pred + 2 * y_std 139 else: 140 # Fallback: assume constant uncertainty 141 y_std = np.std(y_pred) * np.ones_like(y_pred) 142 y_lower = y_pred - 2 * y_std 143 y_upper = y_pred + 2 * y_std 144 except: 145 # No uncertainty available 146 y_lower = y_pred 147 y_upper = y_pred 148 else: 149 # No uncertainty quantification available 150 y_lower = y_pred 151 y_upper = y_pred 152 153 return y_pred, y_lower, y_upper
Generic wrapper for any surrogate model with conformal prediction support
This class wraps any sklearn-compatible model and adds:
- Conformal prediction intervals (distribution-free uncertainty)
- Optional Bayesian uncertainty (if model supports it)
- Unified prediction interface
Parameters:
base_model : object Any sklearn-compatible model with fit() and predict() methods conformal : bool Whether to use conformal prediction for uncertainty bayesian : bool Whether model provides native uncertainty (e.g., GP, Bayesian NN)
43 def fit(self, X, y, cal_ratio=0.2): 44 """ 45 Fit surrogate with optional conformal calibration 46 47 Parameters: 48 ----------- 49 X : array-like, shape (n_samples, n_features) 50 Training features 51 y : array-like, shape (n_samples,) 52 Training targets 53 cal_ratio : float 54 Proportion of data for conformal calibration (if conformal=True) 55 56 Returns: 57 -------- 58 self : object 59 """ 60 if self.conformal and cal_ratio > 0: 61 # Split conformal prediction: reserve data for calibration 62 n_cal = int(len(X) * cal_ratio) 63 X_train, X_cal = X[:-n_cal], X[-n_cal:] 64 y_train, y_cal = y[:-n_cal], y[-n_cal:] 65 66 self.base_model.fit(X_train, y_train) 67 68 # Compute nonconformity scores on calibration set 69 y_cal_pred = self.base_model.predict(X_cal) 70 self.nonconformity_scores_ = np.abs(y_cal - y_cal_pred) 71 else: 72 # No conformal prediction: use all data 73 self.base_model.fit(X, y) 74 75 self.is_fitted_ = True 76 return self
Fit surrogate with optional conformal calibration
Parameters:
X : array-like, shape (n_samples, n_features) Training features y : array-like, shape (n_samples,) Training targets cal_ratio : float Proportion of data for conformal calibration (if conformal=True)
Returns:
self : object
78 def predict(self, X, return_std=False, confidence=0.95): 79 """ 80 Unified prediction interface with optional uncertainty 81 82 Parameters: 83 ----------- 84 X : array-like, shape (n_samples, n_features) 85 Test features 86 return_std : bool 87 Whether to return uncertainty estimates 88 confidence : float 89 Confidence level for prediction intervals (if conformal=True) 90 91 Returns: 92 -------- 93 predictions : array or tuple 94 If return_std=False: predictions only 95 If return_std=True: (predictions, lower_bound, upper_bound) 96 97 Notes: 98 ------ 99 - Conformal: Distribution-free intervals with finite-sample guarantees 100 - Bayesian: Model-based uncertainty (e.g., GP posterior std) 101 """ 102 if not self.is_fitted_: 103 raise ValueError("Model must be fitted before prediction") 104 105 # Get point predictions 106 y_pred = self.base_model.predict(X) 107 108 if not return_std: 109 return y_pred 110 111 # Compute uncertainty estimates 112 if self.conformal and self.nonconformity_scores_ is not None: 113 # Conformal prediction intervals 114 alpha = 1 - confidence 115 n_scores = len(self.nonconformity_scores_) 116 117 # Calculate the effective quantile level based on (n+1) rule 118 # Cap the quantile level at 1.0 to prevent ValueError from np.quantile 119 quantile_level = min( 120 1.0, np.ceil((1 - alpha) * (n_scores + 1)) / n_scores 121 ) 122 123 q = np.quantile(self.nonconformity_scores_, quantile_level) 124 y_lower = y_pred - q 125 y_upper = y_pred + q 126 127 elif self.bayesian and hasattr(self.base_model, "predict"): 128 # Try to get Bayesian uncertainty from model 129 try: 130 # For GP-like models 131 if ( 132 hasattr(self.base_model, "predict") 133 and "return_std" 134 in self.base_model.predict.__code__.co_varnames 135 ): 136 _, y_std = self.base_model.predict(X, return_std=True) 137 y_lower = y_pred - 2 * y_std # 95% CI for Gaussian 138 y_upper = y_pred + 2 * y_std 139 else: 140 # Fallback: assume constant uncertainty 141 y_std = np.std(y_pred) * np.ones_like(y_pred) 142 y_lower = y_pred - 2 * y_std 143 y_upper = y_pred + 2 * y_std 144 except: 145 # No uncertainty available 146 y_lower = y_pred 147 y_upper = y_pred 148 else: 149 # No uncertainty quantification available 150 y_lower = y_pred 151 y_upper = y_pred 152 153 return y_pred, y_lower, y_upper
Unified prediction interface with optional uncertainty
Parameters:
X : array-like, shape (n_samples, n_features) Test features return_std : bool Whether to return uncertainty estimates confidence : float Confidence level for prediction intervals (if conformal=True)
Returns:
predictions : array or tuple If return_std=False: predictions only If return_std=True: (predictions, lower_bound, upper_bound)
Notes:
- Conformal: Distribution-free intervals with finite-sample guarantees
- Bayesian: Model-based uncertainty (e.g., GP posterior std)