From a9901070655229e9c4a4e37997efc3fe575054ed Mon Sep 17 00:00:00 2001
From: Benedikt Volkel
Date: Thu, 26 Oct 2023 12:31:52 +0200
Subject: [PATCH] [WFRunner] Estimate resources dynamically

* introduce functionality to update resource estimates based on values
  which are collected during monitoring (the same that go into pipeline_metric_#PID.log)
* disentangle resource management from WorkflowExecutor
  * manage resources via ResourceManager object
    * memory and cpu
    * semaphores
    * number of parallel tasks to be launched
    * niceness
  * main interfaces of ResourceManager
    * ok_to_submit
    * book
    * unbook
    * add_monitored_resources
* group corresponding tasks together so that resource estimates of upcoming
  tasks can be updated
* TaskResources as container for a task's resource estimates, holding
  * resource estimates (initial and new estimates)
  * references to Semaphore object (optional)
  * reference to list of related tasks (optional)
* make sure the final niceness assigned to a launched process is the same as
  the one its TaskResources object holds
* enable functionality with --dynamic-resources
* add more comments to classes and functions
* update --cgroup option to require entire path
  --> handle both cgroups v1 and cgroups v2

(cherry picked from commit ef0e84d4d13fcafb86b96895ca370894025619fa)
---
 MC/bin/o2_dpg_workflow_runner.py | 772 ++++++++++++++++++++++---------
 1 file changed, 541 insertions(+), 231 deletions(-)

diff --git a/MC/bin/o2_dpg_workflow_runner.py b/MC/bin/o2_dpg_workflow_runner.py
index d6d4bd4ee..0163b9f6a 100755
--- a/MC/bin/o2_dpg_workflow_runner.py
+++ b/MC/bin/o2_dpg_workflow_runner.py
@@ -4,7 +4,6 @@
 
 import re
 import subprocess
-import shlex
 import time
 import json
 import logging
@@ -33,7 +32,7 @@ from o2dpg_workflow_utils import read_workflow
 
 # defining command line options
-parser = argparse.ArgumentParser(description='Parallel execution of a (O2-DPG) DAG data/job pipeline under resource contraints.', 
+parser = argparse.ArgumentParser(description='Parallel execution of a (O2-DPG) DAG data/job pipeline under resource constraints.',
                                  formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 
 parser.add_argument('-f','--workflowfile', help='Input workflow file name', required=True)
@@ -47,18 +46,26 @@
 parser.add_argument('--produce-script', help='Produces a shell script that runs the workflow in serialized manner and quits.')
 parser.add_argument('--rerun-from', help='Reruns the workflow starting from given task (or pattern). 
All dependent jobs will be rerun.') parser.add_argument('--list-tasks', help='Simply list all tasks by name and quit.', action='store_true') -parser.add_argument('--update-resources', dest="update_resources", help='Read resource estimates from a JSON and apply where possible.') -parser.add_argument('--mem-limit', help='Set memory limit as scheduling constraint (in MB)', default=0.9*max_system_mem/1024./1024) -parser.add_argument('--cpu-limit', help='Set CPU limit (core count)', default=8) +# Resources +parser.add_argument('--update-resources', dest="update_resources", help='Read resource estimates from a JSON and apply where possible.') +parser.add_argument("--dynamic-resources", dest="dynamic_resources", action="store_true", help="Update reources estimates of task based on finished related tasks") # derive resources dynamically +parser.add_argument('--optimistic-resources', dest="optimistic_resources", action="store_true", help="Try to run workflow even though resource limits might underestimate resource needs of some tasks") +parser.add_argument("--n-backfill", dest="n_backfill", type=int, default=1) +parser.add_argument('--mem-limit', help='Set memory limit as scheduling constraint (in MB)', default=0.9*max_system_mem/1024./1024, type=float) +parser.add_argument('--cpu-limit', help='Set CPU limit (core count)', default=8, type=float) parser.add_argument('--cgroup', help='Execute pipeline under a given cgroup (e.g., 8coregrid) emulating resource constraints. This m\ ust exist and the tasks file must be writable to with the current user.') + +# run control, webhooks parser.add_argument('--stdout-on-failure', action='store_true', help='Print log files of failing tasks to stdout,') parser.add_argument('--webhook', help=argparse.SUPPRESS) # log some infos to this webhook channel parser.add_argument('--checkpoint-on-failure', help=argparse.SUPPRESS) # debug option making a debug-tarball and sending to specified address # argument is alien-path parser.add_argument('--retry-on-failure', help=argparse.SUPPRESS, default=0) # number of times a failing task is retried parser.add_argument('--no-rootinit-speedup', help=argparse.SUPPRESS, action='store_true') # disable init of ROOT environment vars to speedup init/startup + +# Logging parser.add_argument('--action-logfile', help='Logfilename for action logs. If none given, pipeline_action_#PID.log will be used') parser.add_argument('--metric-logfile', help='Logfilename for metric logs. If none given, pipeline_metric_#PID.log will be used') parser.add_argument('--production-mode', action='store_true', help='Production mode') @@ -68,7 +75,7 @@ def setup_logger(name, log_file, level=logging.INFO): """To setup as many loggers as you want""" - handler = logging.FileHandler(log_file, mode='w') + handler = logging.FileHandler(log_file, mode='w') handler.setFormatter(formatter) logger = logging.getLogger(name) @@ -130,10 +137,10 @@ def getChildProcs(basepid): proc=psutil.Process(int(p)) except psutil.NoSuchProcess: continue - + plist.append(proc) return plist - + # # Code section to find all topological orderings # of a DAG. 
This is used to know when we can schedule @@ -142,23 +149,23 @@ def getChildProcs(basepid): # class to represent a graph object class Graph: - + # Constructor def __init__(self, edges, N): - + # A List of Lists to represent an adjacency list self.adjList = [[] for _ in range(N)] - + # stores in-degree of a vertex # initialize in-degree of each vertex by 0 self.indegree = [0] * N - + # add edges to the undirected graph for (src, dest) in edges: - + # add an edge from source to destination self.adjList[src].append(dest) - + # increment in-degree of destination vertex by 1 self.indegree[dest] = self.indegree[dest] + 1 @@ -166,47 +173,47 @@ def __init__(self, edges, N): def findAllTopologicalOrders(graph, path, discovered, N, allpaths, maxnumber=1): if len(allpaths) >= maxnumber: return - + # do for every vertex for v in range(N): - + # proceed only if in-degree of current node is 0 and # current node is not processed yet if graph.indegree[v] == 0 and not discovered[v]: - + # for every adjacent vertex u of v, reduce in-degree of u by 1 for u in graph.adjList[v]: graph.indegree[u] = graph.indegree[u] - 1 - + # include current node in the path and mark it as discovered path.append(v) discovered[v] = True - + # recur findAllTopologicalOrders(graph, path, discovered, N, allpaths) - + # backtrack: reset in-degree information for the current node for u in graph.adjList[v]: graph.indegree[u] = graph.indegree[u] + 1 - + # backtrack: remove current node from the path and # mark it as undiscovered path.pop() discovered[v] = False - + # record valid ordering if len(path) == N: allpaths.append(path.copy()) - - + + # get all topological orderings of a given DAG as a list def printAllTopologicalOrders(graph, maxnumber=1): # get number of nodes in the graph N = len(graph.adjList) - + # create an auxiliary space to keep track of whether vertex is discovered discovered = [False] * N - + # list to store the topological order path = [] allpaths = [] @@ -218,21 +225,23 @@ def printAllTopologicalOrders(graph, maxnumber=1): # find all tasks that depend on a given task (id); when a cache # dict is given we can fill for the whole graph in one pass... 
-def find_all_dependent_tasks(possiblenexttask, tid, cache={}): - c=cache.get(tid) +def find_all_dependent_tasks(possiblenexttask, tid, cache=None): + c=cache.get(tid) if cache else None if c!=None: return c - + daughterlist=[tid] # possibly recurse for n in possiblenexttask[tid]: - c = cache.get(n) + c = cache.get(n) if cache else None if c == None: c = find_all_dependent_tasks(possiblenexttask, n, cache) daughterlist = daughterlist + c - cache[n]=c + if cache is not None: + cache[n]=c - cache[tid]=daughterlist + if cache is not None: + cache[tid]=daughterlist return list(set(daughterlist)) @@ -250,12 +259,12 @@ def analyseGraph(edges, nodes): nextjobtrivial[e[0]].append(e[1]) if nextjobtrivial[-1].count(e[1]): nextjobtrivial[-1].remove(e[1]) - + # find topological orderings of the graph # create a graph from edges graph = Graph(edges, N) orderings = printAllTopologicalOrders(graph) - + return (orderings, nextjobtrivial) @@ -282,7 +291,7 @@ def draw_workflow(workflowspec): dot.edge(str(fromindex), str(toindex)) dot.render('workflow.gv') - + # builds the graph given a "taskuniverse" list # builds accompagnying structures tasktoid and idtotask def build_graph(taskuniverse, workflowspec): @@ -295,9 +304,9 @@ def build_graph(taskuniverse, workflowspec): nodes.append(tasktoid[t[0]['name']]) for n in t[0]['needs']: edges.append((tasktoid[n], tasktoid[t[0]['name']])) - + return (edges, nodes) - + # loads json into dict, e.g. for workflow specification def load_json(workflowfile): @@ -333,7 +342,7 @@ def task_matches_labels(t): if targetlabels.count(l)!=0: return True return False - + # The following sequence of operations works and is somewhat structured. # However, it builds lookups used elsewhere as well, so some CPU might be saved by reusing # some structures across functions or by doing less passes on the data. @@ -341,7 +350,7 @@ def task_matches_labels(t): # helper lookup tasknametoid = { t['name']:i for i, t in enumerate(workflowspec['stages'],0) } - # check if a task can be run at all + # check if a task can be run at all # or not due to missing requirements def canBeDone(t,cache={}): ok = True @@ -393,17 +402,17 @@ def needed_by_targets(name): return transformedworkflowspec -# builds topological orderings (for each timeframe) +# builds topological orderings (for each timeframe) def build_dag_properties(workflowspec): globaltaskuniverse = [ (l, i) for i, l in enumerate(workflowspec['stages'], 1) ] timeframeset = set( l['timeframe'] for l in workflowspec['stages'] ) edges, nodes = build_graph(globaltaskuniverse, workflowspec) tup = analyseGraph(edges, nodes.copy()) - # + # global_next_tasks = tup[1] - + dependency_cache = {} # weight influences scheduling order can be anything user defined ... 
for the moment we just prefer to stay within a timeframe
     # then take the number of tasks that depend on a task as further weight
@@ -411,7 +420,7 @@ def build_dag_properties(workflowspec):
     # TODO: make this a policy of the runner to study different strategies
     def getweight(tid):
         return (globaltaskuniverse[tid][0]['timeframe'], len(find_all_dependent_tasks(global_next_tasks, tid, dependency_cache)))
-    
+
     task_weights = [ getweight(tid) for tid in range(len(globaltaskuniverse)) ]
 
     for tid in range(len(globaltaskuniverse)):
@@ -420,6 +429,7 @@ def getweight(tid):
     # print (global_next_tasks)
     return { 'nexttasks' : global_next_tasks, 'weights' : task_weights, 'topological_ordering' : tup[0] }
 
+
 # update the resource estimates of a workflow based on resources given via JSON
 def update_resource_estimates(workflow, resource_json):
     resource_dict = load_json(resource_json)
@@ -427,7 +437,6 @@ def update_resource_estimates(workflow, resource_json):
     for task in stages:
 
         if task["timeframe"] >= 1:
-            tf = task["timeframe"]
             name = "_".join(task["name"].split("_")[:-1])
         else:
             name = task["name"]
@@ -497,6 +506,328 @@ def get_alienv_software_environment(packagestring):
 #
 # functions for execution; encapsulated in a WorkflowExecutor class
 #
+
+class Semaphore:
+    """
+    Object that can be used as a semaphore
+    """
+    def __init__(self):
+        self.locked = False
+    def lock(self):
+        self.locked = True
+    def unlock(self):
+        self.locked = False
+
+
+class ResourceBoundaries:
+    """
+    Container holding global resource properties
+    """
+    def __init__(self, cpu_limit, mem_limit, dynamic_resources=False, optimistic_resources=False):
+        self.cpu_limit = cpu_limit
+        self.mem_limit = mem_limit
+        self.dynamic_resources = dynamic_resources
+        # if this is set, tasks that would normally exceed the resource limits will be tried anyway
+        self.optimistic_resources = optimistic_resources
+
+
+class TaskResources:
+    """
+    Container holding resources of a single task
+    """
+    def __init__(self, tid, name, cpu, mem, resource_boundaries):
+        # the task ID belonging to these resources
+        self.tid = tid
+        self.name = name
+        # original CPUs/MEM assigned (persistent)
+        self.cpu_assigned_original = cpu
+        self.mem_assigned_original = mem
+        # CPUs/MEM assigned (transient)
+        self.cpu_assigned = cpu
+        self.mem_assigned = mem
+        # global resource settings
+        self.resource_boundaries = resource_boundaries
+        # sampled resources of this task
+        self.cpu_sampled = None
+        self.mem_sampled = None
+        # Set these after a task has finished to compute new estimates for related tasks
+        self.walltime = None
+        self.cpu_taken = None
+        self.mem_taken = None
+        # collected during monitoring
+        self.time_collect = []
+        self.cpu_collect = []
+        self.mem_collect = []
+        # linked to the resources of other tasks that are of the same type as this one
+        self.related_tasks = None
+        # can assign a semaphore
+        self.semaphore = None
+        # the task's nice value
+        self.nice_value = None
+        # whether or not the task's resources are currently booked
+        self.booked = False
+
+    @property
+    def is_done(self):
+        return self.time_collect and not self.booked
+
+    def add(self, time_passed, cpu, mem):
+        """
+        Brief interface to add resources that were measured after time_passed
+        """
+        self.time_collect.append(time_passed)
+        self.cpu_collect.append(cpu)
+        self.mem_collect.append(mem)
+
+    def sample_resources(self):
+        """
+        If this task is done, sample CPU and MEM for all related tasks that have not started yet
+        """
+        if not self.is_done:
+            return
+
+        if len(self.time_collect) < 3:
+            # Consider at
least 3 points to sample from + self.cpu_sampled = self.cpu_assigned + self.mem_sampled = self.mem_assigned + actionlogger.debug("Task %s has not enough points (< 3) to sample resources, setting to previosuly assigned values.", self.name) + else: + # take the time deltas and leave out the very first CPU measurent which is not meaningful, + # at least when it domes from psutil.Proc.cpu_percent(interval=None) + time_deltas = [self.time_collect[i+1] - self.time_collect[i] for i in range(len(self.time_collect) - 1)] + cpu = sum([cpu * time_delta for cpu, time_delta in zip(self.cpu_collect[1:], time_deltas) if cpu >= 0]) + self.cpu_sampled = cpu / sum(time_deltas) + self.mem_sampled = max(self.mem_collect) + + mem_sampled = 0 + cpu_sampled = [] + for res in self.related_tasks: + if res.is_done: + mem_sampled = max(mem_sampled, res.mem_sampled) + cpu_sampled.append(res.cpu_sampled) + cpu_sampled = sum(cpu_sampled) / len(cpu_sampled) + + # This task ran already with the assigned resources, so let's set it to the limit + if cpu_sampled > self.resource_boundaries.cpu_limit: + actionlogger.warning("Sampled CPU (%.2f) exceeds assigned CPU limit (%.2f)", cpu_sampled, self.resource_boundaries.cpu_limit) + cpu_sampled = self.resource_boundaries.cpu_limit + if mem_sampled > self.resource_boundaries.mem_limit: + actionlogger.warning("Sampled MEM (%.2f) exceeds assigned MEM limit (%.2f)", mem_sampled, self.resource_boundaries.mem_limit) + mem_sampled = self.resource_boundaries.mem_limit + + if mem_sampled <= 0: + actionlogger.debug("Sampled memory for %s is %.2f <= 0, setting to previously assigned value %.2f", self.name, mem_sampled, self.mem_assigned) + mem_sampled = self.mem_assigned + if cpu_sampled < 0: + actionlogger.debug("Sampled CPU for %s is %.2f < 0, setting to previously assigned value %.2f", self.name, cpu_sampled, self.cpu_assigned) + cpu_sampled = self.cpu_assigned + for res in self.related_tasks: + if res.is_done or res.booked: + continue + res.cpu_assigned = cpu_sampled + res.mem_assigned = mem_sampled + + +class ResourceManager: + """ + Central class to manage resources + + - CPU limits + - MEM limits + - Semaphores + + Entrypoint to set and to query for resources to be updated. + + Can be asked whether a certain task can be run under current resource usage. + Book and unbook resources. 
+ """ + def __init__(self, cpu_limit, mem_limit, procs_parallel_max=100, dynamic_resources=False, optimistic_resources=False): + """ + Initialise members with defaults + """ + # hold TaskResources of all tasks + self.resources = [] + + # helper dictionaries holding common objects which will be distributed to single TaskResources objects + # to avoid further lookup and at the same time to share the same common objects + self.resources_related_tasks_dict = {} + self.semaphore_dict = {} + + # one common object that holds global resource settings such as CPU and MEM limits + self.resource_boundaries = ResourceBoundaries(cpu_limit, mem_limit, dynamic_resources, optimistic_resources) + + # register resources that are booked under default nice value + self.cpu_booked = 0 + self.mem_booked = 0 + # number of tasks currently booked + self.n_procs = 0 + + # register resources that are booked under high nice value + self.cpu_booked_backfill = 0 + self.mem_booked_backfill = 0 + # number of tasks currently booked under high nice value + self.n_procs_backfill = 0 + + # the maximum number of tasks that run at the same time + self.procs_parallel_max = procs_parallel_max + + # get the default nice value of this python script + self.nice_default = os.nice(0) + # add 19 to get nice value of low-priority tasks + self.nice_backfill = self.nice_default + 19 + + def add_task_resources(self, name, related_tasks_name, cpu, mem, semaphore_string=None): + """ + Construct and Add a new TaskResources object + """ + resources = TaskResources(len(self.resources), name, cpu, mem, self.resource_boundaries) + if cpu > self.resource_boundaries.cpu_limit or mem > self.resource_boundaries.mem_limit: + actionlogger.warning(f"Resource estimates of id {len(self.resources)} overestimates limits, CPU limit: {self.resource_boundaries.cpu_limit}, MEM limit: {self.resource_boundaries.mem_limit}; might not run") + if not self.resource_boundaries.optimistic_resources: + # exit if we don't dare to try + exit(1) + # or we do dare, let's see what happens... + actionlogger.info("We will try to run this task anyway with maximum available resources") + + self.resources.append(resources) + # do the following to have the same Semaphore object for all corresponding TaskResources so that we do not need a lookup + if semaphore_string: + if semaphore_string not in self.semaphore_dict: + self.semaphore_dict[semaphore_string] = Semaphore() + resources.semaphore = self.semaphore_dict[semaphore_string] + + # do the following to give each TaskResources a list of the related tasks so we do not need an additional lookup + if related_tasks_name: + if related_tasks_name not in self.resources_related_tasks_dict: + # assigned list is [valid top be used, list of CPU, list of MEM, list of walltimes of each related task, list of processes that ran in parallel on average, list of taken CPUs, list of assigned CPUs, list of tasks finished in the meantime] + self.resources_related_tasks_dict[related_tasks_name] = [] + self.resources_related_tasks_dict[related_tasks_name].append(resources) + resources.related_tasks = self.resources_related_tasks_dict[related_tasks_name] + + def add_monitored_resources(self, tid, time_delta_since_start, cpu, mem): + self.resources[tid].add(time_delta_since_start, cpu, mem) + + def book(self, tid, nice_value): + """ + Book the resources of this task with given nice value + + The final nice value is determined by the final submission and could be different. 
+ This can happen if the nice value should have been changed while that is not allowed by the system. + """ + res = self.resources[tid] + # take the nice value that was previously assigned when resources where checked last time + previous_nice_value = res.nice_value + + if previous_nice_value is None: + # this has not been checked ever if it was ok to be submitted + actionlogger.warning("Task ID %d has never been checked for resources. Treating as backfill", tid) + nice_value = self.nice_backfill + elif res.nice_value != nice_value: + actionlogger.warning("Task ID %d has was last time checked for a different nice value (%d) but is now submitted with (%d).", tid, res.nice_value, nice_value) + + res.nice_value = nice_value + res.booked = True + if res.semaphore is not None: + res.semaphore.lock() + if nice_value != self.nice_default: + self.n_procs_backfill += 1 + self.cpu_booked_backfill += res.cpu_assigned + self.mem_booked_backfill += res.mem_assigned + return + self.n_procs += 1 + self.cpu_booked += res.cpu_assigned + self.mem_booked += res.mem_assigned + + def unbook(self, tid): + """ + Unbook the reources of this task + """ + res = self.resources[tid] + res.booked = False + if self.resource_boundaries.dynamic_resources: + res.sample_resources() + if res.semaphore is not None: + res.semaphore.unlock() + if res.nice_value != self.nice_default: + self.cpu_booked_backfill -= res.cpu_assigned + self.mem_booked_backfill -= res.mem_assigned + self.n_procs_backfill -= 1 + if self.n_procs_backfill <= 0: + self.cpu_booked_backfill = 0 + self.mem_booked_backfill = 0 + return + self.n_procs -= 1 + self.cpu_booked -= res.cpu_assigned + self.mem_booked -= res.mem_assigned + if self.n_procs <= 0: + self.cpu_booked = 0 + self.mem_booked = 0 + + def ok_to_submit(self, tids): + """ + This generator yields the tid and nice value tuple from the list of task ids that should be checked + """ + tids_copy = tids.copy() + + def ok_to_submit_default(res): + """ + Return default nice value if conditions are met, None otherwise + """ + # analyse CPU + okcpu = (self.cpu_booked + res.cpu_assigned <= self.resource_boundaries.cpu_limit) + # analyse MEM + okmem = (self.mem_booked + res.mem_assigned <= self.resource_boundaries.mem_limit) + actionlogger.debug ('Condition check --normal-- for ' + str(res.tid) + ':' + res.name + ' CPU ' + str(okcpu) + ' MEM ' + str(okmem)) + return self.nice_default if (okcpu and okmem) else None + + def ok_to_submit_backfill(res, backfill_cpu_factor=1.5, backfill_mem_factor=1.5): + """ + Return backfill nice value if conditions are met, None otherwise + """ + if self.n_procs_backfill >= args.n_backfill: + return None + + if res.cpu_assigned > 0.9 * self.resource_boundaries.cpu_limit or res.mem_assigned / self.resource_boundaries.cpu_limit >= 1900: + return None + + # analyse CPU + okcpu = (self.cpu_booked_backfill + res.cpu_assigned <= self.resource_boundaries.cpu_limit) + okcpu = okcpu and (self.cpu_booked + self.cpu_booked_backfill + res.cpu_assigned <= backfill_cpu_factor * self.resource_boundaries.cpu_limit) + # analyse MEM + okmem = (self.mem_booked + self.mem_booked_backfill + res.mem_assigned <= backfill_mem_factor * self.resource_boundaries.mem_limit) + actionlogger.debug ('Condition check --backfill-- for ' + str(res.tid) + ':' + res.name + ' CPU ' + str(okcpu) + ' MEM ' + str(okmem)) + + return self.nice_backfill if (okcpu and okmem) else None + + if self.n_procs + self.n_procs_backfill >= self.procs_parallel_max: + # in this case, nothing can be done + return + + for 
ok_to_submit_impl, should_break in ((ok_to_submit_default, True), (ok_to_submit_backfill, False)): + tid_index = 0 + while tid_index < len(tids_copy): + + tid = tids_copy[tid_index] + res = self.resources[tid] + + actionlogger.info("Setup resources for task %s, cpu: %f, mem: %f", res.name, res.cpu_assigned, res.mem_assigned) + tid_index += 1 + + if (res.semaphore is not None and res.semaphore.locked) or res.booked: + continue + + nice_value = ok_to_submit_impl(res) + if nice_value is not None: + # if we get a non-None nice value, it means that this task is good to go + res.nice_value = nice_value + # yield the tid and its assigned nice value + yield tid, nice_value + + elif should_break: + # break here if resources of the next task do not fit + break + + class WorkflowExecutor: # Constructor def __init__(self, workflowfile, args, jmax=100): @@ -510,6 +841,7 @@ def __init__(self, workflowfile, args, jmax=100): actionlogger.info("Applying global environment from init section " + str(e) + " : " + str(self.globalenv[e])) os.environ[e] = str(self.globalenv[e]) + # only keep those tasks that are necessary to be executed based on user's filters self.workflowspec = filter_workflow(self.workflowspec, args.target_tasks, args.target_labels) if not self.workflowspec['stages']: @@ -518,7 +850,8 @@ def __init__(self, workflowfile, args, jmax=100): exit (0) print ('Workflow is empty. Nothing to do') exit (0) - + + # construct the DAG, compute task weights workflow = build_dag_properties(self.workflowspec) if args.visualize_workflow: draw_workflow(self.workflowspec) @@ -526,29 +859,29 @@ def __init__(self, workflowfile, args, jmax=100): self.taskweights = workflow['weights'] self.topological_orderings = workflow['topological_ordering'] self.taskuniverse = [ l['name'] for l in self.workflowspec['stages'] ] - self.idtotask = [ 0 for l in self.taskuniverse ] + # construct task ID <-> task name lookup + self.idtotask = [ 0 for _ in self.taskuniverse ] self.tasktoid = {} - for i in range(len(self.taskuniverse)): - self.tasktoid[self.taskuniverse[i]]=i - self.idtotask[i]=self.taskuniverse[i] + for i, name in enumerate(self.taskuniverse): + self.tasktoid[name]=i + self.idtotask[i]=name if args.update_resources: update_resource_estimates(self.workflowspec, args.update_resources) - self.maxmemperid = [ self.workflowspec['stages'][tid]['resources']['mem'] for tid in range(len(self.taskuniverse)) ] - self.cpuperid = [ self.workflowspec['stages'][tid]['resources']['cpu'] for tid in range(len(self.taskuniverse)) ] - self.curmembooked = 0 - self.curcpubooked = 0 - self.curmembooked_backfill = 0 - self.curcpubooked_backfill = 0 - self.memlimit = float(args.mem_limit) # some configurable number - self.cpulimit = float(args.cpu_limit) + # construct the object that is in charge of resource management... 
+ self.resource_manager = ResourceManager(args.cpu_limit, args.mem_limit, args.maxjobs, args.dynamic_resources, args.optimistic_resources) + for task in self.workflowspec['stages']: + # ...and add all initial resource estimates + global_task_name = self.get_global_task_name(task["name"]) + self.resource_manager.add_task_resources(task["name"], global_task_name, float(task["resources"]["cpu"]), float(task["resources"]["mem"]), task.get("semaphore")) + self.procstatus = { tid:'ToDo' for tid in range(len(self.workflowspec['stages'])) } self.taskneeds= { t:set(self.getallrequirements(t)) for t in self.taskuniverse } - self.stoponfailure = not (args.keep_going == True) + self.stoponfailure = not args.keep_going print ("Stop on failure ",self.stoponfailure) - self.max_jobs_parallel = int(jmax) - self.scheduling_iteration = 0 + + self.scheduling_iteration = 0 # count how often it was tried to schedule new tasks self.process_list = [] # list of currently scheduled tasks with normal priority self.backfill_process_list = [] # list of curently scheduled tasks with low backfill priority (not sure this is needed) self.pid_to_psutilsproc = {} # cache of putilsproc for resource monitoring @@ -556,20 +889,20 @@ def __init__(self, workflowfile, args, jmax=100): self.pid_to_connections = {} # we can auto-detect what connections are opened by which task (at least to some extent) signal.signal(signal.SIGINT, self.SIGHandler) signal.siginterrupt(signal.SIGINT, False) - self.nicevalues = [ os.nice(0) for tid in range(len(self.taskuniverse)) ] self.internalmonitorcounter = 0 # internal use self.internalmonitorid = 0 # internal use self.tids_marked_toretry = [] # sometimes we might want to retry a failed task (simply because it was "unlucky") and we put them here self.retry_counter = [ 0 for tid in range(len(self.taskuniverse)) ] # we keep track of many times retried already self.task_retries = [ self.workflowspec['stages'][tid].get('retry_count',0) for tid in range(len(self.taskuniverse)) ] # the per task specific "retry" number -> needs to be parsed from the JSON - self.semaphore_values = { self.workflowspec['stages'][tid].get('semaphore'):0 for tid in range(len(self.taskuniverse)) if self.workflowspec['stages'][tid].get('semaphore')!=None } # keeps current count of semaphores (defined in the json workflow). used to achieve user-defined "critical sections". self.alternative_envs = {} # mapping of taskid to alternative software envs (to be applied on a per-task level) # init alternative software environments self.init_alternative_software_environments() def SIGHandler(self, signum, frame): - # basically forcing shut down of all child processes + """ + basically forcing shut down of all child processes + """ actionlogger.info("Signal " + str(signum) + " caught") try: procs = psutil.Process().children(recursive=True) @@ -584,8 +917,8 @@ def SIGHandler(self, signum, frame): p.terminate() except (psutil.NoSuchProcess, psutil.AccessDenied): pass - - gone, alive = psutil.wait_procs(procs, timeout=3) + + _, alive = psutil.wait_procs(procs, timeout=3) for p in alive: try: actionlogger.info("Killing " + str(p)) @@ -595,38 +928,74 @@ def SIGHandler(self, signum, frame): exit (1) - def extract_global_environment(self, workflowspec): - """Checks if the workflow contains a dedicated init task - defining a global environment. Extract information and remove from workflowspec. + """ + Checks if the workflow contains a dedicated init task + defining a global environment. Extract information and remove from workflowspec. 
""" init_index = 0 # this has to be the first task in the workflow globalenv = {} if workflowspec['stages'][init_index]['name'] == '__global_init_task__': env = workflowspec['stages'][init_index].get('env', None) if env != None: - globalenv = { e : env[e] for e in env } + globalenv = { e : env[e] for e in env } del workflowspec['stages'][init_index] return globalenv - def getallrequirements(self, t): + def get_global_task_name(self, name): + """ + Get the global task name + + Tasks are related if only the suffix _ is different + """ + tokens = name.split("_") + try: + int(tokens[-1]) + return "_".join(tokens[:-1]) + except ValueError: + pass + return name + + def getallrequirements(self, task_name): + """ + get all requirement of a task by its name + """ l=[] - for r in self.workflowspec['stages'][self.tasktoid[t]]['needs']: - l.append(r) - l=l+self.getallrequirements(r) + for required_task_name in self.workflowspec['stages'][self.tasktoid[task_name]]['needs']: + l.append(required_task_name) + l=l+self.getallrequirements(required_task_name) return l - def get_done_filename(self, tid): + def get_logfile(self, tid): + """ + O2 taskwrapper logs task stdout and stderr to logfile .log + Get its exact path based on task ID + """ + # determines the logfile name for this task name = self.workflowspec['stages'][tid]['name'] workdir = self.workflowspec['stages'][tid]['cwd'] - # name and workdir define the "done" file as used by taskwrapper - # this assumes that taskwrapper is used to actually check if something is to be rerun - done_filename = workdir + '/' + name + '.log_done' - return done_filename + return os.path.join(workdir, f"{name}.log") + + def get_done_filename(self, tid): + """ + O2 taskwrapper leaves .log_done after a task has successfully finished + Get its exact path based on task ID + """ + return f"{self.get_logfile(tid)}_done" + + def get_resources_filename(self, tid): + """ + O2 taskwrapper leaves .log_time after a task is done + Get its exact path based on task ID + """ + return f"{self.get_logfile(tid)}_time" # removes the done flag from tasks that need to be run again def remove_done_flag(self, listoftaskids): + """ + Remove .log_done files to given task IDs + """ for tid in listoftaskids: done_filename = self.get_done_filename(tid) name=self.workflowspec['stages'][tid]['name'] @@ -636,13 +1005,23 @@ def remove_done_flag(self, listoftaskids): print ("Marking task " + name + " as to be done again") if os.path.exists(done_filename) and os.path.isfile(done_filename): os.remove(done_filename) - + # submits a task as subprocess and records Popen instance - def submit(self, tid, nice=os.nice(0)): + def submit(self, tid, nice): + """ + Submit a task + + 1. if needed, construct working directory if it does not yet exist + 2. update lookup structures flagging the task as being run + 3. set specific environment if requested for task + 4. construct psutil.Process from command line + 4.1 adjust the niceness of that process if requested + 5. return psutil.Process object + """ actionlogger.debug("Submitting task " + str(self.idtotask[tid]) + " with nice value " + str(nice)) c = self.workflowspec['stages'][tid]['cmd'] workdir = self.workflowspec['stages'][tid]['cwd'] - if not workdir=='': + if workdir: if os.path.exists(workdir) and not os.path.isdir(workdir): actionlogger.error('Cannot create working dir ... 
some other resource exists already') return None @@ -672,137 +1051,52 @@ def submit(self, tid, nice=os.nice(0)): p = psutil.Popen(['/bin/bash','-c',c], cwd=workdir, env=taskenv) try: p.nice(nice) - self.nicevalues[tid]=nice except (psutil.NoSuchProcess, psutil.AccessDenied): actionlogger.error('Couldn\'t set nice value of ' + str(p.pid) + ' to ' + str(nice)) - self.nicevalues[tid]=os.nice(0) - return p - - def ok_to_submit(self, tid, backfill=False): - softcpufactor=1 - softmemfactor=1 - if backfill: - softcpufactor=1.5 - sotmemfactor=1.5 - - # check semaphore - sem = self.workflowspec['stages'][tid].get('semaphore') - if sem != None: - if self.semaphore_values[sem] > 0: - return False - - # check other resources - if not backfill: - # analyse CPU - okcpu = (self.curcpubooked + float(self.cpuperid[tid]) <= self.cpulimit) - # analyse MEM - okmem = (self.curmembooked + float(self.maxmemperid[tid]) <= self.memlimit) - actionlogger.debug ('Condition check --normal-- for ' + str(tid) + ':' + str(self.idtotask[tid]) + ' CPU ' + str(okcpu) + ' MEM ' + str(okmem)) - return (okcpu and okmem) - else: - # only backfill one job at a time - if self.curcpubooked_backfill > 0: - return False - - # not backfilling jobs which either take much memory or use lot's of CPU anyway - # conditions are somewhat arbitrary and can be played with - if float(self.cpuperid[tid]) > 0.9*float(self.args.cpu_limit): - return False - if float(self.maxmemperid[tid])/float(self.args.cpu_limit) >= 1900: - return False - - # analyse CPU - okcpu = (self.curcpubooked_backfill + float(self.cpuperid[tid]) <= self.cpulimit) - okcpu = okcpu and (self.curcpubooked + self.curcpubooked_backfill + float(self.cpuperid[tid]) <= softcpufactor*self.cpulimit) - # analyse MEM - okmem = (self.curmembooked + self.curmembooked_backfill + float(self.maxmemperid[tid]) <= softmemfactor*self.memlimit) - actionlogger.debug ('Condition check --backfill-- for ' + str(tid) + ':' + str(self.idtotask[tid]) + ' CPU ' + str(okcpu) + ' MEM ' + str(okmem)) - return (okcpu and okmem) - return False + return p def ok_to_skip(self, tid): + """ + Decide if task can be skipped based on existence of .log_done + """ done_filename = self.get_done_filename(tid) if os.path.exists(done_filename) and os.path.isfile(done_filename): return True return False - def book_resources(self, tid, backfill = False): - # books the resources used by a certain task - # semaphores - sem = self.workflowspec['stages'][tid].get('semaphore') - if sem != None: - self.semaphore_values[sem]+=1 - - # CPU + MEM - if not backfill: - self.curmembooked+=float(self.maxmemperid[tid]) - self.curcpubooked+=float(self.cpuperid[tid]) - else: - self.curmembooked_backfill+=float(self.maxmemperid[tid]) - self.curcpubooked_backfill+=float(self.cpuperid[tid]) - - def unbook_resources(self, tid, backfill = False): - # "frees" the nominal resources used by a certain task from the accounting - # so that other jobs can be scheduled - sem = self.workflowspec['stages'][tid].get('semaphore') - if sem != None: - self.semaphore_values[sem]-=1 - - # CPU + MEM - if not backfill: - self.curmembooked-=float(self.maxmemperid[tid]) - self.curcpubooked-=float(self.cpuperid[tid]) - else: - self.curmembooked_backfill-=float(self.maxmemperid[tid]) - self.curcpubooked_backfill-=float(self.cpuperid[tid]) - + def try_job_from_candidates(self, taskcandidates, finished): + """ + Try to schedule next tasks - def try_job_from_candidates(self, taskcandidates, process_list, finished): + Args: + taskcandidates: list + list of possible 
tasks that can be submitted + finished: list + empty list that will be filled with IDs of tasks that were finished in the meantime + """ self.scheduling_iteration = self.scheduling_iteration + 1 # remove "done / skippable" tasks immediately - tasks_skipped = False for tid in taskcandidates.copy(): # <--- the copy is important !! otherwise this loop is not doing what you think if self.ok_to_skip(tid): finished.append(tid) taskcandidates.remove(tid) - tasks_skipped = True actionlogger.info("Skipping task " + str(self.idtotask[tid])) # if tasks_skipped: - # return # ---> we return early in order to preserve some ordering (the next candidate tried should be daughters of skipped jobs) - - # the ordinary process list part - initialcandidates=taskcandidates.copy() - for tid in initialcandidates: + # return # ---> we return early in order to preserve some ordering (the next candidate tried should be daughters of skipped jobs) + # get task ID and proposed niceness from generator + for (tid, nice_value) in self.resource_manager.ok_to_submit(taskcandidates): actionlogger.debug ("trying to submit " + str(tid) + ':' + str(self.idtotask[tid])) - if (len(self.process_list) + len(self.backfill_process_list) < self.max_jobs_parallel) and self.ok_to_submit(tid): - p=self.submit(tid) - if p!=None: - self.book_resources(tid) - self.process_list.append((tid,p)) - taskcandidates.remove(tid) - # minimal delay - time.sleep(0.1) - else: - break #---> we break at first failure assuming some priority (other jobs may come in via backfill) - - # the backfill part for remaining candidates - initialcandidates=taskcandidates.copy() - for tid in initialcandidates: - actionlogger.debug ("trying to backfill submit " + str(tid) + ':' + str(self.idtotask[tid])) - - if (len(self.process_list) + len(self.backfill_process_list) < self.max_jobs_parallel) and self.ok_to_submit(tid, backfill=True): - p=self.submit(tid, 19) - if p!=None: - self.book_resources(tid, backfill=True) - self.process_list.append((tid,p)) - taskcandidates.remove(tid) #-> not sure about this one - # minimal delay - time.sleep(0.1) - else: - continue + if p := self.submit(tid, nice_value): + # explicitly set the nice value here from the process again because it might happen that submit could not change the niceness + # so we let the ResourceManager know what the final niceness is + self.resource_manager.book(tid, p.nice()) + self.process_list.append((tid,p)) + taskcandidates.remove(tid) + # minimal delay + time.sleep(0.1) def stop_pipeline_and_exit(self, process_list): # kill all remaining jobs @@ -811,7 +1105,17 @@ def stop_pipeline_and_exit(self, process_list): exit(1) + def monitor(self, process_list): + """ + Go through all running tasks and get their current resources + + Resources are summed up for tasks and all their children + + Pass CPU, PSS, USS, niceness, current time to metriclogger + + Warn if overall PSS exceeds assigned memory limit + """ self.internalmonitorcounter+=1 if self.internalmonitorcounter % 5 != 0: return @@ -820,10 +1124,10 @@ def monitor(self, process_list): globalCPU=0. globalPSS=0. - globalCPU_backfill=0. - globalPSS_backfill=0. 
resources_per_task = {} + for tid, proc in process_list: + # proc is Popen object pid=proc.pid if self.pid_to_files.get(pid)==None: @@ -888,24 +1192,33 @@ def monitor(self, process_list): except (psutil.NoSuchProcess, psutil.AccessDenied): pass - resources_per_task[tid]={'iter':self.internalmonitorid, 'name':self.idtotask[tid], 'cpu':totalCPU, 'uss':totalUSS/1024./1024., 'pss':totalPSS/1024./1024, 'nice':self.nicevalues[tid], 'swap':totalSWAP, 'label':self.workflowspec['stages'][tid]['labels']} + time_delta = int((time.perf_counter() - self.start_time) * 1000) + totalUSS = totalUSS / 1024 / 1024 + totalPSS = totalPSS / 1024 / 1024 + nice_value = proc.nice() + resources_per_task[tid]={'iter':self.internalmonitorid, 'name':self.idtotask[tid], 'cpu':totalCPU, 'uss':totalUSS, 'pss':totalPSS, 'nice':nice_value, 'swap':totalSWAP, 'label':self.workflowspec['stages'][tid]['labels']} + self.resource_manager.add_monitored_resources(tid, time_delta, totalCPU / 100, totalPSS) + if nice_value == self.resource_manager.nice_default: + globalCPU += totalCPU + globalPSS += totalPSS + metriclogger.info(resources_per_task[tid]) send_webhook(self.args.webhook, resources_per_task) - - for r in resources_per_task.values(): - if r['nice']==os.nice(0): - globalCPU+=r['cpu'] - globalPSS+=r['pss'] - else: - globalCPU_backfill+=r['cpu'] - globalPSS_backfill+=r['pss'] - - if globalPSS > self.memlimit: + + if globalPSS > self.resource_manager.resource_boundaries.mem_limit: metriclogger.info('*** MEMORY LIMIT PASSED !! ***') # --> We could use this for corrective actions such as killing jobs currently back-filling # (or better hibernating) def waitforany(self, process_list, finished, failingtasks): + """ + Loop through all submitted tasks and check if they are finished + + 1. If process is still running, do nothing + 2. If process is finished, get its return value, update finished and failingtasks lists + 2.1 unbook resources + 2.2 add taken resources and pass the to ResourceManager + """ failuredetected = False failingpids = [] if len(process_list)==0: @@ -920,9 +1233,10 @@ def waitforany(self, process_list, finished, failingtasks): if returncode!=None: actionlogger.info ('Task ' + str(pid) + ' ' + str(tid)+':'+str(self.idtotask[tid]) + ' finished with status ' + str(returncode)) # account for cleared resources - self.unbook_resources(tid, backfill = self.nicevalues[tid]!=os.nice(0) ) + self.resource_manager.unbook(tid) self.procstatus[tid]='Done' finished.append(tid) + #self.validate_resources_running(tid) process_list.remove(p) if returncode != 0: print (str(self.idtotask[tid]) + ' failed ... 
checking retry') @@ -932,12 +1246,12 @@ def waitforany(self, process_list, finished, failingtasks): actionlogger.info ('Task ' + str(self.idtotask[tid]) + ' failed but marked to be retried ') self.tids_marked_toretry.append(tid) self.retry_counter[tid] += 1 - + else: failuredetected = True failingpids.append(pid) failingtasks.append(tid) - + if failuredetected and self.stoponfailure: actionlogger.info('Stoping pipeline due to failure in stages with PID ' + str(failingpids)) # self.analyse_files_and_connections() @@ -946,26 +1260,16 @@ def waitforany(self, process_list, finished, failingtasks): self.send_checkpoint(failingtasks, self.args.checkpoint_on_failure) self.stop_pipeline_and_exit(process_list) - # empty finished means we have to wait more + # empty finished means we have to wait more return len(finished)==0 - - def get_logfile(self, tid): - # determines the logfile name for this task - taskspec = self.workflowspec['stages'][tid] - taskname = taskspec['name'] - filename = taskname + '.log' - directory = taskspec['cwd'] - return directory + '/' + filename - - def is_worth_retrying(self, tid): # This checks for some signatures in logfiles that indicate that a retry of this task # might have a chance. # Ideally, this should be made user configurable. Either the user could inject a lambda # or a regular expression to use. For now we just put a hard coded list logfile = self.get_logfile(tid) - + return True #! --> for now we just retry tasks a few times # 1) ZMQ_EVENT + interrupted system calls (DPL bug during shutdown) @@ -975,7 +1279,7 @@ def is_worth_retrying(self, tid): # return True # return False - + def cat_logfiles_tostdout(self, taskids): # In case of errors we can cat the logfiles for this taskname @@ -1215,7 +1519,7 @@ def noprogress_errormsg(self): print (msg, file=sys.stderr) def execute(self): - starttime = time.perf_counter() + self.start_time = time.perf_counter() psutil.cpu_percent(interval=None) os.environ['JOBUTILS_SKIPDONE'] = "ON" errorencountered = False @@ -1269,7 +1573,7 @@ def speedup_ROOT_Init(): for i,t in enumerate(self.workflowspec['stages'],0): print (t['name'] + ' (' + str(t['labels']) + ')' + ' ToDo: ' + str(not self.ok_to_skip(i))) exit (0) - + if args.produce_script != None: self.produce_script(args.produce_script) exit (0) @@ -1289,12 +1593,12 @@ def speedup_ROOT_Init(): # ***************** # main control loop # ***************** - currenttimeframe=1 candidates = [ tid for tid in self.possiblenexttask[-1] ] self.process_list=[] # list of tuples of nodes ids and Popen subprocess instances finishedtasks=[] # global list of finished tasks + try: while True: @@ -1306,13 +1610,13 @@ def speedup_ROOT_Init(): finished = [] # --> to account for finished because already done or skipped actionlogger.debug('Sorted current candidates: ' + str([(c,self.idtotask[c]) for c in candidates])) - self.try_job_from_candidates(candidates, self.process_list, finished) + self.try_job_from_candidates(candidates, finished) if len(candidates) > 0 and len(self.process_list) == 0: self.noprogress_errormsg() send_webhook(self.args.webhook,"Unable to make further progress: Quitting") errorencountered = True break - + finished_from_started = [] # to account for finished when actually started failing = [] while self.waitforany(self.process_list, finished_from_started, failing): @@ -1361,10 +1665,10 @@ def speedup_ROOT_Init(): # try to see if this is really a candidate: if self.is_good_candidate(candid, finishedtasks) and candidates.count(candid)==0: candidates.append(candid) - + 
actionlogger.debug("New candidates " + str( candidates))
                 send_webhook(self.args.webhook, "New candidates " + str(candidates))
-                    
+
                 if len(candidates)==0 and len(self.process_list)==0:
                     break
         except Exception as e:
@@ -1381,16 +1685,22 @@ def speedup_ROOT_Init():
         if errorencountered:
            statusmsg = "with failures"
 
-        print ('\n**** Pipeline done ' + statusmsg + ' (global_runtime : {:.3f}s) *****\n'.format(endtime-starttime))
-        actionlogger.debug("global_runtime : {:.3f}s".format(endtime-starttime))
+        print ('\n**** Pipeline done ' + statusmsg + ' (global_runtime : {:.3f}s) *****\n'.format(endtime-self.start_time))
+        actionlogger.debug("global_runtime : {:.3f}s".format(endtime-self.start_time))
 
         return errorencountered
 
 if args.cgroup!=None:
     myPID=os.getpid()
-    command="echo " + str(myPID) + " > /sys/fs/cgroup/cpuset/"+args.cgroup+"/tasks"
-    actionlogger.info("applying cgroups " + command)
-    os.system(command)
+    # cgroups such as /sys/fs/cgroup/cpuset/8coregrid/tasks
+    # or /sys/fs/cgroup/cpu/8coregrid/tasks
+    command="echo " + str(myPID) + f" > {args.cgroup}"
+    actionlogger.info(f"Try running in cgroup {args.cgroup}")
+    waitstatus = os.system(command)
+    if code := os.waitstatus_to_exitcode(waitstatus):
+        actionlogger.error(f"Could not apply cgroup {args.cgroup}")
+        exit(code)
+    actionlogger.info("Running in cgroup")
 
 executor=WorkflowExecutor(args.workflowfile,jmax=int(args.maxjobs),args=args)
 exit (executor.execute())
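
A note on the find_all_dependent_tasks() change above: the mutable default argument cache={} is replaced by cache=None, so separate calls no longer share one hidden dictionary, and the fan-out it returns is what getweight() uses to rank candidates. The sketch below is a simplified re-implementation with a made-up four-task DAG, not the code from the patch, just to illustrate the idea:

def find_all_dependent_tasks(nexttasks, tid, cache=None):
    # return tid plus every task that (transitively) depends on it
    cached = cache.get(tid) if cache else None
    if cached is not None:
        return cached
    daughters = [tid]
    for n in nexttasks[tid]:
        daughters += find_all_dependent_tasks(nexttasks, n, cache)
    result = list(set(daughters))
    if cache is not None:
        cache[tid] = result
    return result

# tiny made-up DAG: 0 -> 1 -> 2 and 0 -> 3
nexttasks = {0: [1, 3], 1: [2], 2: [], 3: []}
cache = {}
print(sorted(find_all_dependent_tasks(nexttasks, 0, cache)))  # [0, 1, 2, 3]: task 0 unblocks everything
print(sorted(find_all_dependent_tasks(nexttasks, 1, cache)))  # [1, 2]: answered from the cache on later calls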
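
The scheduling loop now drives everything through the new ResourceManager: estimates are registered with add_task_resources(), ok_to_submit() yields only candidates that fit the current booking (a normal pass that stops at the first misfit, followed by a backfill pass at a higher nice value), book()/unbook() reserve and release the estimates, and add_monitored_resources() feeds measurements back. The following is a deliberately simplified, self-contained stand-in for the normal-priority accounting only (no semaphores, no backfill, no niceness); the class name, task IDs and numbers are invented for illustration:

class MiniResourceManager:
    """Simplified stand-in for the booking logic introduced in this patch."""

    def __init__(self, cpu_limit, mem_limit):
        self.cpu_limit = cpu_limit
        self.mem_limit = mem_limit
        self.cpu_booked = 0.0
        self.mem_booked = 0.0
        self.estimates = {}  # tid -> (cpu, mem)

    def add_task_resources(self, tid, cpu, mem):
        self.estimates[tid] = (cpu, mem)

    def ok_to_submit(self, tids):
        # yield candidates in priority order as long as they fit;
        # stop at the first one that does not (like the normal pass in the patch)
        for tid in tids:
            cpu, mem = self.estimates[tid]
            if self.cpu_booked + cpu <= self.cpu_limit and self.mem_booked + mem <= self.mem_limit:
                yield tid
            else:
                break

    def book(self, tid):
        cpu, mem = self.estimates[tid]
        self.cpu_booked += cpu
        self.mem_booked += mem

    def unbook(self, tid):
        cpu, mem = self.estimates[tid]
        self.cpu_booked = max(0.0, self.cpu_booked - cpu)
        self.mem_booked = max(0.0, self.mem_booked - mem)


rm = MiniResourceManager(cpu_limit=8, mem_limit=16000)   # 8 cores, 16 GB
for tid, (cpu, mem) in enumerate([(4, 6000), (4, 6000), (2, 8000)]):
    rm.add_task_resources(tid, cpu, mem)

scheduled = []
for tid in rm.ok_to_submit([0, 1, 2]):
    rm.book(tid)          # booking immediately affects the next candidate's check
    scheduled.append(tid)
print(scheduled)          # [0, 1] -- task 2 has to wait until resources are freed
rm.unbook(0)              # task 0 finished, its share is released again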
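
With --dynamic-resources, TaskResources.sample_resources() converts the monitored points of a finished task into new estimates: CPU is the time-weighted average of the samples (the first one is dropped as not meaningful), memory is the maximum observed PSS; the values are then combined over all finished related tasks, clamped to the configured limits and copied to related tasks that have not started yet. A small worked example of the per-task arithmetic, with invented numbers:

# invented monitoring points of one finished task
time_collect = [5, 10, 15, 20]        # time since workflow start (the runner passes milliseconds; only ratios matter)
cpu_collect  = [3.5, 2.0, 1.0, 1.0]   # core fractions; the first sample is dropped below
mem_collect  = [800, 1500, 1400, 900] # PSS in MB

time_deltas = [time_collect[i + 1] - time_collect[i] for i in range(len(time_collect) - 1)]
cpu_sampled = sum(cpu * dt for cpu, dt in zip(cpu_collect[1:], time_deltas) if cpu >= 0) / sum(time_deltas)
mem_sampled = max(mem_collect)

print(cpu_sampled, mem_sampled)       # about 1.33 cores and 1500 MB become the new estimate for related tasks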
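
Estimates are shared between related tasks, i.e. tasks whose names differ only in the trailing timeframe number. This is the rule implemented by WorkflowExecutor.get_global_task_name(), reproduced here as a free function with two made-up task names:

def get_global_task_name(name):
    # strip a trailing underscore-number suffix (the timeframe); otherwise keep the name
    tokens = name.split("_")
    try:
        int(tokens[-1])
        return "_".join(tokens[:-1])
    except ValueError:
        return name

print(get_global_task_name("tpcdigi_3"))   # "tpcdigi"   -> grouped with tpcdigi_1, tpcdigi_2, ...
print(get_global_task_name("grpcreate"))   # "grpcreate" -> no numeric suffix, stays on its own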
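
The --cgroup option now takes the full path of the cgroup's task file rather than a bare cgroup name, so the same option can address both hierarchies, for example /sys/fs/cgroup/cpuset/8coregrid/tasks for cgroups v1 or /sys/fs/cgroup/8coregrid/cgroup.procs for cgroups v2 (illustrative paths; the exact layout depends on the host). The runner then simply writes its own PID into that file, roughly:

import os

# assumed cgroups-v2 path; must exist and be writable by the current user
cgroup_tasks_file = "/sys/fs/cgroup/8coregrid/cgroup.procs"

waitstatus = os.system(f"echo {os.getpid()} > {cgroup_tasks_file}")
if os.waitstatus_to_exitcode(waitstatus) != 0:
    raise SystemExit(f"could not move the runner into {cgroup_tasks_file}")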