"""This module contains selectors cache object."""
import ray
import logging
import json
import random
from selector.log_setup import TournamentEncoder
@ray.remote(num_cpus=1)
class TargetAlgorithmObserver:
    """
    Stores and manages all data from tournaments.

    The class is declared as a Ray actor (``ray.remote``), so its state
    lives in one place and is read and written through the methods below.

    Parameters
    ----------
    scenario : selector.scenario.Scenario
        AC scenario.
    """

    def __init__(self, scenario):
        self.intermediate_output = {}
        # {conf_id: {instance_id: result}}
        self.results = {}
        # {conf_id: {instance_id: start}}
        self.start_time = {}
        # Filled by put_tournament_history: {tournament.id: tournament}
        self.tournament_history = {}
        # {conf_id: [instance_id, ...]}
        self.termination_history = {}
        # Filled by put_tournament_update: {tournament.id: tournament}
        self.tournaments = {}
        self.read_from = {"conf id": 1, "instance_id": 1, "index": 1}
        self.scenario = scenario

        # One slot per core index; None marks the core as unbound.
        # NOTE(review): the two extra slots are presumably reserved for
        # non-tournament processes -- confirm against the caller.
        n_cores = 2 + self.scenario.tournament_size * self.scenario.number_tournaments
        self.core_affinities = {core: None for core in range(n_cores)}

        # todo logging dic should be provided somewhere else -> DOTAC-37
        logging.basicConfig(
            filename=f'{self.scenario.log_location}{self.scenario.log_folder}/Target_Algorithm_Cache.logger',
            level=logging.INFO,
            format='%(asctime)s %(message)s',
        )

    def get_free_core(self):
        """
        Looks for and returns the index of a free core.

        A core is free while its entry in ``core_affinities`` is ``None``.
        If no core is free, a random core index is returned anyway, so the
        returned core may already be bound to a task.

        Returns
        -------
        int
            Index of a free core (or of an arbitrary core if none is free).
        """
        free_cores = [
            core for core, task in self.core_affinities.items() if task is None
        ]
        if free_cores:
            return random.choice(free_cores)
        return random.choice(list(self.core_affinities))

    def record_core_affinity(self, core, task):
        """
        Records that a core is bound.

        Parameters
        ----------
        core : int
            Index of core.
        task : dict
            Configuration/instance pair.
        """
        self.core_affinities[core] = task

    def remove_core_affinity(self, core):
        """
        Records that a binding is removed.

        Parameters
        ----------
        core : int
            Index of core.
        """
        self.core_affinities[core] = None

    def put_result(self, conf_id, instance_id, result):
        """
        Saves result of the configuration/instance run.

        Only the first result recorded for a configuration/instance pair is
        kept; later calls for the same pair are logged but do not overwrite.

        Parameters
        ----------
        conf_id : uuid.UUID
            ID of the configuration.
        instance_id : str
            Name of the instance.
        result : float
            Result of the configuration/instance run.
        """
        logging.info(f"Getting final result: {conf_id}, {instance_id}, {result}")
        # setdefault keeps an already stored value untouched.
        self.results.setdefault(conf_id, {}).setdefault(instance_id, result)

    def get_results(self):
        """
        Get all results of tournament.

        Returns
        -------
        dict
            Configuration/instance pair results:
            {conf_id: {instance_id: result}}.
        """
        logging.info("Publishing results")
        return self.results

    def get_results_single(self, conf_id, instance_id):
        """
        Get a single result of tournament.

        Parameters
        ----------
        conf_id : uuid.UUID
            ID of the configuration.
        instance_id : str
            Name of the problem instance.

        Returns
        -------
        object
            The stored result of the configuration/instance pair, or
            ``False`` if no result was recorded for it.
        """
        # Direct dict lookups instead of materialising the key lists.
        return self.results.get(conf_id, {}).get(instance_id, False)

    def put_start(self, conf_id, instance_id, start):
        """
        Record start of target algorithm run.

        Only the first start recorded for a configuration/instance pair is
        kept; later calls for the same pair do not overwrite it.

        Parameters
        ----------
        conf_id : uuid.UUID
            ID of the configuration.
        instance_id : str
            Name of the problem instance.
        start : int
            Start time of the run.
        """
        logging.info(f"Getting start: {conf_id}, {instance_id}, {start} ")
        self.start_time.setdefault(conf_id, {}).setdefault(instance_id, start)

    def get_start(self):
        """
        Returns all recorded start times.

        Returns
        -------
        dict
            Start times: {conf_id: {instance_id: start}}.
        """
        logging.info("Publishing start")
        return self.start_time

    def put_tournament_history(self, tournament):
        """
        Saves tournament to the tournament history.

        Parameters
        ----------
        tournament : selector.pool.Tournament
            Tournament data to save to history.
        """
        self.tournament_history[tournament.id] = tournament

    def get_tournament_history(self):
        """
        Returns tournament history.

        Returns
        -------
        dict
            Tournament history:
            {selector.pool.Tournament.id: selector.pool.Tournament}
        """
        return self.tournament_history

    def put_tournament_update(self, tournament):
        """
        Saves or updates a tournament in the ``tournaments`` record.

        This writes to ``self.tournaments``, not to the tournament history.

        Parameters
        ----------
        tournament : selector.pool.Tournament
            Tournament data to store under its ``id``.
        """
        self.tournaments[tournament.id] = tournament

    def remove_tournament(self, tournament):
        """
        Deletes a tournament from the ``tournaments`` record.

        Parameters
        ----------
        tournament : selector.pool.Tournament
            Tournament data to delete.

        Raises
        ------
        KeyError
            If no tournament with this ``id`` is stored.
        """
        self.tournaments.pop(tournament.id)

    def get_tournament(self):
        """
        Get all tournaments stored via ``put_tournament_update``.

        Returns
        -------
        list
            List of selector.pool.Tournament records.
        """
        return list(self.tournaments.values())

    def put_termination_history(self, conf_id, instance_id):
        """
        Save termination history.

        A repeated termination of the same configuration/instance pair is
        not stored again; it is only logged.

        Parameters
        ----------
        conf_id : uuid.UUID
            ID of the configuration.
        instance_id : str
            Name of the problem instance.
        """
        terminated_on = self.termination_history.setdefault(conf_id, [])
        if instance_id in terminated_on:
            logging.info("This should not happen: we kill something we already killed")
        else:
            terminated_on.append(instance_id)

    def get_termination_history(self):
        """
        Returns termination history.

        Returns
        -------
        dict
            Configuration and instance pairs that were terminated:
            {conf_id: [instance_id, ...]}.
        """
        return self.termination_history

    def get_termination_single(self, conf_id, instance_id):
        """
        Checks whether a configuration/instance pair was terminated.

        Parameters
        ----------
        conf_id : uuid.UUID
            ID of the configuration.
        instance_id : str
            Name of the problem instance.

        Returns
        -------
        bool
            True if the pair is in the termination history, else False.
        """
        return instance_id in self.termination_history.get(conf_id, ())

    def save_rt_results(self):
        """
        Saves results to file.

        The results are appended as one JSON document to
        ``run_history.json`` in the scenario's log folder; configuration
        IDs are converted to strings to be usable as JSON keys.
        """
        # NOTE(review): the file is opened in append mode, so calling this
        # more than once writes several JSON documents back to back.
        with open(f"./selector/logs/{self.scenario.log_folder}/run_history.json", 'a') as f:
            history = {str(conf_id): runs for conf_id, runs in self.results.items()}
            json.dump(history, f, indent=2)

    def save_tournament_history(self):
        """
        Saves tournament history to file.

        The history is appended as one JSON document to
        ``tournament_history.json`` in the scenario's log folder, with
        tournaments serialised by ``TournamentEncoder``.
        """
        # NOTE(review): append mode -- repeated calls concatenate documents.
        with open(f"./selector/logs/{self.scenario.log_folder}/tournament_history.json", 'a') as f:
            history = {
                str(tournament_id): tournament
                for tournament_id, tournament in self.tournament_history.items()
            }
            json.dump(history, f, indent=4, cls=TournamentEncoder)