Source code for conformalopt.main

import numpy as np
from tqdm import tqdm
from conformalopt.data import get_scores
import itertools
import warnings
from conformalopt.utils import *
from statsmodels.tsa.forecasting.theta import ThetaModel

METHOD_HPARAMS = {
    "quantile_tracker": ["lr", "p_order_qt", "bias"],
    "ar_quantile_loss_scorecaster": ["lr", "p_order_ar_scorecaster"],
    "theta_scorecaster": ["lr"],
}

DEFAULT_GRIDS = {
    "lr": [1e-5, 1e-4, 1e-3, 1e-2, 1e-1, 1e0, 1e1, 1e2, 1e3, 1e4, 1e5],
    "p_order_qt": [0, 1, 2],
    "p_order_ar_scorecaster": [1],
    "bias": [1e-1, 1e0, 5, 1e1, 200, 1e2, 1e3],
}


[docs] class ConformalPredictor: """ A class for implementing an online conformal predictor as in the paper Online Conformal Prediction via Online Optimization. Attributes: alpha (float): Target miscoverage level. If set to 0.1, for example, eventually 90% coverage will be achieved. lr_type (str): Type of learning rate, can be `fixed`, `decaying`, or `proportional`. The `decaying` learning rate is of the form \\Theta(t**{-0.6}). The `proportional` option multiplies the learning rate by the range of scores over the last 20 scores. quantile_track (str or None): Type of quantile tracker, can be `scalar` or `linear`. scorecaster (str or None): Scorecasting method, either `theta_scorecaster` or `ar_quantile_loss_scorecaster`, or None. hypers (dict): Specified hyperparameters for the conformal predictor. If none are passed in, all appropriate hyperparameters will be tuned. conformal_predictor_name (str): Name of the conformal predictor to be used in out/ folder during evaluation. If the empty string is passed in, a name will be constructed based on the conformal predictor's attributes. expt_name (str): Experiment name for evaluation. """ def __init__( self, alpha: float = 0.1, lr_type: str = "fixed", quantile_tracker="linear", scorecaster=None, hypers={}, conformal_predictor_name="", expt_name="expt", ): """ Initializes the ConformalPredictor with given hyperparameters. Parameters: alpha (float): Target miscoverage level. lr_type (str): Type of learning rate (`fixed`, `decaying`, `proportional`). quantile_tracker (str): Type of quantile tracker (`scalar`, `linear`). scorecaster (str): Scorecasting method (e.g., `theta_scorecaster`). hypers (dict): Hyperparameters to initialize the model. conformal_predictor_name (str): Name of the conformal predictor used in eval. expt_name (str): Experiment name for evaluation. """ assert quantile_tracker in [None, "scalar", "linear"] assert scorecaster in [None] + [key for key in METHOD_HPARAMS.keys() if "_scorecaster" in key] assert lr_type in ["fixed", "decaying", "proportional"] self.alpha = alpha self.quantile_track = quantile_tracker self.scorecaster = scorecaster self.lr_type = lr_type self.conformal_predictor_name = conformal_predictor_name self.expt_name = expt_name self.past_predictions = [] self.past_test_scores = [] self.quantile_tracker_parameter = None # Set up hyperparameters if self.quantile_track == "scalar": hypers.update({"p_order_qt": 0, "bias": 1}) self.hypers = hypers.copy() self.original_hypers = hypers.copy() for method, condition in [ ("quantile_tracker", self.quantile_track), (self.scorecaster, self.scorecaster is not None), ]: if condition: for hparam in METHOD_HPARAMS[method]: self.hypers.setdefault(hparam, None) def init_active_fields(self): if self.quantile_track is not None: p_order = self.hypers["p_order_qt"] self.quantile_tracker_parameter = np.ones(p_order + 1) if p_order > 0: self.quantile_tracker_parameter *= 1 / p_order self.past_predictions = [] self.past_test_scores = []
[docs] def get_predictions(self): """ Retrieves the past predictions made by the conformal predictor. Returns: list: The list of past predictions. """ return self.past_predictions
[docs] def set_hypers(self, hyper_dict): """ Sets the hyperparameters for the model. Parameters: hyper_dict (dict): A dictionary of hyperparameters to set. """ for hyper in hyper_dict.keys(): self.hypers[hyper] = hyper_dict[hyper]
[docs] def reset_hypers(self): """ Resets the hyperparameters to their original values. """ self.hypers = self.original_hypers.copy()
[docs] def fit(self, val_scores, tune_all_hparams=False, specific_hypers_to_tune: list = [], hyper_grid_overrides={}): """ Tunes the hyperparameters for the conformal predictor using a grid search on validation data. The hyperparameters leading to the best quantile loss, provided 1-alpha-0.01 coverage was achieved, are selected. If no selection achieves this coverage constraint, the coverage constraint is disregarded. Parameters: val_scores (list): The validation scores to use for tuning hyperparameters. tune_all_hparams (bool): Whether to tune all hyperparameters or only those currently unspecified. specific_hypers_to_tune (list): Specific hyperparameters to tune, if provided. hyper_grid_overrides (dict): New grids to tune hyperparameters on, beyond default grid. """ T_val = len(val_scores) hyper_grid = DEFAULT_GRIDS | hyper_grid_overrides # Determine which hypers to tune. hypers_to_tune = [hyper for hyper in self.hypers.keys() if self.hypers[hyper] == None] + specific_hypers_to_tune if tune_all_hparams: hypers_to_tune = self.hypers.keys() grid = [[(hyper_name, hyper_value) for hyper_value in hyper_grid[hyper_name]] for hyper_name in hypers_to_tune] hyper_choices = [{hyper: value for hyper, value in combination} for combination in itertools.product(*grid)] # Perform grid search. best_quantile_loss, best_hypers = np.inf, None cov_gap, coverage_achieved = 0.01, False best_quantile_loss_coverage_constrained, best_hypers_coverage_constrained = ( np.inf, None, ) for hypers in tqdm(hyper_choices): self.hypers = self.original_hypers | hypers self.init_active_fields() for t in range(T_val): prediction = self.predict() self.step(prediction, val_scores[t]) curr_quantile_loss = quantile_loss(val_scores, self.past_predictions, 1 - self.alpha) curr_run_coverage_achieved = ( np.abs(np.mean(np.array(self.past_predictions) >= val_scores) - (1 - self.alpha)) <= cov_gap ) coverage_achieved |= curr_run_coverage_achieved if curr_quantile_loss < best_quantile_loss: best_quantile_loss = curr_quantile_loss best_hypers = self.hypers if curr_run_coverage_achieved and curr_quantile_loss < best_quantile_loss_coverage_constrained: best_quantile_loss_coverage_constrained = curr_quantile_loss best_hypers_coverage_constrained = self.hypers self.hypers = best_hypers_coverage_constrained if curr_run_coverage_achieved else best_hypers print(f"Tuned hyperparameters: {self.hypers}") self.init_active_fields() for key, value in self.hypers.items(): if key in ["lr", "bias"] and (value == hyper_grid[key][0] or value == hyper_grid[key][-1]): warnings.warn(f"{key}:{value} is at the edge of hyperparameter grid.")
def quantile_tracker_predict(self): return self.quantile_tracker_parameter @ self.get_covariate() def scorecaster_predict(self): training_scores = np.array(self.past_test_scores[-200:]) if len(training_scores) >= 10: if self.scorecaster == "theta_scorecaster": model = ThetaModel( training_scores.astype(float), period=1, ).fit() return model.forecast(1).iloc[0] elif self.scorecaster == "ar_quantile_loss_scorecaster": theta = fit_ar_quantile_loss(training_scores, self.hypers["p_order_ar_scorecaster"], self.alpha) return theta @ self.get_covariate(self.hypers["p_order_ar_scorecaster"]) return 0
[docs] def predict(self): """ Makes a prediction by combining the quantile_tracker_predict and scorecaster_predict. Returns: float: The final prediction. """ prediction = sum( fn() for fn, condition in [ (self.quantile_tracker_predict, self.quantile_track), (self.scorecaster_predict, self.scorecaster is not None), ] if condition ) self.past_predictions.append(prediction) return prediction
[docs] def get_covariate(self, p_order=None): """Returns the covariate or feature vector based on past test scores. This is the last p_order scores along with a bias term (1 appended), and could be updated for richer feature vectors. Args: p_order (int, optional): The number of past test scores to include. If None, defaults to `self.hypers["p_order_qt"]`. Returns: np.ndarray: A numpy array of shape `(p_order + 1,)` containing the past test scores (if available) and the bias term. """ if self.quantile_track is not None: bias = self.hypers["bias"] if p_order == None: p_order = self.hypers["p_order_qt"] if len(self.past_test_scores) < p_order: return np.zeros(p_order + 1) return np.concatenate((self.past_test_scores[-p_order:] if p_order > 0 else [], [bias]))
# Updates quantile tracker.
[docs] def step(self, prediction, realized_score): """ Updates the quantile tracker based on the realized score and prediction. The update follows one step of gradient descent with the quantile loss on the quantile tracker's parameter. Parameters: prediction (float): The predicted score. realized_score (float): The actual observed score. """ if self.quantile_track is not None: time = len(self.past_test_scores) + 1 lr_functions = { "fixed": lambda: self.hypers["lr"], "decaying": lambda: self.hypers["lr"] * (time**-0.6), "proportional": lambda: self.hypers["lr"] * ( ( np.array(self.past_test_scores[max(time - 20, 0) :]).max() - np.array(self.past_test_scores[max(time - 20, 0) :]).min() ) if len(self.past_test_scores) > 0 else 1 ), } lr = lr_functions[self.lr_type]() error = realized_score > prediction self.quantile_tracker_parameter += lr * (error - self.alpha) * self.get_covariate() # Update history of scores self.past_test_scores.append(realized_score)
[docs] def eval(self, checkpoint_name=""): """ Evaluates the model and outputs the results in out/expt_name/checkpoint_name. If another ConformalPredictor has already produced evaluation results in this location, the results are combined. If `checkpoint_name` is not provided, then it is set to the current time step. Parameters: checkpoint_name (str): The checkpoint name for this evaluation. """ if checkpoint_name == "": checkpoint_name = str(len(self.past_test_scores)) if self.conformal_predictor_name == "": if self.quantile_track == "scalar": self.conformal_predictor_name = "SQT" elif self.quantile_track == "linear": self.conformal_predictor_name = f'LQT({self.hypers["p_order_qt"]})' if self.scorecaster is not None: self.conformal_predictor_name += f" + {self.scorecaster}" self.conformal_predictor_name += f", {self.lr_type}" eval( self.past_predictions, self.past_test_scores, self.alpha, self.expt_name, checkpoint_name, self.conformal_predictor_name, self.hypers, )
if __name__ == "__main__": data_abbr = "GOOGL" model_type = "theta" data_name = f"{data_abbr}_{model_type}_absolute-residual_scores" scores = get_scores(data_name) split = int(0.33 * len(scores)) val_scores = scores[:split] test_scores = scores[split:] cp = ConformalPredictor( lr_type="decaying", quantile_tracker="linear", expt_name=f"{data_abbr}_{model_type}", ) # Fit the method's hyperparameters (such as the dimension of the quantile tracker) # using a grid search on validation scores. cp.fit(val_scores=list(val_scores)) for t in range(len(test_scores)): prediction = cp.predict() # Make a prediction for the score. cp.step(prediction, test_scores[t]) # Update the quantile tracker parameter. cp.eval() # Provides an evaluation in out/expt_name folder, alongside other ConformalPredictor objects if they exist.