Source code for selector.generators.surrogates.cppl_surrogate

"""This module contains functions for the CPPL surrogate."""
import os
import copy
import sys
from scipy.stats import norm
from sklearn.preprocessing import OneHotEncoder
import uuid
import numpy as np
from scipy.linalg import sqrtm
import scipy
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from selector.pool import Configuration, Generator, ParamType
from selector.point_gen import PointGen
from selector.generators.random_point_generator import random_point
from selector.generators.default_point_generator import (
    check_no_goods,
    check_conditionals,
    default_point
)
from selector.generators.random_point_generator import reset_conditionals

np.set_printoptions(threshold=sys.maxsize)
sys.path.append(os.getcwd())

from threadpoolctl import ThreadpoolController
controller = ThreadpoolController()


[docs]class CPPL: """ Surrogate from CPPL. Note ---- Implementation based on the paper: "Pool-based realtime algorithm configuration: A preselection bandit approach" Parameters ---------- s : selector.scenario.Scenario AC scenario. seed : int Random seed. features : ndarray Problem instance features. pool_size : int Number of Configurations to keep in CPPL pool. alpha : int CPPL hyperparameter alpha, see paper for details. gamma : float CPPL hyperparameter gamma, see paper for details. w : float CPPL hyperparameter w, see paper for details. random_prob : float Probability for a random Configuration being generated. mutation_prob : float Probability for mutation in crossover mechanism. pca_dimension_configurations : int PCA dimension for configuration values. pca_dimension_instances : int PCA dimension for instance feature values. model_update : str Update mode for model ["SGD", "Batch"]. v_hat_norm : str Norm to use on v_hat [None, "max", "zero_one"], see paper for details. theta_norm : str Norm to use on theta [None, "max", "zero_one"], see paper for details. feature_normaliser : str Which normalization to use on instance feature values ["max", "zero_one"]. ensemble: bool True, if accounting for same configuration with different IDs needs to be made. """ def __init__(self, scenario, seed, features, pool_size=192, alpha=1, gamma=0.1, w=0.1, random_prob=0.2, mutation_prob=0.8, pca_dimension_configurations=8, pca_dimension_instances=8, model_update="Batch", v_hat_norm=None, theta_norm="zero_one", feature_normaliser="max", ensemble=True): self.scenario = scenario self.features = features self.seed = seed self.ensemble = ensemble self.v_hat_norm = v_hat_norm self.theta_norm = theta_norm self.feature_normaliser = feature_normaliser self.pool_size = pool_size self.pool = [random_point(scenario, uuid.uuid4()) for _ in range(self.pool_size - 1)] + \ [default_point(scenario, uuid.uuid4())] self.feature_store = {} self.model_update = model_update self.alpha = alpha self.gamma = gamma self.w = w self.t = 0 self.random_prob = random_prob self.mutation_prob = mutation_prob self.number_new_confs = 3 self.pca_dimension_configurations = pca_dimension_configurations if len(scenario.parameter) < self.pca_dimension_configurations: self.pca_dimension_configurations = len(scenario.parameter) self.pca_dimension_instances = pca_dimension_instances if len(list(self.features.values())[0]) < self.pca_dimension_instances: self.pca_dimension_instances = len(list(self.features.values())[0]) elif len(self.features) < self.pca_dimension_instances: self.pca_dimension_instances = len(self.features) self.pca_number_confs_calibration = 100 self.context_dim = self.pca_dimension_configurations * 2 + \ self.pca_dimension_instances * 2 + \ self.pca_dimension_configurations * self.pca_dimension_instances self.theta_hat = np.random.random_sample(self.context_dim) self.theta_bar = copy.copy(self.theta_hat) self.gradient_sum = {0: np.zeros((self.context_dim, self.context_dim))} self.hessian_sum = {0: np.zeros((self.context_dim, self.context_dim))} self.process_parameter() instance_feature_matrix = np.array(list(self.features.values())) self.instance_feature_standard_scaler = StandardScaler() transformed_features = \ self.instance_feature_standard_scaler.\ fit_transform(instance_feature_matrix) for instance, counter in zip(self.features.keys(), range(len(self.features.keys()))): self.features[instance] = transformed_features[counter] self.calibrate_pca() self.best_q = None self.identity_store = {} self.confidences = {}
[docs] def process_parameter(self): """ Figure out the parameter types and get values/calibrate encoding and scaling """ cat_params = [] cont_int_params = [] # figure out the param type for param in self.scenario.parameter: if param.type == ParamType.categorical: cat_params.append(param) else: cont_int_params.append(param) # treat cont and int params self.lower_b_params = np.zeros(len(cont_int_params)) self.upper_b_params = np.zeros(len(cont_int_params)) for i in range(len(cont_int_params)): bound = cont_int_params[i].bound self.lower_b_params[i] = float(bound[0]) self.upper_b_params[i] = float(bound[-1]) # init one hot encoding for cat params: list_of_bounds = [] list_o_of_default = [] # With longest this is some super wired hack I have to do for the OneHotEncoder # If I do not find tha parameter with the longest string to parse in as default # the categories will become dtype Ux with x being the lenght of the longest string in the default # if then there are values that are longer then UX which get parsed in later these will be cut to the length.. for param in cat_params: if isinstance(param.default, (bool, np.bool_)): list_of_bounds.append(list(map(str, list(map(int, param.bound))))) list_o_of_default.append(str(int(param.default))) else: longest = 0 list_of_bounds.append(param.bound) for bound in param.bound: if len(bound) > longest: longest = len(bound) cp = bound list_o_of_default.append(cp) self.cat_params_names = [p.name for p in cat_params] self.cont_int_params_names = [p.name for p in cont_int_params] if len(self.cat_params_names) > 0: self.o_h_enc = OneHotEncoder(categories=list_of_bounds) self.o_h_enc.fit(np.array(list_o_of_default).reshape(1, -1) )
[docs] def scale_conf(self, configuration): """ Scale and encode a configuration. Continuous/integer parameters are scaled between 0 and 1. Categorical parameters are one-hot encoded. Parameters ---------- configuration : selector.pool.Configuration Configuration. Returns ------- ndarray Scaled and encoded configuration values. """ cat_params_on_conf = np.zeros(len(self.cat_params_names), dtype=object) cont_int_params_of_conf = np.zeros(len(self.cont_int_params_names), dtype=float) for param, value in configuration.conf.items(): if param in self.cat_params_names: if isinstance(value, (bool, np.bool_)): cat_params_on_conf[self.cat_params_names.index(param)] = str(int(value)) else: cat_params_on_conf[self.cat_params_names.index(param)] = value else: cont_int_params_of_conf[self.cont_int_params_names .index(param)] = value cont_int_scaled = (cont_int_params_of_conf - self.lower_b_params) / (self.upper_b_params - self.lower_b_params).reshape(1, -1) if len(self.cat_params_names) > 0: cat_params_on_conf = self.o_h_enc.transform(cat_params_on_conf.reshape(1, -1)).toarray() return np.concatenate((cont_int_scaled, cat_params_on_conf), axis=None)
[docs] def calibrate_pca(self): """ Calibarte the PCA and Scalers for the featuremap """ random_generator = PointGen(self.scenario, random_point) if len(self.cat_params_names) > 0: para_vector_size = len([item for sublist in self.o_h_enc.categories_ for item in sublist]) + len(self.cont_int_params_names) else: para_vector_size = len(self.cont_int_params_names) conf_matrix = np.zeros((self.pca_number_confs_calibration, para_vector_size)) for i in range(self.pca_number_confs_calibration): point = random_generator.point_generator() conf_matrix[i] = self.scale_conf(point) self.pca_configurations = PCA(n_components=self.pca_dimension_configurations) pca_conf = self.pca_configurations.fit_transform(conf_matrix) self.pca_instances = PCA(n_components=self.pca_dimension_instances) pca_features = self.pca_instances.fit_transform(np.array(list(self.features.values()))) sample_size = min(pca_conf.shape[0], pca_features.shape[0]) pca_conf = pca_conf[:sample_size] pca_features = pca_features[:sample_size] feature_map_matrix = np.zeros((sample_size * sample_size, self.context_dim)) fc = 0 for i in range(sample_size): for j in range(sample_size): feature_map_matrix[fc] = np.concatenate((pca_conf[i], pca_features[j], pca_conf[i] ** 2, pca_features[j] ** 2, (pca_conf[i] * pca_features[j].reshape(-1,1)).flatten())).flatten() fc = fc + 1 self.feature_map_scaler = StandardScaler() self.feature_map_scaler.fit(feature_map_matrix)
[docs] def compute_feature_map(self, conf, instance_features): """ For a configuration/instance pair, compute the quadratic feature map and scale. Parameters ---------- conf : selector.pool.Configuration Configuration. instance_features : ndarray Numpy array of instance features. Returns ------- ndarray Scaled quadratic problem instance features. """ conf_values = self.scale_conf(conf) conf_values = self.pca_configurations.transform(conf_values.reshape(1, -1)) instance_features = self.pca_instances.transform(instance_features.reshape(1, -1)) features = np.concatenate((conf_values, instance_features, conf_values**2, instance_features ** 2, (conf_values * instance_features.reshape(-1,1)).flatten().reshape(1,-1)),axis=1).flatten() # features = features /max(features) if self.feature_normaliser == "max": features = features / max(features) elif self.feature_normaliser == "zero_one": features = (features - min(features)) / (max(features) - min(features )) return features
[docs] def to_log_space(self): """Not implemented.""" pass
[docs] def from_log_space(self): """Not implemented.""" pass
[docs] def update_feature_store(self, conf, instance): """ For a configuration/instance pair, compute the features and store them in a feature store for later use. Parameters ---------- conf : selector.pool.Configuration Configuration. instance : str Instance name. """ if conf.id not in self.feature_store: self.feature_store[conf.id] = {} if instance not in self.feature_store[conf.id]: self.feature_store[conf.id][instance] = self.compute_feature_map(conf, self.features[instance])
[docs] def compute_a(self, theta, tried_conf_ids, instance_id): """ Compute numerator for gradient and hessian computation. Parameters ---------- theta : ndarray Parameter vector theta_bar, based on winner feedback. tried_conf_ids : list of uuid.UUID IDs of configurations that participated in the tournament. instance_id : str Name of instance the feedback was generated on. Returns ------- ndarray Numerator for further compuations. """ sum = 0 for conf in tried_conf_ids: sum = sum + self.feature_store[conf][instance_id] * \ np.exp(np.dot(self.feature_store[conf][instance_id], theta)) return sum
[docs] def compute_b(self, theta, tried_conf_ids, instance_id): """ Compute denominator for gradient and hessian computation. Parameters ---------- theta : ndarray Parameter vector theta_bar, based on winner feedback. tried_conf_ids : list of uuid.UUID IDs of configurations that participated in the tournament. instance_id : str Name of instance the feedback was generated on. Returns ------- ndarray Denominator for further compuations. """ sum = 0 for conf in tried_conf_ids: sum = sum + np.exp( np.dot(self.feature_store[conf][instance_id], theta)) return sum
[docs] def compute_c(self, theta, tried_conf_ids, instance_id): """ Compute second numerator for hessian computation. Parameters ---------- theta : ndarray Parameter vector theta_bar, based on winner feedback. tried_conf_ids : list of uuid.UUID IDs of configurations that participated in the tournament. instance_id : str Name of instance the feedback was generated on. Returns ------- ndarray Second numerator for further compuations. """ sum = 0 for conf in tried_conf_ids: sum = sum + np.exp(np.dot(self.feature_store[conf][instance_id], theta)) * \ np.outer(self.feature_store[conf][instance_id], self.feature_store[conf][instance_id]) return sum
[docs] def compute_gradient(self, theta, winner_id, tried_confs, instance_id): """ Compute the gradient for a given feedback. Parameters ---------- theta : ndarray Parameter vector theta_bar, based on winner feedback. winner_id: uuid.UUID ID of the configuration winning the tournament. tried_conf_ids : list of uuid.UUID IDs of configurations that participated in the tournament. instance_id : str Name of instance the feedback was generated on. Returns ------- ndarray Gradient for further compuations. """ a = self.compute_a(theta, tried_confs, instance_id) b = self.compute_b(theta, tried_confs, instance_id) return self.feature_store[winner_id][instance_id] - (a / b)
[docs] def compute_hessian(self, theta, winner_id, tried_confs, instance_id): """ Compute the hessian for a given feedback. Parameters ---------- theta : ndarray Parameter vector theta_bar, based on winner feedback. winner_id: uuid.UUID ID of the configuration winning the tournament. tried_conf_ids : list of uuid.UUID IDs of configurations that participated in the tournament. instance_id : str Name of instance the feedback was generated on. Returns ------- ndarray Hessian for further compuations. """ a = self.compute_a(theta, tried_confs, instance_id) b = self.compute_b(theta, tried_confs, instance_id) c = self.compute_c(theta, tried_confs, instance_id) return (np.outer(a, a) / b ** 2) - (c / b)
[docs] def update_running_sums(self, theta, winner_id, tried_confs, instance_id): """ Update the rolling sums of gradients and hessian for a feedback. Parameters ---------- theta : ndarray Parameter vector theta_bar, based on winner feedback. winner_id: uuid.UUID ID of the configuration winning the tournament. tried_conf_ids : list of uuid.UUID IDs of configurations that participated in the tournament. instance_id : str Name of instance the feedback was generated on. """ if self.t not in self.gradient_sum.keys(): gradient = self.compute_gradient(theta, winner_id, tried_confs, instance_id) self.gradient_sum[self.t] = self.gradient_sum[self.t - 1] + np.outer(gradient, gradient) if self.t not in self.hessian_sum.keys(): hessian = self.compute_hessian(theta, winner_id, tried_confs, instance_id) self.hessian_sum[self.t] = self.hessian_sum[self.t - 1] + hessian ** 2
[docs] def compute_confidence(self, theta, instance_id, conf): """ Compute the confidence for an instance/conf combination. Parameters ---------- theta : ndarray Parameter vector theta_bar, based on winner feedback. instance_id : str Name of an instance. conf : uuid.UUID ID of a configuration. Returns ------- float Confidence. """ # Control How many threads/cores numpy and scipy use @controller.wrap(limits=self.scenario.tournament_size, user_api='openmp') @controller.wrap(limits=self.scenario.tournament_size, user_api='blas') def compute_confidence_threaded(theta, instance_id, conf): v = (1 / self.t) * self.gradient_sum[self.t] s = (1 / self.t) * self.hessian_sum[self.t] try: s_inv = np.linalg.inv(s) except: s_inv = np.linalg.pinv(s) sigma = (1 / self.t) * np.dot(np.dot(s_inv, v), s_inv) M = np.exp(2 * np.dot(self.feature_store[conf][instance_id], theta)) * \ np.outer(self.feature_store[conf][instance_id], self.feature_store[conf][instance_id]) sigma_root = sqrtm(sigma) I_hat = np.linalg.norm(np.dot(np.dot(sigma_root, M), sigma_root)) return self.w * np.sqrt((2 * np.log(self.t) + self.context_dim + 2 * np.sqrt((self.context_dim * np.log(self.t)))) * I_hat) return compute_confidence_threaded(theta, instance_id, conf)
[docs] def update_model_single_observation(self, winner_id, tried_confs, instance_id): """ Update the thetas of the model for a single instance feedback. Parameters ---------- winner_id: uuid.UUID ID of the configuration winning the tournament. tried_conf_ids : list of uuid.UUID IDs of configurations that participated in the tournament. instance_id : str Name of instance the feedback was generated on. """ grad = self.compute_gradient(self.theta_hat, winner_id, tried_confs, instance_id) self.theta_hat = self.theta_hat + self.gamma * self.t ** (-self.alpha) * grad if self.theta_norm == "max": self.theta_hat = self.theta_hat / max(self.theta_hat) elif self.theta_norm == "zero_one": self.theta_hat = (self.theta_hat - min(self.theta_hat)) / (max(self.theta_hat ) - min(self.theta_hat )) #self.theta_hat = self.theta_hat/max(self.theta_hat) self.theta_bar = ((self.t - 1) * (self.theta_bar)) / self.t + (self.theta_hat / self.t)
[docs] def update_model_mini_batch(self, winner_ids, tried_confs, instance_ids): """ Update the thetas of the model for feedback over multiple instances. Parameters ---------- winner_id: uuid.UUID ID of the configuration winning the tournament. tried_conf_ids : list of uuid.UUID IDs of configurations that participated in the tournament. instance_id : str Name of instance the feedback was generated on. """ grad_mean = np.zeros(self.context_dim) for uc in range(len(winner_ids)): grad = self.compute_gradient(self.theta_hat, winner_ids[uc], tried_confs[uc], instance_ids[uc]) grad_mean = grad_mean + grad grad_mean = grad_mean / len(winner_ids) self.theta_hat = self.theta_hat + self.gamma * self.t ** (-self.alpha) * grad_mean self.theta_hat = self.theta_hat / max(self.theta_hat) self.theta_bar = ((self.t - 1) * (self.theta_bar)) / self.t + (self.theta_hat / self.t)
[docs] def select_from_set(self, conf_set, instance_set, n_to_select): """ For a set of configurations and instances select the most promising configurations Parameters ---------- conf_set : list of selector.pool.Configuration Set of configurations to select from. instance_set : list of str Instance set the next tournament will be run on. n_to_select : int Number of configurations to select from the set. Returns ------- tuple - list of selector.pool.Configuration, Selected configurations. - list of list of float, Utility and confidence pairs according to selected configurations. """ v_hat = np.zeros((len(conf_set), len(instance_set))) confidence = np.zeros((len(conf_set), len(instance_set))) for i, conf in enumerate(conf_set): for next_instance, _ in enumerate(instance_set): v_hat[i][next_instance] = np.exp(np.inner(self.feature_store[conf.id][instance_set[next_instance]], self.theta_bar)) if self.t > 0: confidence[i][next_instance] = self.compute_confidence(self.theta_bar, instance_set[next_instance], conf.id) v_hat_s = v_hat.sum(axis=1) if self.v_hat_norm == "max": v_hat_s = v_hat_s / max(v_hat_s) elif self.v_hat_norm == "zero_one": v_hat_s = (v_hat_s - min(v_hat_s)) / (max(v_hat_s) - min(v_hat_s)) if self.t > 0: confidence_s = confidence.sum(axis=1) if self.v_hat_norm == "max": confidence_s = confidence_s / max(confidence_s) elif self.v_hat_norm == "zero_one": confidence_s = (confidence_s - min(confidence_s)) / (max(confidence_s) - min(confidence_s)) else: confidence_s = np.zeros(v_hat_s.shape) quality = v_hat_s + confidence_s qual_sort = (-quality).argsort() selection = qual_sort[:n_to_select] return [conf_set[i] for i in selection], [[v_hat_s[i], confidence_s[i]] for i in qual_sort]
[docs] def delete_from_pool(self, instance_set): """ Based on the feedback delete poorly performing configurations from the pool. Parameters ---------- instance_set : list of str Instance set the feedback was generated on. """ v_hat = np.zeros((self.pool_size, len(instance_set))) confidence = np.zeros((self.pool_size, len(instance_set))) for i, conf in enumerate(self.pool): for next_instance, _ in enumerate(instance_set): v_hat[i][next_instance] = np.exp(np.inner(self.feature_store[conf.id][instance_set[next_instance]], self.theta_bar)) if self.t > 0: confidence[i][next_instance] = self.compute_confidence(self.theta_bar, instance_set[next_instance], conf.id) v_hat_s = v_hat.sum(axis=1) if self.v_hat_norm == "max": v_hat_s = v_hat_s / max(v_hat_s) elif self.v_hat_norm == "zero_one": v_hat_s = (v_hat_s - min(v_hat_s)) / (max(v_hat_s) - min(v_hat_s)) if self.t > 0: confidence_s = confidence.sum(axis=1) if self.v_hat_norm == "max": confidence_s = confidence_s / max(confidence_s) elif self.v_hat_norm == "zero_one": confidence_s = (confidence_s - min(confidence_s)) / (max(confidence_s) - min(confidence_s)) else: confidence_s = np.zeros(v_hat_s.shape) discard_index = [] for c in range(self.pool_size): for oc in range(self.pool_size): if c != oc and v_hat_s[oc] - confidence_s[oc] > v_hat_s[c] + confidence_s[c]: discard_index.append(c) break dis = [] for i in sorted(discard_index, reverse=True): dis.append(self.pool[i]) del self.pool[i]
[docs] def create_new_conf(self, parent_one, parent_two): """ Create new configurations based on a genetic procedure described. Parameters ---------- parent_one : selector.pool.Configuration First configuration for the crossover. parent_two : selector.pool.Configuration Second configuration for the crossover. Returns ------- selector.pool.Configuration Resulting configuration. """ no_good = True while no_good: new_conf = {} rn = np.random.uniform() if rn < self.random_prob: new_conf = random_point(self.scenario, uuid.uuid4()) new_conf.generator = Generator.cppl else: for param, setting in parent_one.conf.items(): rn = np.random.uniform() if rn < 0.5: new_conf[param] = setting else: new_conf[param] = parent_two.conf[param] rn = np.random.uniform() if rn < self.mutation_prob: possible_mutations = random_point(self.scenario, uuid.uuid4()) param_to_mutate = np.random.choice(list(new_conf.keys())) new_conf[param_to_mutate] = possible_mutations.conf[param_to_mutate] identity = uuid.uuid4() new_conf = Configuration(identity, new_conf, Generator.cppl) cond_vio = check_conditionals(self.scenario, new_conf.conf) if cond_vio: new_conf.conf = reset_conditionals(self.scenario, new_conf.conf, cond_vio) no_good = check_no_goods(self.scenario, new_conf.conf) return new_conf
[docs] def add_to_pool(self, past_instances): """ Add the most promising newly created configurations to the pool Parameters ---------- past_instances : list of str Instance set of the prior tournament. """ number_to_create = self.pool_size - len(self.pool) new_promising_conf = [] if number_to_create > 0: if len(self.pool) > 1: best_to_confs, _ = self.select_from_set(self.pool, past_instances, 2) conf_one, conf_two = best_to_confs[0], best_to_confs[1] else: conf_one = self.pool[0] conf_two = random_point(self.scenario, uuid.uuid4()) conf_two.generator = Generator.cppl possible_new_confs = [] for nc in range(self.number_new_confs * number_to_create): possible_new_confs = possible_new_confs + [self.create_new_conf(conf_one, conf_two)] for instance in past_instances: for c in possible_new_confs: self.update_feature_store(c, instance) # TODO I have to ensure that all the confs here are diffrent... new_promising_conf, _ = self.select_from_set(possible_new_confs, past_instances, number_to_create) self.pool = self.pool + new_promising_conf
[docs] def update(self, previous_tournament, c, results, terminations, instance_features=None, ac_runtime=None): """ Update the model with given feedback. Parameters ---------- results : dict of dict Feedback for the configuration-instance pairs in the previous tournament. previous_tournament : selector.pool.Tournament Tournament. c : list of selector.pool.Configuration Configurations participating in the previous tournament. results : dict Performances of the configuration on the instance set of the tournament. terminations : dict Configurations that were terminated on instances. instance_features : ndarray Problem instance features. ac_runtime : int The total runtime of Selector in seconds so far. """ if instance_features: instance_feature_matrix = np.array(list(instance_features.values())) transformed_features = self.instance_feature_standard_scaler.transform(instance_feature_matrix) for instance, counter in zip(instance_features.keys(), range(len(instance_features.keys()))): self.features[instance] = transformed_features[counter] best_conf_store = [] rest_conf_store = [] instance_store = [] confs_w_feedback = previous_tournament.best_finisher + previous_tournament.worst_finisher for instance in previous_tournament.instance_set: results_on_instance = {} for c in self.pool + confs_w_feedback: cond_vio = check_conditionals(self.scenario, c.conf) if cond_vio: c.conf = reset_conditionals(self.scenario, c.conf, cond_vio) self.update_feature_store(c, instance) for c in previous_tournament.configuration_ids: if not np.isnan(results[c][instance]): results_on_instance[c] = results[c][instance] else: # OMIT the capped data in model update if c in terminations: if instance in terminations[c]: continue else: # This conf/inst pair was a time limit reach results_on_instance[c] = self.scenario.cutoff_time if self.best_q is None: if len(results_on_instance.values()) == 0: import sys self.best_q = sys.maxsize else: self.best_q = min(results_on_instance.values()) elif len(results_on_instance.values()) == 0: if min(results_on_instance.values()) < self.best_q: self.best_q = min(results_on_instance.values()) best_conf_on_instance = min(results_on_instance, key=results_on_instance.get) if not self.ensemble: if results_on_instance[best_conf_on_instance] >= self.scenario.cutoff_time: self.pool = [random_point(self.scenario, uuid.uuid4()) for _ in range(self.pool_size -1 )] + [default_point(self.scenario, uuid.uuid4())] for c in self.pool: [self.update_feature_store(c, i) for i in previous_tournament.instance_set] continue tried = previous_tournament.configuration_ids.copy() # Through suggesting a conf multiple times with diffrent id we may get feedback for the same conf # under a diffrent id. Here we map that back in order to update for the same conf. if self.ensemble: if best_conf_on_instance in self.identity_store: best_conf_on_instance = self.identity_store[best_conf_on_instance] for c in range(len(tried)): if tried[c] in self.identity_store: tried[c] = self.identity_store[tried[c]] instance_store.append(instance) best_conf_store.append(best_conf_on_instance) rest_conf_store.append(tried) self.t = self.t + 1 # this is actually the same as running "batch" with a best_conf_store of len() 1 if self.model_update == "SGD": self.update_model_single_observation(best_conf_on_instance, tried, instance) self.update_running_sums(self.theta_bar, best_conf_on_instance, tried, instance) if self.model_update == "Batch" and len(best_conf_store) > 0: self.update_model_mini_batch(best_conf_store, rest_conf_store, instance_store) self.delete_from_pool(previous_tournament.instance_set) self.add_to_pool(previous_tournament.instance_set)
[docs] def get_suggestions(self, scenario, n_to_select, d, r, next_instance_set, instance_features=None): """ Suggest configurations to run next based on the next instance set to run on. Parameters ---------- scenario : selector.scenario.Scenario AC scenario. n_to_select : int Number of configurations to return. d : list of selector.pool.Tournament Tournament history. r : dict Performances of the configuration on the instance set of the tournament. next_instance_set : list of str Instance set to run on in the next tournament. instance_features : ndarray Problem instance features. Returns ------- tuple - list of selector.pool.Configuration, Suggested configurations. - list of list of float, Utility and confidence pairs according to selected configurations. """ if instance_features: instance_feature_matrix = np.array(list(instance_features.values())) transformed_features = self.instance_feature_standard_scaler.transform(instance_feature_matrix) for instance, counter in zip(instance_features.keys(), range(len(instance_features.keys()))): self.features[instance] = transformed_features[counter] for instance in next_instance_set: for c in self.pool: self.update_feature_store(c, instance) suggest, ranking = self.select_from_set(self.pool, next_instance_set, n_to_select) suggest = copy.deepcopy(suggest) for sugg in suggest: sugg.generator = Generator.cppl # We have to make sure that the ids of the configurations returend are unique. # I.e for two different tournaments we may suggest the same conf twice. # In that case the conf needs a unique id. # Later update() we need to map that back if self.ensemble: identity = uuid.uuid4() self.identity_store[identity] = sugg.id sugg.id = identity return suggest, ranking
[docs] def suggest_from_outside_pool(self, conf_set, n_to_select, next_instance_set, instance_features=None): """ Suggest configurations to run next that are not in the CPPL pool. Parameters ---------- conf_set : list of selector.pool.Configuration n_to_select : int Number of configurations to return. next_instance_set : list of str Instance set to run on in the next tournament. instance_features : ndarray Problem instance features. Returns ------- tuple - **suggest**: list of selector.pool.Configuration, Suggested configurations. - **ranking**: list of list of float, Utility and confidence pairs according to selected configurations. """ if instance_features: instance_feature_matrix = np.array(list(instance_features.values())) transformed_features = self.instance_feature_standard_scaler.transform(instance_feature_matrix) for instance, counter in zip(instance_features.keys(), range(len(instance_features.keys()))): self.features[instance] = transformed_features[counter] for instance in next_instance_set: for c in conf_set: self.update_feature_store(c, instance) suggest, ranking = self.select_from_set(conf_set, next_instance_set, n_to_select) return suggest, ranking
[docs] def predict(self, suggestions, next_instance_set): """ Predict performance/quality of configurations with CPPL. Parameters ---------- suggestions : list of selector.pool.Configuration Suggested configurations. next_instance_set : List of next instances to run the tournament on. Returns ------- tuple - **v**: ndarray, Mean of predicted performance/quality. - **c**: ndarray, Variance of predicted performance/quality. """ ranking = self.suggest_from_outside_pool(suggestions, len(suggestions), next_instance_set)[1] v = [] c = [] for i, _ in enumerate(ranking): v.append(ranking[i][0]) c.append(ranking[i][1]) v = np.array(v) c = np.array(c) self.v = v self.c = c return v, c
[docs] def probability_improvement(self, suggestions, results, next_instance_set): """ Compute probability of improvement. Parameters ---------- suggestions : list of selector.pool.Configuration Suggested configurations. next_instance_set : List of next instances to be run. results : dict Performances of the configuration on the instance set of the tournament. next_instance_set : List of next instances to run the tournament on. Returns ------- ndarray **pi_output**: Probabilities of improvement. """ v = self.v c = self.c std = np.sqrt(c) pi = norm.cdf((self.best_q - v) / std) pi_output = [] for p in pi: pi_output.append([p]) return pi_output
[docs] def expected_improvement(self, suggestions, next_instance_set): """ Compute expected improvement via CPPL model. Parameters ---------- suggestions : list of selector.pool.Configuration Suggested configurations. next_instance_set : list of str List of next instances to be run. Returns ------- ndarray **ei**: Expected improvements. """ mean = self.v var = self.c std = np.sqrt(var) def calculate_ei(): z = (self.best_q - mean) / std return (self.best_q - mean) * norm.cdf(z) + std * norm.pdf(z) if np.any(std == 0.0): stdc = np.copy(std) std[stdc == 0.0] = 1.0 ei = calculate_ei() ei[stdc == 0.0] = 0.0 return ei else: return calculate_ei()