Source code for selector.selection_features

"""This module contains feature generation functions."""
import numpy as np
import copy
import time
from collections import defaultdict
from selector.pool import Generator, Surrogates
import selector.hp_point_selection as hps


[docs]class FeatureGenerator: """ Generate features necessary to evaluate configurations. Parameters ---------- logger : logging.Logger Logging object. """ def __init__(self, logger=None): self.Generator = Generator self.logger = logger
[docs] def percent_rel_evals(self, suggestions, data, nce): """Percentage of relatives so far evaluated. Parameters ---------- suggestions : list Suggested points. data : object Contains historic performance data. nce : int Number of configuration evaluations. Returns ------- div_feats : list Computed features of suggested points. """ self.gen_counts = defaultdict(int) for content in data.values(): for finisher in content.best_finisher + content.worst_finisher: self.gen_counts[finisher.generator] += 1 div_feats = [[self.gen_counts[sugg.generator]] for sugg in suggestions] max_val = max(max(d) for d in div_feats) if max_val != 0.0: for d in div_feats: d[0] = d[0] / max_val return div_feats
[docs] def avg_rel_evals_qual(self, suggestions, data, nce, results, cot, generators): """Average quality of relatives so far evaluated. Parameters ---------- suggestions : list Suggested points. data : object Contains historic performance data. nce : int Number of all configs evaluated. results : dict Qualities of configurations. cot : float Cut off time for tournaments (i.e. time limit). generators : list All possible generators. Returns ------- div_feats : list Computed features of suggested points. """ div_feats = [] quals = {} gen_counts = self.gen_counts quals = defaultdict(int) for content in data.values(): for finisher in content.best_finisher + content.worst_finisher: quals[finisher.generator] += \ sum(results[finisher.id].values()) / \ len(results[finisher.id]) for gen in quals.keys(): quals[gen] /= gen_counts.get(gen, 1) # Avoid division by zero div_feats = [[quals.get(sugg.generator, 0) / cot] for sugg in suggestions] max_val = max(max(d) for d in div_feats) if max_val != 0.0: for d in div_feats: d[0] /= max_val return div_feats
[docs] def best_rel_evals_qual(self, suggestions, data, generators, results, cot): """Best target value relatives so far evaluated. Parameters ---------- suggestions : list Suggested points. data : object Contains historic performance data. generators : list All possible generators. results : dict Qualities of configurations. cot : float Cut off time for tournaments (i.e. time limit). Returns ------- div_feats : list Computed features of suggested points. """ div_feats = [] best_val = {} best_val = defaultdict(int) for content in data.values(): for finisher in content.best_finisher + content.worst_finisher: for val in results[finisher.id].values(): if finisher.generator not in best_val: best_val[finisher.generator] = val elif val < best_val[finisher.generator]: best_val[finisher.generator] = val div_feats = [[best_val.get(sugg.generator, 0) / cot] for sugg in suggestions] max_val = max(max(d) for d in div_feats) if max_val != 0.0: for d in div_feats: d[0] = d[0] / max_val return div_feats
[docs] def std_rel_evals_qual(self, suggestions, data, generators, results, cot): """Std of quality of relatives so far evaluated. Parameters ---------- suggestions : list Suggested points. data : object Contains historic performance data. generators : list All possible generators. results : dict Qualities of configurations. cot : float Cut off time for tournaments (i.e. time limit). Returns ------- div_feats : list Computed features of suggested points. """ div_feats = [] qual_vals = {} qual_vals = defaultdict(int) for content in data.values(): for finisher in content.best_finisher + content.worst_finisher: if finisher.generator in qual_vals: for res_val in list(results[finisher.id].values()): qual_vals[finisher.generator].append(res_val) else: qual_vals[finisher.generator] = \ list(results[finisher.id].values()) qual_std = {} for key, qv in qual_vals.items(): qual_std[key] = np.std(qv) div_feats = [[qual_std.get(sugg.generator, 0) / cot] for sugg in suggestions] max_val = max(max(d) for d in div_feats) if max_val != 0.0: for d in div_feats: d[0] = d[0] / max_val return div_feats
[docs] def diff_pred_real_qual(self, suggestions, data, predicted_quals, results): """Difference of predicted & real qual. of relatives evaluated so far. Parameters ---------- suggestions : list Suggested points. data : object Contains historic performance data. predicted_quals : list of lists Predicted performance/quality for suggested configurations. results : dict Qualities of configurations. Returns ------- div_feats : list Computed features of suggested points. """ if not predicted_quals: div_feats = [[0] for _ in suggestions] else: rel_results = {} rel_predicts = {} div_feats = [] diffs = {} # Collect relevant predictions into a dictionary for pred in predicted_quals: pred = list(pred.values())[0] gen = pred['gen'] rel_predicts.setdefault(gen, []).append(pred['qual']) # Collect relevant results into a dictionary for content in data.values(): for finisher in content.best_finisher + content.worst_finisher: gen = finisher.generator rel_results.setdefault(gen, []).extend(results[finisher.id].values()) for gen in Generator: if gen in rel_results and gen in rel_predicts: if len(rel_predicts[gen]) > 0 and \ len(rel_results[gen]) > 0: diffs[gen] = \ np.mean(rel_predicts[gen]) \ / np.mean(rel_results[gen]) elif gen not in diffs: diffs[gen] = 0 for sugg in suggestions: div_feats.append([diffs[sugg.generator]]) max_val = max(max(d) for d in div_feats) if max_val != 0.0: for d in div_feats: d[0] = d[0] / max_val return div_feats
[docs] def avg_dist_evals(self, suggests, evals, psetting): """Average distance to all points so far evaluated. Parameters ---------- suggestions : list Suggested points. evals : list Already evaluated points. psetting : object Scenario parameters. Returns ------- div_feats : list Average distances to all already evaluated points. """ if evals: suggestions = copy.deepcopy(suggests) evaluated = copy.deepcopy(evals) suggestions = hps.normalize_plus_cond_acc(suggestions, psetting) evaluated = hps.normalize_plus_cond_acc(evaluated, psetting) distances = hps.pairwise_distances(suggestions, evaluated) div_feats = [] for dist in distances: div_feats.append([np.mean(dist)]) max_val = max(max(d) for d in div_feats) if max_val != 0.0: for d in div_feats: d[0] = d[0] / max_val else: div_feats = [[0] for _ in suggests] return div_feats
[docs] def avg_dist_sel(self, suggests, psetting): """Average distance to points in the current selection. Parameters ---------- suggestions : list Suggested points. psetting : object Scenario parameters. Returns ------- div_feats : list Average distances to points in the current selection. """ suggestions = copy.deepcopy(suggests) suggestions = hps.normalize_plus_cond_acc(suggestions, psetting) distances = hps.pairwise_distances(suggestions, suggestions) div_feats = [] for dist in distances: div_feats.append([np.mean(dist)]) max_val = max(max(d) for d in div_feats) if max_val != 0.0: for d in div_feats: d[0] = d[0] / max_val return div_feats
[docs] def avg_dist_rel(self, suggests, evals, psetting, generators): """Average distances to relatives. Parameters ---------- suggests : list Suggested points. evals : list Already evaluated points. psetting : object Scenario parameters. generators : list Available generators. Returns ------- div_feats : list Computed features of suggested points. """ if evals: suggestions = copy.deepcopy(suggests) evaluated = copy.deepcopy(evals) suggestions = hps.normalize_plus_cond_acc(suggestions, psetting) evaluated = hps.normalize_plus_cond_acc(evaluated, psetting) group_relatives = {} for gen in generators: for ev in evaluated: if gen == ev.generator: if gen not in group_relatives: group_relatives[gen] = [ev] else: group_relatives[gen].append(ev) distances = [] for sugg in suggestions: if sugg.generator in group_relatives: distances.append(hps.pairwise_distances([sugg], group_relatives[sugg.generator])) else: distances.append([0 for _ in sugg.conf]) div_feats = [] for dist in distances: div_feats.append([np.mean(dist)]) max_val = float(max(max(d) for d in div_feats)) for d in div_feats: if max_val != 0.0: d[0] = d[0] / max_val else: div_feats = [[0] for _ in suggests] return div_feats
[docs] def expected_qual(self, suggs, sm, cot, surr, next_instance_set): """Expected quality of points. Parameters ---------- suggests : list Suggested points. sm : object Surrogates.SurrogateManager(). cot : int Cut off time (i.e. time limit). surr : str Which surrogate to use. next_instance_set : list Next instances that will be run. Returns ------- dyn_feats : list Computed features of suggested points. """ suggests = copy.deepcopy(suggs) dyn_feats = [] try: expimp = sm.predict(surr, suggests, cot, next_instance_set) self.expimp = expimp for exim in expimp: for ei in exim.values(): dyn_feats.append([ei['qual']]) max_val = float(max(max(d) for d in dyn_feats)) for d in dyn_feats: if max_val != 0.0: d[0] = d[0] / max_val except: dyn_feats = [[0] for _ in suggests] return dyn_feats
[docs] def prob_qual_improve(self, suggs, sm, cot, results, surr, next_instance_set): """Probability of quality of points to improve. Parameters ---------- suggests : list Suggested points. sm : object Surrogates.SurrogateManager(). cot : int Cut off time (i.e. time limit). results : list Results of points evaluated so far. surr : str Which surrogate to use. Returns ------- dyn_feats : list Computed features of suggested points. """ suggests = copy.deepcopy(suggs) dyn_feats = [] try: expimp = sm.pi(surr, suggests, cot, results, next_instance_set) for ei in expimp: dyn_feats.append(list(ei)) max_val = float(max(max(d) for d in dyn_feats)) for d in dyn_feats: if max_val != 0.0: d[0] = d[0] / max_val except: dyn_feats = [[0] for _ in suggests] return dyn_feats
[docs] def uncertainty_improve(self, suggs, sm, cot, surr, next_instance_set): """Probability of quality of points to improve. Parameters ---------- suggests : list Suggested points. sm : object Surrogates.SurrogateManager(). cot : int Cut off time (i.e. time limit). surr : str Which surrogate to use. Returns ------- dyn_feats : list Computed features of suggested points. """ suggests = copy.deepcopy(suggs) dyn_feats = [] try: expimp = self.expimp for exim in expimp: for ei in exim.values(): dyn_feats.append([ei['var']]) max_val = float(max(max(d) for d in dyn_feats)) for d in dyn_feats: if max_val != 0.0: d[0] = d[0] / max_val except: dyn_feats = [[0] for _ in suggests] return dyn_feats
[docs] def expected_improve(self, suggs, sm, cot, surr, next_instance_set): """Probability of quality of points to improve. Parameters ---------- suggests : list Suggested points. sm : object Surrogates.SurrogateManager(). cot : int Cut off time (i.e. time limit). surr : str Which surrogate to use. Returns ------- dyn_feats : list Computed features of suggested points. """ suggests = copy.deepcopy(suggs) dyn_feats = [] try: expimp = sm.ei(surr, suggests, next_instance_set) for ei in expimp: dyn_feats.append([ei]) max_val = float(max(max(d) for d in dyn_feats)) for d in dyn_feats: if max_val != 0.0: d[0] = d[0] / max_val except: dyn_feats = [[0] for _ in suggests] return dyn_feats
[docs] def surr_votes(self, dyn_feats): """Multiply surr features to get agreement features. Parameters ---------- dyn_feats : list of np.ndarray Dynamic features. Returns ------- dyn_feats : list of np.ndarray Extended dynamic features. """ nr_surrs = len(Surrogates) new_feature_sets = [] for k in range(0, len(dyn_feats[0]), nr_surrs): votes = [[] for _ in range(nr_surrs)] for i, _ in enumerate(dyn_feats): for j in range(nr_surrs): votes[j].append([dyn_feats[i, k] * dyn_feats[i, k - 1]]) new_feature_sets.append(votes) for nfs in new_feature_sets: for v in nfs: dyn_feats = np.concatenate((dyn_feats, v), axis=1) return dyn_feats
[docs] def static_feature_gen(self, suggestions, epoch, max_epoch): """Generate static features. Parameters ---------- suggestions : list Suggested configurations. epoch : int Current epoch. max_epoch : int Total number of epochs. Returns ------- static_features : list Static features. """ if self.logger is not None: static_time = time.time() static_feats = [[] for ii in range(len(suggestions))] # One-Hot encoded information of generator used for conf for s in range(len(suggestions)): for gt in range(len(self.Generator)): if suggestions[s].generator == self.Generator(gt + 1): static_feats[s].append(1.0) else: static_feats[s].append(0.0) # Ratio of current epoch and max. epochs for sf in range(len(static_feats)): static_feats[sf].append(epoch / max_epoch) if self.logger is not None: self.logger.info(f"Static features took {time.time() - static_time}\n\n") return np.array(static_feats)
[docs] def dynamic_feature_gen(self, suggestions, data, predicted_quals, sm, cot, results, next_instance_set): """Generate dynamic features. Parameters ---------- suggestions : list Suggested configurations. data : object Contains historic data. predicted_quals : list of lists Predicted performance/quality for suggested configurations. sm : object Surrogates.SurrogateManager(). cot : int Cut off time (i.e. time limit). results : list Results of points evaluated so far. Returns ------- dyn_feats : list Dynamic features. """ if self.logger is not None: all_dyn_time = time.time() dyn_one = time.time() # Features based on surrogates dyn_feats = self.expected_qual(suggestions, sm, cot, Surrogates.SMAC, next_instance_set) if self.logger is not None: self.logger.info(f"Dyn 1 features took {time.time() - dyn_one}\n\n") dyn_two = time.time() dyn_feats = \ np.concatenate((dyn_feats, self.expected_qual(suggestions, sm, cot, Surrogates.GGApp, None)), axis=1) if self.logger is not None: self.logger.info(f"Dyn 2 features took {time.time() - dyn_two}\n\n") dyn_three = time.time() dyn_feats = \ np.concatenate((dyn_feats, self.expected_qual(suggestions, sm, cot, Surrogates.CPPL, next_instance_set)), axis=1) if self.logger is not None: self.logger.info(f"Dyn 3 features took {time.time() - dyn_three}\n\n") dyn_four = time.time() dyn_feats = \ np.concatenate((dyn_feats, self.prob_qual_improve(suggestions, sm, cot, results, Surrogates.SMAC, None)), axis=1) if self.logger is not None: self.logger.info(f"Dyn 4 features took {time.time() - dyn_four}\n\n") dyn_five = time.time() dyn_feats = \ np.concatenate((dyn_feats, self.prob_qual_improve(suggestions, sm, cot, results, Surrogates.GGApp, None)), axis=1) if self.logger is not None: self.logger.info(f"Dyn 5 features took {time.time() - dyn_five}\n\n") dyn_six = time.time() dyn_feats = \ np.concatenate((dyn_feats, self.prob_qual_improve(suggestions, sm, cot, results, Surrogates.CPPL, next_instance_set)), axis=1) if self.logger is not None: self.logger.info(f"Dyn 6 features took {time.time() - dyn_six}\n\n") dyn_seven = time.time() dyn_feats = \ np.concatenate((dyn_feats, self.uncertainty_improve(suggestions, sm, cot, Surrogates.SMAC, None)), axis=1) if self.logger is not None: self.logger.info(f"Dyn 7 features took {time.time() - dyn_seven}\n\n") dyn_eight = time.time() dyn_feats = \ np.concatenate((dyn_feats, self.uncertainty_improve(suggestions, sm, cot, Surrogates.GGApp, None)), axis=1) if self.logger is not None: self.logger.info(f"Dyn 8 features took {time.time() - dyn_eight}\n\n") dyn_nine = time.time() dyn_feats = \ np.concatenate((dyn_feats, self.uncertainty_improve(suggestions, sm, cot, Surrogates.CPPL, next_instance_set)), axis=1) if self.logger is not None: self.logger.info(f"Dyn 9 features took {time.time() - dyn_nine}\n\n") dyn_ten = time.time() dyn_feats = \ np.concatenate((dyn_feats, self.expected_improve(suggestions, sm, cot, Surrogates.SMAC, next_instance_set)), axis=1) if self.logger is not None: self.logger.info(f"Dyn 10 features took {time.time() - dyn_ten}\n\n") dyn_eleven = time.time() dyn_feats = \ np.concatenate((dyn_feats, self.expected_improve(suggestions, sm, cot, Surrogates.GGApp, None)), axis=1) if self.logger is not None: self.logger.info(f"Dyn 11 features took {time.time() - dyn_eleven}\n\n") dyn_twelve = time.time() dyn_feats = \ np.concatenate((dyn_feats, self.expected_improve(suggestions, sm, cot, Surrogates.CPPL, next_instance_set)), axis=1) dyn_feats = self.surr_votes(dyn_feats) if self.logger is not None: self.logger.info(f"Dyn 12 features took {time.time() - dyn_twelve}\n\n") self.logger.info(f"All dyn features took {time.time() - all_dyn_time}\n\n") return np.array(dyn_feats)
[docs] def diversity_feature_gen(self, suggestions, data, results, cot, psetting, predicted_quals, evaluated): """Generate diversity features. Parameters ---------- suggestions : list Suggested configurations. data : object Contains historic data. results : dict Qualities of configurations. cot : float Cut off time for tournaments. psetting : object Scenario parameters. predicted_quals : list Predicted qualities of points evaluated so far. evaluated : list All evaluated points so far. sm : object Initialized Surrogates.SurrogateManager(). Returns ------- div_feats : list Diversity features. """ if self.logger is not None: div_time = time.time() nce = 0 for content in data.values(): nce += len(content.configuration_ids) generators = [gen for gen in Generator] # Features based on relatives evaluated so far div_feats = self.percent_rel_evals(suggestions, data, nce) div_feats = \ np.concatenate((div_feats, self.avg_rel_evals_qual(suggestions, data, nce, results, cot, generators)), axis=1) div_feats = \ np.concatenate((div_feats, self.best_rel_evals_qual(suggestions, data, generators, results, cot)), axis=1) div_feats = \ np.concatenate((div_feats, self.std_rel_evals_qual(suggestions, data, generators, results, cot)), axis=1) div_feats = \ np.concatenate((div_feats, self.diff_pred_real_qual(suggestions, data, predicted_quals, results)), axis=1) # Features based on points evaluated so far div_feats = \ np.concatenate((div_feats, self.avg_dist_evals(suggestions, evaluated, psetting)), axis=1) div_feats = \ np.concatenate((div_feats, self.avg_dist_sel(suggestions, psetting)), axis=1) div_feats = \ np.concatenate((div_feats, self.avg_dist_rel(suggestions, evaluated, psetting, generators)), axis=1) if self.logger is not None: self.logger.info(f"Div features took {time.time() - div_time}\n\n") return np.array(div_feats)