"""This modules includes functions to execute the target algorithm."""
import logging
import time
import subprocess
import ray
import numpy as np
import psutil
import os
import signal
import traceback
import re
import copy
from threading import Thread
from queue import Queue, Empty
from selector.generators.default_point_generator import check_conditionals
__all__ = ['kill_process_tree', 'cpu_bind_children', 'enqueue_output',
'get_running_processes', 'tae_from_cmd_wrapper_quality',
'tae_from_cmd_wrapper_rt', 'termination_check', 'time_measurment']
[docs]def kill_process_tree(pid):
"""
Propagates through the process tree and terminates/kills children.
Parameters
----------
pid : int
Process ID of the target algorithm.
"""
try:
parent = psutil.Process(pid)
children = parent.children(recursive=True)
for child in children:
child.terminate()
parent.terminate()
_, still_alive = psutil.wait_procs(children, timeout=10)
for p in still_alive:
p.kill()
parent.wait(10)
except psutil.NoSuchProcess:
pass
[docs]def cpu_bind_children(chosen_core, p, set_affinity, logging):
"""
Ensures target algorithm and child processes only use the chosen core.
Parameters
----------
chosen_core : int
Core the process and children ought to stay on.
p : object
Target algorithm process.
set_affinity : list
List that tracks CPU affinity of child processes.
logging : object
Initialized logging object.
"""
try:
if p.poll() is None:
children = p.children(recursive=True)
if children:
for child in children:
if (child.is_running() and child.pid
not in set_affinity
and child.cpu_affinity()[0] != chosen_core):
try:
child.cpu_affinity([chosen_core])
set_affinity.append(child.pid)
logging.info(
f"""New child process {child.pid}
{child.name()} CPU affinity set to core:
{chosen_core}""")
except (psutil.NoSuchProcess, psutil.AccessDenied):
logging.info(
f"""Failed to set CPU affinity for new
child process {child.pid} {child.name()}.""")
except:
pass
[docs]def time_measurment(p, start, cpu_time_p):
"""
Measures target algorithm runtime: 1. process tree or 2. process or 3. wall
Parameters
----------
p : object
Target algorithm process.
start : float
Start time in seconds.
cpu_time_p : float
Last runtime measurement.
"""
cpu_times = p.cpu_times()
if cpu_times.children_user != 0 and cpu_times.children_user > cpu_time_p:
cpu_time_p = cpu_times.children_user
elif cpu_times.user != 0 and cpu_times.user > cpu_time_p:
cpu_time_p = cpu_times.user
else:
cpu_time_p = time.time() - start
return cpu_time_p
[docs]def enqueue_output(out, queue):
"""
Enqueue output.
Parameters
----------
out : str
Target algorithm output.
queue : multiprocessing.Queue
Queue to get data.
"""
for line in iter(out.readline, b''):
line = line.decode("utf-8")
queue.put(line)
out.close()
[docs]def get_running_processes(ta_process_name):
"""
Get list of running processes.
Parameters
----------
ta_process_name : str
Name of process to find all processes with.
"""
processes = []
for proc in psutil.process_iter():
try:
processName = proc.name()
processID = proc.pid
if processName in [ta_process_name]:
processes.append([processName, processID])
except (psutil.NoSuchProcess, psutil.AccessDenied,
psutil.ZombieProcess):
pass
return processes
[docs]def termination_check(process_pid, process_status, ta_process_name, python_pid,
conf_id, instance):
"""
Check if process was terminated.
Parameters
----------
process_pid : int
PID of the process to check.
process_status : str
Status of the process.
ta_process_name : str
Name of the ta process as noted in system.
python_pid : int
PID of the ray actor.
conf_id : uuid.UUID
ID of the configuration.
instance : str
Problem instance name.
"""
running_processes = get_running_processes(ta_process_name)
sr = False
for rp in running_processes:
if process_pid == rp[1]:
sr = True
if sr:
logging.info(f"""Failed to terminate {conf_id}, {instance}: process
{process_pid} with {process_status} on {python_pid} is still
running""")
else:
logging.info(
f"""Successfully terminated {conf_id}, {instance} on {python_pid}
with {process_status}""")
[docs]@ray.remote(num_cpus=1)
def tae_from_cmd_wrapper_rt(conf, instance_path, cache, ta_command_creator,
scenario):
"""
Execute the target algorithm with a given conf/instance pair by calling a user-provided Wrapper
that creates a command line argument that can be executed.
Warning
-------
If your target algorithms spawn child processes, you might set scenario.cpu_binding = True.
Parameters
----------
conf : selector.pool.Configuration
Configuration.
instance : str
instance name.
cache : selector.tournament_dispatcher.MiniTournamentDispatcher
Cache for all tournament data.
ta_command_creator : wrapper
Wrapper that creates a command line.
scenario : selector.scenario.Scenario
AC scenario.
Returns
-------
tuple
- **conf** : object,
Configuration.
- **instance_path** : str,
Path to the instance.
- **terminated** : bool,
Whether the process was terminated.
"""
# todo logging dic should be provided somewhere else -> DOTAC-37
logging.basicConfig(
filename=f'{scenario.log_location}{scenario.log_folder}/wrapper_log_for{conf.id}.log',
level=logging.INFO, format='%(asctime)s %(message)s')
try:
logging.info("\n")
logging.info(f"Wrapper TAE start {conf}, {instance_path}")
runargs = {'instance': f'{scenario.instances_dir + instance_path}',
'seed': scenario.seed if scenario.seed else -1,
"id": f"{conf.id}"}
clean_conf = copy.copy(conf.conf)
# Check conditionals and turn off parameters if violated
cond_vio = check_conditionals(scenario, clean_conf)
for cv in cond_vio:
clean_conf.pop(cv, None)
cmd = ta_command_creator.get_command_line_args(runargs, clean_conf)
start = time.time()
cache.put_start.remote(conf.id, instance_path, start)
p = psutil.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, close_fds=True)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True
t.start()
if scenario.cpu_binding:
set_affinity = []
chosen_core = ray.get(cache.get_free_core.remote())
cache.record_core_affinity.remote(chosen_core,
[conf.id, instance_path])
logging.info(f'Binding TA to core: {chosen_core}')
ta_execution_process = psutil.Process()
ta_execution_process.cpu_affinity([chosen_core])
p.cpu_affinity([chosen_core])
cpu_bind_children(chosen_core, p, set_affinity, logging)
timeout = False
empty_line = False
memory_p = 0
cpu_time_p = 0
reading = True
solved = False
while reading:
try:
if scenario.cpu_binding:
cpu_bind_children(chosen_core, p, set_affinity, logging)
line = q.get(timeout=.5)
empty_line = False
except Empty:
empty_line = True
else: # write intemediate feedback
if "placeholder" in line:
cache.put_intermediate_output.remote(
conf.id, instance_path, line)
logging.info(f"""Wrapper TAE intermediate feedback {conf},
{instance_path} {line}""")
if scenario.solve_match:
if any(sm in line for sm in scenario.solve_match):
if scenario.runtime_feedback:
time_res = float(
re.findall(
f"{scenario.runtime_feedback}", line)[0])
solved = True
if p.poll() is None:
# Get the cpu time and memory of the process
cpu_time_p = time_measurment(p, start, cpu_time_p)
memory_p = p.memory_info().rss / 1024 ** 2
if (float(cpu_time_p) > float(scenario.cutoff_time)
or float(memory_p) > float(
scenario.memory_limit)
and timeout is False) or (
scenario.solve_match and solved):
timeout = float(cpu_time_p) > float(scenario.cutoff_time)
logging.info(f"""Timeout or memory reached, terminating:
{conf}, {instance_path} {cpu_time_p}""")
kill_process_tree(p.pid)
if p.poll() is None:
p.terminate()
try:
time.sleep(1)
except:
print("Got sleep interupt", conf, instance_path)
pass
if p.poll() is None:
p.kill()
try:
os.killpg(os.getpgid(p.pid), signal.SIGKILL)
except Exception:
pass
# Break the while loop when the ta was killed or finished
if empty_line and p.poll() is not None:
reading = False
if scenario.runtime_feedback and solved:
cpu_time_p = time_res
if timeout:
cache.put_result.remote(conf.id, instance_path, np.nan)
elif scenario.solve_match:
if solved:
cache.put_result.remote(conf.id, instance_path, cpu_time_p)
else:
cache.put_result.remote(conf.id, instance_path, np.nan)
else:
cache.put_result.remote(conf.id, instance_path, cpu_time_p)
if scenario.cpu_binding:
cache.remove_core_affinity.remote(chosen_core)
time.sleep(0.2)
logging.info(
f"Wrapper TAE end {conf}, {instance_path} at {cpu_time_p}s")
return conf, instance_path, False
except KeyboardInterrupt:
logging.info(f" Killing: {conf}, {instance_path} ")
# We only terminated the subprocess in case it has started (p is defined)
if 'p' in vars():
kill_process_tree(p.pid)
if p.poll() is None:
p.terminate()
try:
time.sleep(1)
except:
print("Got sleep interupt", conf, instance_path)
pass
if p.poll() is None:
p.kill()
try:
os.killpg(os.getpgid(p.pid), signal.SIGKILL)
except Exception as e:
pass
# if scenario.ta_pid_name is not None:
# termination_check(p.pid, p.poll(), scenario.ta_pid_name, os.getpid(), conf.id, instance_path)
if scenario.cpu_binding:
cache.put_result.remote(conf.id, instance_path, np.nan)
cache.remove_core_affinity.remote(chosen_core)
try:
logging.info(
f"Killing status: {p.poll()} {conf.id} {instance_path}")
except:
pass
return conf, instance_path, True
except Exception:
if scenario.cpu_binding:
try:
cache.remove_core_affinity.remote(chosen_core)
except:
pass
print({traceback.format_exc()})
logging.info(f"Exception in TA execution: {traceback.format_exc()}")
[docs]@ray.remote(num_cpus=1)
def tae_from_cmd_wrapper_quality(conf, instance_path, cache,
ta_command_creator, scenario):
"""
Execute the target algorithm with a given conf/instance pair by calling a user-provided Wrapper
that creates a command line argument that can be executed.
Warning
-------
If your target algorithms spawn child processes, you might set scenario.cpu_binding = True.
Parameters
----------
conf : selector.pool.Configuration
Configuration.
instance : str
instance name.
cache : selector.tournament_dispatcher.MiniTournamentDispatcher
Cache for all tournament data.
ta_command_creator : wrapper
Wrapper that creates a command line.
scenario : selector.scenario.Scenario
AC scenario.
Returns
-------
tuple
- **conf** : object,
Configuration.
- **instance_path** : str,
Path to the instance.
- **terminated** : bool,
Whether the process was terminated.
"""
logging.basicConfig(filename=f'''{scenario.log_location}{scenario.log_folder}/wrapper_log_for{conf.id}.log''',
level=logging.INFO,
format='%(asctime)s %(message)s')
try:
logging.info("\n")
logging.info(f"Wrapper TAE start {conf}, {instance_path}")
runargs = {'instance': f'{scenario.instances_dir + instance_path}',
'seed': scenario.seed if scenario.seed else -1,
"id": f"{conf.id}", "timeout": scenario.cutoff_time}
clean_conf = copy.copy(conf.conf)
# Check conditionals and turn off parameters if violated
cond_vio = check_conditionals(scenario, clean_conf)
for cv in cond_vio:
clean_conf.pop(cv, None)
cmd = ta_command_creator.get_command_line_args(runargs, conf.conf)
start = time.time()
cache.put_start.remote(conf.id, instance_path, start)
p = psutil.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, close_fds=True)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True
t.start()
if scenario.cpu_binding:
set_affinity = []
chosen_core = ray.get(cache.get_free_core.remote())
cache.record_core_affinity.remote(
chosen_core, [conf.id, instance_path])
logging.info(f'Binding TA to core: {chosen_core}')
ta_execution_process = psutil.Process()
ta_execution_process.cpu_affinity([chosen_core])
p.cpu_affinity([chosen_core])
cpu_bind_children(chosen_core, p, set_affinity, logging)
timeout = False
empty_line = False
memory_p = 0
cpu_time_p = 0
reading = True
quality = [np.nan]
while reading:
try:
if scenario.cpu_binding:
cpu_bind_children(chosen_core, p, set_affinity, logging)
line = q.get(timeout=.5)
empty_line = False
# Get the cpu time and memory of the process
except Empty:
empty_line = True
pass
else: # write intemediate feedback
if "placeholder" in line:
cache.put_intermediate_output.remote(
conf.id, instance_path, line)
logging.info(f"""Wrapper TAE intermediate feedback {conf},
{instance_path} {line}""")
if scenario.run_obj == "quality":
output_trigger = re.search(scenario.quality_match, line)
if output_trigger:
quality = re.findall(
f"{scenario.quality_extract}", line)
if p.poll() is None:
# Get the cpu time and memory of the process
cpu_time_p = time_measurment(p, start, cpu_time_p)
memory_p = p.memory_info().rss / 1024 ** 2
if (float(cpu_time_p) > float(scenario.cutoff_time)
or float(memory_p) > float(
scenario.memory_limit)
and timeout is False) or quality != [np.nan]:
timeout = True
logging.info(f"""Timeout or memory reached, terminating:
{conf}, {instance_path} {cpu_time_p}""")
kill_process_tree(p.pid)
if p.poll() is None:
p.terminate()
try:
time.sleep(1)
except:
print("Got sleep interupt", conf, instance_path)
pass
if p.poll() is None:
p.kill()
try:
os.killpg(os.getpgid(p.pid), signal.SIGKILL)
except Exception:
pass
# Break the while loop when the ta was killed or finished
if empty_line and p.poll() is not None:
reading = False
if scenario.cpu_binding:
cache.put_result.remote(conf.id, instance_path, float(quality[0]))
cache.remove_core_affinity.remote(chosen_core)
time.sleep(0.2)
logging.info(f"Wrapper TAE end {conf}, {instance_path}")
return conf, instance_path, False
except KeyboardInterrupt:
logging.info(f" Killing: {conf}, {instance_path} ")
# We only terminated the subprocess in case it has started (p is defined)
if 'p' in vars():
kill_process_tree(p.pid)
if p.poll() is None:
p.terminate()
try:
time.sleep(1)
except:
print("Got sleep interupt", conf, instance_path)
pass
if p.poll() is None:
p.kill()
try:
os.killpg(os.getpgid(p.pid), signal.SIGKILL)
except Exception as e:
pass
# if scenario.ta_pid_name is not None:
# termination_check(p.pid, p.poll(), scenario.ta_pid_name, os.getpid(), conf.id, instance_path)
if scenario.cpu_binding:
cache.put_result.remote(conf.id, instance_path, np.nan)
cache.remove_core_affinity.remote(chosen_core)
try:
logging.info(
f"Killing status: {p.poll()} {conf.id} {instance_path}")
except:
pass
return conf, instance_path, True
except Exception:
if scenario.cpu_binding:
try:
cache.remove_core_affinity.remote(chosen_core)
except:
pass
print({traceback.format_exc()})
logging.info(f"Exception in TA execution: {traceback.format_exc()}")
@ray.remote(num_cpus=1)
def dummy_task(conf, instance_path, cache):
time.sleep(2)
cache.put_result.remote(conf.id, instance_path, np.nan)
return conf, instance_path, True
@ray.remote(num_cpus=1)
def tae_from_aclib(conf, instance, cache, ta_exc):
pass
# TODO