Source code for selector.tournament_monitor

"""In this module the tournament monitor is defined."""
import ray
import logging
import time
import numpy as np


from selector.tournament_performance import get_censored_runtime_for_instance_set,get_conf_time_out, get_runtime_for_instance_set_with_timeout


[docs]@ray.remote(num_cpus=1) class Monitor: """ Monitor whether the live total runtime of a running conf is exceeding the accumulated runtime of the worst finisher, given that we have already enough finisher. While up the monitor may kill multiple conf/instance pairs. To avoid killing a ta run twice, the monitor stores what it has already killed. Parameters ---------- sleep_time : int Wake up and check whether runtime is exceeded cache : selector.ta_result_store.TargetAlgorithmObserver Stores all tournament related data. scenario : selector.scenario.Scenario AC scenario. """ def __init__(self, sleep_time, cache, scenario): self.sleep_time = sleep_time self.cache = cache self.number_of_finisher = scenario.winners_per_tournament self.tournaments = [] self.time_out = scenario.cutoff_time self.par = scenario.par logging.basicConfig(filename=f'{scenario.log_location}{scenario.log_folder}/monitor.log', level=logging.INFO, format='%(asctime)s %(message)s')
[docs] def monitor(self): """ Monitors a tournament and terminates a configuration/instance pair if necessary. """ logging.info("Starting monitor") while True: # Get results that are already available for ta runs start = time.time() results = ray.get(self.cache.get_results.remote()) dur = time.time() - start logging.info(f"Monitor getting results {dur}") # get starting times for each conf/instance start = time.time() start_time = ray.get(self.cache.get_start.remote()) dur = time.time() - start logging.info(f"Monitor getting start {dur}") # Get the current tournaments that are in the cache start = time.time() tournaments = ray.get(self.cache.get_tournament.remote()) dur = time.time() - start logging.info(f"Monitor getting tournaments {dur}") start = time.time() termination_history = ray.get(self.cache.get_termination_history.remote()) dur = time.time() - start logging.info(f"Monitor getting termination_history {dur}") for t in tournaments: # We can only start canceling runs if there are enough winners already if len(t.best_finisher) == self.number_of_finisher: # Compare runtime to the worst best finisher worst_best_finisher = t.best_finisher[-1] runtime_worst_best_finisher = get_runtime_for_instance_set_with_timeout(results, worst_best_finisher.id, t.instance_set, self.time_out, self.par) # We need to compare each configuration that is still in the running to the worst finisher for conf in t.configurations: # Here we figured out which instances the conf is still running and which one it already finished if conf.id in list(results.keys()): instances_conf_finished = list(results[conf.id].keys()) conf_runtime_f = get_runtime_for_instance_set_with_timeout(results, conf.id, t.instance_set, self.time_out, self.par) else: instances_conf_finished = [] conf_runtime_f = 0 instances_conf_planned = list(t.ray_object_store[conf.id].keys()) instances_conf_still_runs = [c for c in instances_conf_planned if c not in instances_conf_finished] # The runtime of a conf is the time it took to finish instances plus the time spend running but # not finishing the running instances # if i in list(start_time[conf.id].keys()): is a bit hack: it might be the case that the main # process things a conf/instance pair is running but the cache has not recived a start time and # thus the conf/instance is in a transition. That conf instance then has not runtime yet # (or at least very very little) so we ignore it for the cancel computation conf_runtime_p = sum([(time.time() - start_time[conf.id][i]) for i in instances_conf_still_runs if i in list(start_time[conf.id].keys())]) conf_runtime = conf_runtime_f + conf_runtime_p #conf_time_out = get_conf_time_out(results, conf.id, t.instance_set) logging.info(f"Monitor kill check,{conf.id} {conf_runtime}, {runtime_worst_best_finisher}" f"{worst_best_finisher.id,}, {[m. id for m in t.configurations]}") if conf_runtime > runtime_worst_best_finisher:# or conf_time_out: # We can only kill still running tasks for i in instances_conf_still_runs: # We check if we have killed the conf/instance pair before. if self.termination_check(conf.id, i, termination_history): logging.info(f"Monitor is killing: {conf} {i} with id: {t.ray_object_store[conf.id][i]}") print(f"Monitor is killing:{time.ctime()} {t.ray_object_store[conf.id][i]}") # In case we kill we store that we have killed self.cache.put_termination_history.remote(conf.id, i) [ray.cancel(t.ray_object_store[conf.id][i])] else: continue time.sleep(self.sleep_time)
[docs] def termination_check(self, conf_id, instance, termination_history): """ Check if we have killed a conf/instance pair already. Return True if we did not. Parameters ---------- conf_id : uuid.UUID ID of the configuration to be checked. instance : str Name of instance to be checked. termination_history : dict Record of configuration/instance terminations. Returns ------- bool False if configuration/instance pair killed, True else. """ if conf_id not in termination_history: return True elif instance not in termination_history[conf_id]: return True else: return False