Source code for selector.winner_instance_monitor

"""In this module the instance monitor is defined."""
import ray
import logging
import time
import numpy as np


[docs]@ray.remote(num_cpus=1) class WinnerInstanceMonitor: """ Monitor whether the runtime of a configuration on an instance exceeds the worst runtime of a tournament winning configuration on that instance multiplied by a constant (delta_cap). When a runtime exceeds this bound the configuration/instance pair is terminated. The terminated configuration/instance pairs are stored in the termination_history to avoid double killing Parameters ---------- sleep_time : int Wake up and check whether runtime is exceeded cache : selector.ta_result_store.TargetAlgorithmObserver Stores all tournament related data. scenario : selector.scenario.Scenario AC scenario. delta_cap : int Constant the current best runtime for each instance is multiplied by. """ def __init__(self, sleep_time, cache, scenario, delta_cap=1): self.sleep_time = sleep_time self.cache = cache self.tournaments = [] self.termination_history = {} self.instance_results = {} self.delta_cap = delta_cap logging.basicConfig(filename=f'{scenario.log_location}{scenario.log_folder}winner_inst_monitor.log', level=logging.INFO, format='%(asctime)s %(message)s')
[docs] def monitor(self): """ Monitors a tournament and terminates a configuration/instance pair if necessary. """ logging.info("Starting monitor") while True: # Get results that are already available for ta runs start = time.time() results = ray.get(self.cache.get_results.remote()) dur = time.time() - start logging.info(f"Monitor getting results {dur}") # get starting times for each conf/instance start = time.time() start_time = ray.get(self.cache.get_start.remote()) dur = time.time() - start logging.info(f"Monitor getting start {dur}") # Get the current tournaments that are in the cache start = time.time() tournaments = ray.get(self.cache.get_tournament.remote()) dur = time.time() - start logging.info(f"Monitor getting tournaments {dur}") # List with the winners of the tournaments currently running winners = [t.best_finisher[0] for t in tournaments if t.best_finisher] logging.info(f"Current tournament winners: {winners}") # We create a dict with the results for the instances for the tournament winners # The worst time by a winner for an instance, that did not time out, gets stored in the dict as a benchmark # for the instance based capping. self.instance_results = {} for conf in winners: for instance in results[conf.id]: if instance in self.instance_results and results[conf.id][instance] > \ self.instance_results[instance] and not np.isnan(results[conf.id][instance]): self.instance_results[instance] = results[conf.id][instance] elif instance not in self.instance_results and not np.isnan(results[conf.id][instance]): self.instance_results[instance] = results[conf.id][instance] logging.info(f"Results of the current tournament winners: {self.instance_results}") for t in tournaments: for conf in t.configurations: instances_conf_finished = [] if conf.id in list(results.keys()): instances_conf_finished = list(results[conf.id].keys()) instances_conf_planned = list(t.ray_object_store[conf.id].keys()) instances_conf_still_runs = [i for i in instances_conf_planned if i not in instances_conf_finished] # We kill a configuration/instance pair, when the runtime exceeds the benchmark runtime # for that instance multiplied by delta_cap for instance in instances_conf_still_runs: if conf.id in start_time and instance in start_time[conf.id] \ and instance in self.instance_results: instance_runtime = time.time() - start_time[conf.id][instance] logging.info( f"Monitor kill check: conf.id: {conf.id}, " f"instance: {instance}, " f"instance_runtime: {instance_runtime}, " f"best_instance_runtime: {self.instance_results[instance]}") if instance_runtime > self.instance_results[instance] * self.delta_cap \ and self.termination_check(conf.id, instance): logging.info( f"Monitor is killing: {conf} {instance} " f"with id: {t.ray_object_store[conf.id][instance]}") print(f"Monitor is killing: {time.ctime()} {t.ray_object_store[conf.id][instance]}") self.update_termination_history(conf.id, instance) logging.info(f"termination_history: {self.termination_history}") [ray.cancel(t.ray_object_store[conf.id][instance])] else: continue time.sleep(self.sleep_time)
[docs] def termination_check(self, conf_id, instance): """ Check if we have killed a conf/instance pair already. Return True if we did not. Parameters ---------- conf_id : uuid.UUID ID of the configuration to be checked. instance : str Name of instance to be checked. Returns ------- bool False if configuration/instance pair killed, True else. """ if conf_id not in self.termination_history: return True elif instance not in self.termination_history[conf_id]: return True else: return False
[docs] def update_termination_history(self, conf_id, instance_id): """ Stores termination events in history. Parameters ---------- conf_id : uuid.UUID ID of the configuration that was killed. instance_id : str Name of instance the configuration was killed on. Returns ------- bool False if configuration/instance pair killed, True else. """ if conf_id not in self.termination_history: self.termination_history[conf_id] = [] if instance_id not in self.termination_history[conf_id]: self.termination_history[conf_id].append(instance_id) else: logging.info( "This should not happen: we kill something we already killed")