diff --git a/tasks/elastic/plot.py b/tasks/elastic/plot.py index c5db051..8aae71e 100644 --- a/tasks/elastic/plot.py +++ b/tasks/elastic/plot.py @@ -5,7 +5,6 @@ from os.path import join from pandas import read_csv from tasks.util.elastic import ELASTIC_PLOTS_DIR, ELASTIC_RESULTS_DIR -from tasks.util.env import SYSTEM_NAME from tasks.util.plot import SINGLE_COL_FIGSIZE, save_plot @@ -42,7 +41,11 @@ def plot(ctx): makedirs(ELASTIC_PLOTS_DIR, exist_ok=True) fig, ax = subplots(figsize=SINGLE_COL_FIGSIZE) - assert len(results["elastic"]) == len(results["no-elastic"]), "Results mismatch! (elastic: {} - no-elastic: {})".format(len(results["elastic"]), len(results["no-elastic"])) + assert len(results["elastic"]) == len( + results["no-elastic"] + ), "Results mismatch! (elastic: {} - no-elastic: {})".format( + len(results["elastic"]), len(results["no-elastic"]) + ) xs = list(results["elastic"].keys()) ys = [ diff --git a/tasks/elastic/run.py b/tasks/elastic/run.py index 303ed07..1c1f0b6 100644 --- a/tasks/elastic/run.py +++ b/tasks/elastic/run.py @@ -70,13 +70,16 @@ def wasm(ctx, num_threads=None, elastic=False, repeats=1): reset_planner(num_vms) - csv_name = "openmp_{}_granny.csv".format("elastic" if elastic else "no-elastic") + csv_name = "openmp_{}_granny.csv".format( + "elastic" if elastic else "no-elastic" + ) _init_csv_file(csv_name) for nthread in num_threads: for r in range(int(repeats)): print( - "Running OpenMP elastic experiment with {} threads (elastic: {} - repeat: {}/{})".format( + "Running OpenMP elastic experiment with {} threads" + " (elastic: {} - repeat: {}/{})".format( nthread, elastic, r + 1, repeats ) ) diff --git a/tasks/elastic/wasm.py b/tasks/elastic/wasm.py index c5335a0..b8a89c3 100644 --- a/tasks/elastic/wasm.py +++ b/tasks/elastic/wasm.py @@ -12,11 +12,13 @@ def upload(ctx): """ Upload the OpenMP functions to Granny """ - wasm_file_details = [{ - "wasm_file": OPENMP_ELASTIC_WASM, - "wasm_user": OPENMP_ELASTIC_USER, - "wasm_function": OPENMP_ELASTIC_FUNCTION, - "copies": 1, - }] + wasm_file_details = [ + { + "wasm_file": OPENMP_ELASTIC_WASM, + "wasm_user": OPENMP_ELASTIC_USER, + "wasm_function": OPENMP_ELASTIC_FUNCTION, + "copies": 1, + } + ] upload_wasm(wasm_file_details) diff --git a/tasks/kernels_omp/run.py b/tasks/kernels_omp/run.py index 089ffea..9dded1b 100644 --- a/tasks/kernels_omp/run.py +++ b/tasks/kernels_omp/run.py @@ -115,7 +115,9 @@ def wasm(ctx, kernel=None, num_threads=None, repeats=1): "singleHostHint": True, } - result_json = post_async_msg_and_get_result_json(msg, req_dict=req) + result_json = post_async_msg_and_get_result_json( + msg, req_dict=req + ) actual_time = get_faasm_exec_time_from_json( result_json, check=True ) diff --git a/tasks/makespan/plot.py b/tasks/makespan/plot.py index 70b4ad9..f38df94 100644 --- a/tasks/makespan/plot.py +++ b/tasks/makespan/plot.py @@ -6,9 +6,10 @@ read_elastic_results, ) from tasks.util.eviction import ( - read_eviction_results, - plot_eviction_results, + read_eviction_results, + plot_eviction_results, ) + # TODO: consider moving some of the migration to a different file (e.g. 
# tasks.util.locality) from tasks.util.makespan import ( @@ -98,40 +99,20 @@ def locality(ctx): # Plot 1: boxplot of idle vCPUs and num xVM links for various cluster sizes # ---------- - do_makespan_plot( - "percentage_vcpus", - results, - ax1, - num_vms, - num_tasks - ) + do_makespan_plot("percentage_vcpus", results, ax1, num_vms, num_tasks) - do_makespan_plot( - "percentage_xvm", - results, - ax2, - num_vms, - num_tasks - ) + do_makespan_plot("percentage_xvm", results, ax2, num_vms, num_tasks) # ---------- # Plot 2: (two) timeseries of one of the cluster sizes # ---------- do_makespan_plot( - "ts_vcpus", - results, - ax3, - timeseries_num_vms, - timeseries_num_tasks + "ts_vcpus", results, ax3, timeseries_num_vms, timeseries_num_tasks ) do_makespan_plot( - "ts_xvm_links", - results, - ax4, - timeseries_num_vms, - timeseries_num_tasks + "ts_xvm_links", results, ax4, timeseries_num_vms, timeseries_num_tasks ) # Manually craft the legend @@ -139,14 +120,15 @@ def locality(ctx): legend_entries = [ Patch( color=get_color_for_baseline("mpi-migrate", baseline), - label=get_label_for_baseline("mpi-migrate", baseline) - ) for baseline in baselines + label=get_label_for_baseline("mpi-migrate", baseline), + ) + for baseline in baselines ] fig.legend( handles=legend_entries, loc="upper center", ncols=len(baselines), - bbox_to_anchor=(0.52, 1.07) + bbox_to_anchor=(0.52, 1.07), ) save_plot(fig, MAKESPAN_PLOTS_DIR, "makespan_locality") @@ -173,10 +155,11 @@ def eviction(ctx): results = {} for (n_vms, n_users, n_tasks) in zip(num_vms, num_users, num_tasks): - results[n_vms] = read_eviction_results(n_vms, n_users, n_tasks, num_cpus_per_vm) + results[n_vms] = read_eviction_results( + n_vms, n_users, n_tasks, num_cpus_per_vm + ) - fig, ax = subplot_mosaic([['left', 'right'], - ['left', 'right']]) + fig, ax = subplot_mosaic([["left", "right"], ["left", "right"]]) # ---------- # Plot 1: bar plot of the CPUsecs per execution @@ -254,14 +237,15 @@ def spot(ctx): legend_entries = [ Patch( color=get_color_for_baseline("mpi-spot", baseline), - label=get_label_for_baseline("mpi-spot", baseline) - ) for baseline in baselines + label=get_label_for_baseline("mpi-spot", baseline), + ) + for baseline in baselines ] fig.legend( handles=legend_entries, loc="upper center", ncols=len(baselines), - bbox_to_anchor=(0.52, 1.07) + bbox_to_anchor=(0.52, 1.07), ) save_plot(fig, MAKESPAN_PLOTS_DIR, "makespan_spot") @@ -330,7 +314,7 @@ def elastic(ctx): results, ax3, cdf_num_vms=cdf_num_vms, - cdf_num_tasks=cdf_num_tasks + cdf_num_tasks=cdf_num_tasks, ) # ---------- @@ -342,7 +326,7 @@ def elastic(ctx): results, ax4, timeseries_num_vms=timeseries_num_vms, - timeseries_num_tasks=timeseries_num_tasks + timeseries_num_tasks=timeseries_num_tasks, ) # Manually craft the legend @@ -350,14 +334,15 @@ def elastic(ctx): legend_entries = [ Patch( color=get_color_for_baseline("omp-elastic", baseline), - label=get_label_for_baseline("omp-elastic", baseline) - ) for baseline in baselines + label=get_label_for_baseline("omp-elastic", baseline), + ) + for baseline in baselines ] fig.legend( handles=legend_entries, loc="upper center", ncols=len(baselines), - bbox_to_anchor=(0.52, 1.07) + bbox_to_anchor=(0.52, 1.07), ) save_plot(fig, MAKESPAN_PLOTS_DIR, "makespan_elastic") diff --git a/tasks/makespan/run.py b/tasks/makespan/run.py index 2e3c7e4..d4d91fb 100644 --- a/tasks/makespan/run.py +++ b/tasks/makespan/run.py @@ -23,7 +23,7 @@ write_line_to_csv, ) from tasks.util.trace import load_task_trace_from_file -from time import sleep, time +from time 
import time from typing import Dict # Configure the logging settings globally @@ -67,13 +67,19 @@ def granny( # Work-out the baseline name from the arguments baseline = "granny" if migrate: - assert workload == "mpi-migrate", "--migrate flag should only be used with mpi-migrate workload!" + assert ( + workload == "mpi-migrate" + ), "--migrate flag should only be used with mpi-migrate workload!" baseline = "granny-migrate" if fault: - assert workload == "mpi-spot", "--fault flag should only be used with mpi-spot workload!" + assert ( + workload == "mpi-spot" + ), "--fault flag should only be used with mpi-spot workload!" baseline = "granny-ft" if elastic: - assert workload == "omp-elastic", "--fault flag should only be used with omp-elastic workload!" + assert ( + workload == "omp-elastic" + ), "--elastic flag should only be used with omp-elastic workload!" baseline = "granny-elastic" workload = _get_workload_from_cmdline(workload) diff --git a/tasks/makespan/scheduler.py b/tasks/makespan/scheduler.py index 1630b5b..299bc58 100644 --- a/tasks/makespan/scheduler.py +++ b/tasks/makespan/scheduler.py @@ -117,6 +117,7 @@ def fault_injection_thread( FT period we make up. We could consider changing them but, for simplicity we keep them the same """ + def get_next_evicted_host(baseline, num_hosts_to_evict): if baseline in GRANNY_BASELINES: vm_names = get_faasm_worker_names() @@ -124,10 +125,16 @@ def get_next_evicted_host(baseline, num_hosts_to_evict): else: vm_names, vm_ips = get_native_mpi_pods("makespan") - assert len(vm_names) == num_vms, "Mismatch in FT thread picking next evicted host {} != {}".format(num_vms, len(vm_names)) + assert ( + len(vm_names) == num_vms + ), "Mismatch in FT thread picking next evicted host {} != {}".format( + num_vms, len(vm_names) + ) evicted_idxs = sample(range(0, len(vm_names)), num_hosts_to_evict) - evicted_vm_names = [vm_names[evicted_idx] for evicted_idx in evicted_idxs] + evicted_vm_names = [ + vm_names[evicted_idx] for evicted_idx in evicted_idxs + ] evicted_vm_ips = [vm_ips[evicted_idx] for evicted_idx in evicted_idxs] return evicted_vm_names, evicted_vm_ips @@ -137,7 +144,9 @@ def get_next_evicted_host(baseline, num_hosts_to_evict): # batch-scheduler (grace period). 
In the loop, we first sleep for (i) - # (ii) and then for (ii) while True: - next_evicted_hosts, next_evicted_ips = get_next_evicted_host(baseline, num_faults) + next_evicted_hosts, next_evicted_ips = get_next_evicted_host( + baseline, num_faults + ) # First, sleep until we need to give the grace period sleep(fault_injection_period_secs - host_grace_period_secs) @@ -161,7 +170,11 @@ def get_next_evicted_host(baseline, num_hosts_to_evict): def dequeue_with_timeout( - queue: Queue, queue_str: str, silent: bool = False, throw: bool = False, timeout_s: int = QUEUE_TIMEOUT_SEC, + queue: Queue, + queue_str: str, + silent: bool = False, + throw: bool = False, + timeout_s: int = QUEUE_TIMEOUT_SEC, ) -> Union[ResultQueueItem, WorkQueueItem]: while True: try: @@ -305,7 +318,9 @@ def thread_print(msg): openmp_cmd = "bash -c '{} {} {}'".format( get_elastic_input_data(native=True), OPENMP_ELASTIC_NATIVE_BINARY, - get_openmp_kernel_cmdline(ELASTIC_KERNEL, work_item.task.size), + get_openmp_kernel_cmdline( + ELASTIC_KERNEL, work_item.task.size + ), ) exec_cmd = [ @@ -318,7 +333,7 @@ def thread_print(msg): start_ts = time() try: - out = run_kubectl_cmd("makespan", exec_cmd) + run_kubectl_cmd("makespan", exec_cmd) except CalledProcessError: has_failed = True actual_time = int(time() - start_ts) @@ -335,7 +350,9 @@ def thread_print(msg): req["user"] = user req["function"] = func if get_workload_from_trace(trace_str) == "mpi-evict": - req["subType"] = get_user_id_from_task(num_tasks_per_user, work_item.task.task_id) + req["subType"] = get_user_id_from_task( + num_tasks_per_user, work_item.task.task_id + ) msg = { "user": user, @@ -346,7 +363,9 @@ def thread_print(msg): } # If attempting to migrate, add migration parameters - baselines_with_migration = GRANNY_MIGRATE_BASELINES + GRANNY_FT_BASELINES + baselines_with_migration = ( + GRANNY_MIGRATE_BASELINES + GRANNY_FT_BASELINES + ) if work_item.task.app in MPI_MIGRATE_WORKLOADS: check_every = ( 1 @@ -370,7 +389,9 @@ def thread_print(msg): "user": user, "function": func, "input_data": get_elastic_input_data(), - "cmdline": get_openmp_kernel_cmdline(ELASTIC_KERNEL, work_item.task.size), + "cmdline": get_openmp_kernel_cmdline( + ELASTIC_KERNEL, work_item.task.size + ), "isOmp": True, "ompNumThreads": work_item.task.size, } @@ -394,7 +415,9 @@ def thread_print(msg): end_ts = time() if has_failed: - sch_logger.error("Error executing task {}".format(work_item.task.task_id)) + sch_logger.error( + "Error executing task {}".format(work_item.task.task_id) + ) result_queue.put( ResultQueueItem( work_item.task.task_id, @@ -519,7 +542,9 @@ def init_vm_list(self): def update_vm_list(self): if self.baseline not in NATIVE_BASELINES: - raise RuntimeError("This method should only be used in native baselines!") + raise RuntimeError( + "This method should only be used in native baselines!" 
+ ) wait_for_native_mpi_pods( get_native_mpi_namespace("makespan"), @@ -796,7 +821,9 @@ def shutdown(self): # In a multi-tenant setting, we want to _not_ consider for scheduling nodes # that are already running tasks for different users def prune_node_list_from_different_users(self, nodes, this_task): - this_task_id = get_user_id_from_task(self.state.num_tasks_per_user, this_task.task_id) + this_task_id = get_user_id_from_task( + self.state.num_tasks_per_user, this_task.task_id + ) def get_indx_in_list(node_list, host_ip): for idx, pair in enumerate(node_list): @@ -806,7 +833,10 @@ def get_indx_in_list(node_list, host_ip): return -1 for task_id in self.state.in_flight_tasks: - if get_user_id_from_task(self.state.num_tasks_per_user, task_id) == this_task_id: + if ( + get_user_id_from_task(self.state.num_tasks_per_user, task_id) + == this_task_id + ): continue sched_decision = self.state.in_flight_tasks[task_id] @@ -832,18 +862,27 @@ def have_enough_slots_for_task(self, task: TaskObject): # For `mpi-evict` we run a multi-tenant trace, and prevent apps # from different users from running in the same VM sorted_vms = sorted( - self.state.vm_map.items(), key=lambda item: item[1], reverse=True + self.state.vm_map.items(), + key=lambda item: item[1], + reverse=True, ) - pruned_vms = self.prune_node_list_from_different_users(sorted_vms, task) + pruned_vms = self.prune_node_list_from_different_users( + sorted_vms, task + ) - return self.num_available_slots_from_vm_list(pruned_vms) >= task.size + return ( + self.num_available_slots_from_vm_list(pruned_vms) + >= task.size + ) elif self.state.workload in OPENMP_WORKLOADS: # For OpenMP workloads, we can only allocate them in one VM, so # we compare the requested size with the largest capacity we # have in one VM sorted_vms = sorted( - self.state.vm_map.items(), key=lambda item: item[1], reverse=True + self.state.vm_map.items(), + key=lambda item: item[1], + reverse=True, ) return sorted_vms[0][1] >= task.size @@ -853,30 +892,46 @@ def have_enough_slots_for_task(self, task: TaskObject): # For Granny, we can always rely on the planner to let us know # how many slots we can use if self.state.workload == "mpi-evict": - return get_num_available_slots_from_in_flight_apps( - self.state.num_vms, - self.state.num_cpus_per_vm, - user_id=get_user_id_from_task(self.state.num_tasks_per_user, task.task_id) - ) >= task.size + return ( + get_num_available_slots_from_in_flight_apps( + self.state.num_vms, + self.state.num_cpus_per_vm, + user_id=get_user_id_from_task( + self.state.num_tasks_per_user, task.task_id + ), + ) + >= task.size + ) - if self.state.workload == "mpi-spot" and self.state.baseline in GRANNY_FT_BASELINES: - return get_num_available_slots_from_in_flight_apps( - self.state.num_vms, - self.state.num_cpus_per_vm, - num_evicted_vms=self.state.num_faults, - ) >= task.size + if ( + self.state.workload == "mpi-spot" + and self.state.baseline in GRANNY_FT_BASELINES + ): + return ( + get_num_available_slots_from_in_flight_apps( + self.state.num_vms, + self.state.num_cpus_per_vm, + num_evicted_vms=self.state.num_faults, + ) + >= task.size + ) if self.state.workload in OPENMP_WORKLOADS: - return get_num_available_slots_from_in_flight_apps( - self.state.num_vms, - self.state.num_cpus_per_vm, - openmp=True, - ) >= task.size + return ( + get_num_available_slots_from_in_flight_apps( + self.state.num_vms, + self.state.num_cpus_per_vm, + openmp=True, + ) + >= task.size + ) - return get_num_available_slots_from_in_flight_apps( - self.state.num_vms, - 
self.state.num_cpus_per_vm - ) >= task.size + return ( + get_num_available_slots_from_in_flight_apps( + self.state.num_vms, self.state.num_cpus_per_vm + ) + >= task.size + ) def schedule_task_to_vm( self, task: TaskObject @@ -907,7 +962,9 @@ def schedule_task_to_vm( ) if self.state.workload == "mpi-evict": - sorted_vms = self.prune_node_list_from_different_users(sorted_vms, task) + sorted_vms = self.prune_node_list_from_different_users( + sorted_vms, task + ) if self.state.workload in OPENMP_WORKLOADS: sorted_vms = [sorted_vms[0]] @@ -991,7 +1048,10 @@ def execute_tasks( # In the MPI evict baseline we want to query often about being # able to schedule, as some planner migrations may unblock # scheduling - if self.state.workload == "mpi-evict" and self.state.baseline in GRANNY_BASELINES: + if ( + self.state.workload == "mpi-evict" + and self.state.baseline in GRANNY_BASELINES + ): # If there are not enough slots, first try to deque try: result = dequeue_with_timeout( @@ -1056,7 +1116,9 @@ def execute_tasks( # we will go back to the beginning self.state.next_task_in_queue = None while self.state.executed_task_count < len(tasks): - result = dequeue_with_timeout(self.result_queue, "result queue") + result = dequeue_with_timeout( + self.result_queue, "result queue" + ) # Update our local records according to result self.state.update_records_from_result(result) diff --git a/tasks/migration/plot.py b/tasks/migration/plot.py index b41abb8..a35332d 100644 --- a/tasks/migration/plot.py +++ b/tasks/migration/plot.py @@ -1,11 +1,10 @@ from glob import glob from invoke import task -from matplotlib.pyplot import hlines, savefig, subplots +from matplotlib.pyplot import hlines, subplots from numpy import arange -from os import makedirs from os.path import join from pandas import read_csv -from tasks.util.env import PLOTS_ROOT, PROJ_ROOT +from tasks.util.env import PROJ_ROOT from tasks.util.migration import MIGRATION_PLOTS_DIR from tasks.util.plot import save_plot @@ -100,4 +99,6 @@ def do_plot(workload, migration_results): hlines(1, xlim_left, xlim_right, linestyle="dashed", colors="red") - save_plot(fig, MIGRATION_PLOTS_DIR, "migration_speedup_{}".format(workload)) + save_plot( + fig, MIGRATION_PLOTS_DIR, "migration_speedup_{}".format(workload) + ) diff --git a/tasks/motivation/plot.py b/tasks/motivation/plot.py index 370027c..0505ba5 100644 --- a/tasks/motivation/plot.py +++ b/tasks/motivation/plot.py @@ -26,7 +26,9 @@ def locality(ctx): num_cpus_per_vm = 8 results = {} - results[num_vms] = read_makespan_results(num_vms, num_tasks, num_cpus_per_vm) + results[num_vms] = read_makespan_results( + num_vms, num_tasks, num_cpus_per_vm + ) makedirs(MOTIVATION_PLOTS_DIR, exist_ok=True) # ---------- @@ -41,14 +43,15 @@ def locality(ctx): legend_entries = [ Patch( color=get_color_for_baseline("mpi-migrate", baseline), - label=get_label_for_baseline("mpi-migrate", baseline) - ) for baseline in baselines + label=get_label_for_baseline("mpi-migrate", baseline), + ) + for baseline in baselines ] fig.legend( handles=legend_entries, loc="upper center", ncols=len(baselines), - bbox_to_anchor=(0.52, 1.07) + bbox_to_anchor=(0.52, 1.07), ) save_plot(fig, MOTIVATION_PLOTS_DIR, "motivation_vcpus") diff --git a/tasks/openmpi/run.py b/tasks/openmpi/run.py index c8b839b..16612c5 100644 --- a/tasks/openmpi/run.py +++ b/tasks/openmpi/run.py @@ -116,9 +116,7 @@ def generate_partitions(ctx, max_num_partitions=5): partitions = [ part for part in partitions if max(part) <= NUM_CORES_PER_CTR ] - partitions = [ - part for part in 
partitions if len(part) <= num_vms - ] + partitions = [part for part in partitions if len(part) <= num_vms] if len(partitions) > max_num_partitions: links = [ (ind, get_xvm_links_from_part(p)) @@ -181,7 +179,7 @@ def wasm(ctx): "num_loops": 3, "num_net_loops": 1e6, "chunk_size": 1e1, - } + }, }, "conf2": { "data_file": "examples/controller/in.controller.wall", @@ -189,7 +187,7 @@ def wasm(ctx): "num_loops": 1, "num_net_loops": 0, "chunk_size": 1e1, - } + }, }, } @@ -203,7 +201,8 @@ def do_run(native): conf = EXP_CONFIG["conf2"] for part in partitions: print( - "Running LAMMPS ({}) with {} MPI procs (data file: {}, params: {}, part: {}, xvm: {})".format( + "Running LAMMPS ({}) with {} MPI procs (data file: {}," + " params: {}, part: {}, xvm: {})".format( "native" if native else "wasm", get_nproc_from_part(part), conf["data_file"], @@ -217,7 +216,9 @@ def do_run(native): actual_time = run_native(part, conf) else: actual_time = run_wasm(part, conf) - write_csv_line(csv_name, part, get_xvm_links_from_part(part), actual_time) + write_csv_line( + csv_name, part, get_xvm_links_from_part(part), actual_time + ) def run_wasm(part, config): diff --git a/tasks/util/compose.py b/tasks/util/compose.py index ccc46b6..719868e 100644 --- a/tasks/util/compose.py +++ b/tasks/util/compose.py @@ -1,12 +1,19 @@ from os.path import join from subprocess import run -from tasks.util.env import ACR_NAME, FAABRIC_EXP_IMAGE_NAME, PROJ_ROOT, get_version +from tasks.util.env import ( + ACR_NAME, + FAABRIC_EXP_IMAGE_NAME, + PROJ_ROOT, + get_version, +) NUM_CORES_PER_CTR = 8 OPENMPI_COMPOSE_DIR = join(PROJ_ROOT, "tasks", "openmpi") OPENMMPI_COMPOSE_FILE = join(OPENMPI_COMPOSE_DIR, "docker-compose.yml") ENV_VARS = { - "FAABRIC_EXP_IMAGE_NAME": "{}/{}:{}".format(ACR_NAME, FAABRIC_EXP_IMAGE_NAME, get_version()), + "FAABRIC_EXP_IMAGE_NAME": "{}/{}:{}".format( + ACR_NAME, FAABRIC_EXP_IMAGE_NAME, get_version() + ), "NUM_CORES_PER_VM": "{}".format(NUM_CORES_PER_CTR), } @@ -14,17 +21,23 @@ def run_compose_cmd(compose_cmd, capture_output=False): # Note that we actually run a docker command as we specify the container # by name - docker_cmd = "docker compose -f {} {}".format(OPENMMPI_COMPOSE_FILE, compose_cmd) + docker_cmd = "docker compose -f {} {}".format( + OPENMMPI_COMPOSE_FILE, compose_cmd + ) if capture_output: - stdout = run( - docker_cmd, - shell=True, - check=True, - cwd=OPENMPI_COMPOSE_DIR, - capture_output=True, - env=ENV_VARS, - ).stdout.decode("utf-8").strip() + stdout = ( + run( + docker_cmd, + shell=True, + check=True, + cwd=OPENMPI_COMPOSE_DIR, + capture_output=True, + env=ENV_VARS, + ) + .stdout.decode("utf-8") + .strip() + ) return stdout @@ -40,24 +53,35 @@ def run_compose_cmd(compose_cmd, capture_output=False): def get_compose_ctrs(): ctr_ids = run_compose_cmd("ps -aq", capture_output=True).split("\n") - docker_ip_cmd = "docker inspect -f '{{{{range.NetworkSettings.Networks}}}}{{{{.IPAddress}}}}{{{{end}}}}' {}" + docker_ip_cmd = "docker inspect -f" + docker_ip_cmd += " '{{{{range.NetworkSettings.Networks}}}}{{{{.IPAddress" + docker_ip_cmd += "}}}}{{{{end}}}}' {}" + docker_name_cmd = "docker inspect -f '{{{{.Name}}}}' {}" ctr_ips = [] ctr_names = [] for ctr_id in ctr_ids: - ctr_ips.append(run( - docker_ip_cmd.format(ctr_id), - shell=True, - check=True, - capture_output=True - ).stdout.decode("utf-8").strip()) - - ctr_names.append(run( - docker_name_cmd.format(ctr_id), - shell=True, - check=True, - capture_output=True - ).stdout.decode("utf-8").strip()[1:]) + ctr_ips.append( + run( + docker_ip_cmd.format(ctr_id), 
+ shell=True, + check=True, + capture_output=True, + ) + .stdout.decode("utf-8") + .strip() + ) + + ctr_names.append( + run( + docker_name_cmd.format(ctr_id), + shell=True, + check=True, + capture_output=True, + ) + .stdout.decode("utf-8") + .strip()[1:] + ) return ctr_names, ctr_ips diff --git a/tasks/util/elastic.py b/tasks/util/elastic.py index 6df5abc..207e368 100644 --- a/tasks/util/elastic.py +++ b/tasks/util/elastic.py @@ -30,10 +30,16 @@ ELASTIC_KERNELS_DOCKER_DIR = join(EXAMPLES_DOCKER_DIR, "Kernels-elastic") ELASTIC_KERNELS_WASM_DIR = join(ELASTIC_KERNELS_DOCKER_DIR, "build", "wasm") -ELASTIC_KERNELS_NATIVE_DIR = join(ELASTIC_KERNELS_DOCKER_DIR, "build", "native") +ELASTIC_KERNELS_NATIVE_DIR = join( + ELASTIC_KERNELS_DOCKER_DIR, "build", "native" +) -OPENMP_ELASTIC_WASM = join(ELASTIC_KERNELS_WASM_DIR, "omp_{}.wasm".format(ELASTIC_KERNEL)) -OPENMP_ELASTIC_NATIVE_BINARY = join(ELASTIC_KERNELS_NATIVE_DIR, "omp_{}.o".format(ELASTIC_KERNEL)) +OPENMP_ELASTIC_WASM = join( + ELASTIC_KERNELS_WASM_DIR, "omp_{}.wasm".format(ELASTIC_KERNEL) +) +OPENMP_ELASTIC_NATIVE_BINARY = join( + ELASTIC_KERNELS_NATIVE_DIR, "omp_{}.o".format(ELASTIC_KERNEL) +) # Parameters for the macrobenchmark OPENMP_ELASTIC_NUM_LOOPS = 5 @@ -43,7 +49,9 @@ def get_elastic_input_data(num_loops=OPENMP_ELASTIC_NUM_LOOPS, native=False): if native: return "FAASM_BENCH_PARAMS={}".format(int(num_loops)) - return b64encode("{}".format(int(num_loops)).encode("utf-8")).decode("utf-8") + return b64encode("{}".format(int(num_loops)).encode("utf-8")).decode( + "utf-8" + ) def read_elastic_results(num_vms, num_tasks, num_cpus_per_vm): @@ -62,7 +70,9 @@ def read_elastic_results(num_vms, num_tasks, num_cpus_per_vm): result_dict[baseline] = {} makespan_s = results["MakespanSecs"].to_list() - assert len(makespan_s) == 1, "Too many rows: expected 1, got {}!".format(len(makespan_s)) + assert ( + len(makespan_s) == 1 + ), "Too many rows: expected 1, got {}!".format(len(makespan_s)) makespan_s = makespan_s[0] result_dict[baseline]["makespan"] = makespan_s @@ -137,8 +147,10 @@ def read_elastic_results(num_vms, num_tasks, num_cpus_per_vm): result_dict[baseline]["ts_vcpus"] = {} total_available_vcpus = num_vms * num_cpus_per_vm - sched_info_csv = "makespan_sched-info_{}_{}_omp-elastic_{}_{}.csv".format( + sched_info_csv = ( + "makespan_sched-info_{}_{}_omp-elastic_{}_{}.csv".format( baseline, num_vms, num_tasks, num_cpus_per_vm + ) ) if baseline in NATIVE_BASELINES: @@ -157,14 +169,20 @@ def read_elastic_results(num_vms, num_tasks, num_cpus_per_vm): # idle VMs as a number (not as a set) for ts in result_dict[baseline]["ts_vcpus"]: result_dict[baseline]["ts_vcpus"][ts] = ( - result_dict[baseline]["ts_vcpus"][ts] / total_available_vcpus + result_dict[baseline]["ts_vcpus"][ts] + / total_available_vcpus ) * 100 else: # For Granny, the idle vCPUs results are directly available in # the file sch_info_csv = read_csv(join(MAKESPAN_RESULTS_DIR, sched_info_csv)) - idle_cpus = (sch_info_csv["NumIdleCpus"] / total_available_vcpus * 100).to_list() - tss = (sch_info_csv["TimeStampSecs"] - sch_info_csv["TimeStampSecs"][0]).to_list() + idle_cpus = ( + sch_info_csv["NumIdleCpus"] / total_available_vcpus * 100 + ).to_list() + tss = ( + sch_info_csv["TimeStampSecs"] + - sch_info_csv["TimeStampSecs"][0] + ).to_list() # Idle vCPUs for (idle_cpu, ts) in zip(idle_cpus, tss): @@ -190,9 +208,7 @@ def _do_plot_makespan(results, ax, **kwargs): for ind, n_vms in enumerate(num_vms): x_offset = ind * len(baselines) + (ind + 1) xs += [x + x_offset for x in 
range(len(baselines))] - ys += [ - results[n_vms][baseline]["makespan"] for baseline in baselines - ] + ys += [results[n_vms][baseline]["makespan"] for baseline in baselines] colors += [ get_color_for_baseline("omp-elastic", baseline) for baseline in baselines @@ -200,9 +216,7 @@ def _do_plot_makespan(results, ax, **kwargs): # Add one tick and xlabel per VM size xticks.append(x_offset + len(baselines) / 2) - xticklabels.append( - "{} VMs\n({} Jobs)".format(n_vms, num_tasks[ind]) - ) + xticklabels.append("{} VMs\n({} Jobs)".format(n_vms, num_tasks[ind])) # Add spacing between vms if ind != len(num_vms) - 1: @@ -275,24 +289,31 @@ def _do_plot_percentage_vcpus(results, ax, **kwargs): for n_vms in num_vms: timestamps = list(results[n_vms][baseline]["ts_vcpus"].keys()) - total_cpusecs = (timestamps[-1] - timestamps[0]) * num_cpus_per_vm * int(n_vms) + total_cpusecs = ( + (timestamps[-1] - timestamps[0]) * num_cpus_per_vm * int(n_vms) + ) cumsum = cum_sum( timestamps, - [res * num_cpus_per_vm * int(n_vms) / 100 - for res in list(results[n_vms][baseline]["ts_vcpus"].values())], + [ + res * num_cpus_per_vm * int(n_vms) / 100 + for res in list( + results[n_vms][baseline]["ts_vcpus"].values() + ) + ], ) # Record both the total idle CPUsecs and the percentage - cumsum_ys[baseline][n_vms] = (cumsum, (cumsum / total_cpusecs) * 100) + cumsum_ys[baseline][n_vms] = ( + cumsum, + (cumsum / total_cpusecs) * 100, + ) xs = [ind for ind in range(len(num_vms))] xticklabels = [] for (n_vms, n_tasks) in zip(num_vms, num_tasks): - xticklabels.append( - "{} VMs\n({} Jobs)".format(n_vms, n_tasks) - ) + xticklabels.append("{} VMs\n({} Jobs)".format(n_vms, n_tasks)) for baseline in baselines: ys = [cumsum_ys[baseline][n_vms][1] for n_vms in num_vms] ax.plot( @@ -312,7 +333,9 @@ def _do_plot_percentage_vcpus(results, ax, **kwargs): def _do_plot_ts_vcpus(results, ax, **kwargs): assert "timeseries_num_vms" in kwargs, "timeseries_num_vms not in kwargs!" - assert "timeseries_num_tasks" in kwargs, "timeseries_num_tasks not in kwargs!" + assert ( + "timeseries_num_tasks" in kwargs + ), "timeseries_num_tasks not in kwargs!" 
num_vms = kwargs["timeseries_num_vms"] baselines = ["slurm", "batch", "granny-elastic"] diff --git a/tasks/util/eviction.py b/tasks/util/eviction.py index 46071e7..5ede920 100644 --- a/tasks/util/eviction.py +++ b/tasks/util/eviction.py @@ -22,8 +22,10 @@ def read_eviction_results(num_vms, num_users, num_tasks, num_cpus_per_vm): result_dict = {} num_tasks_per_user = int(num_tasks / num_users) - glob_str = "makespan_exec-task-info_*_{}vms_{}tpusr_mpi-evict_{}_{}.csv".format( - num_vms, num_tasks_per_user, num_tasks, num_cpus_per_vm + glob_str = ( + "makespan_exec-task-info_*_{}vms_{}tpusr_mpi-evict_{}_{}.csv".format( + num_vms, num_tasks_per_user, num_tasks, num_cpus_per_vm + ) ) for csv in glob(join(MAKESPAN_RESULTS_DIR, glob_str)): baseline = csv.split("_")[2] @@ -39,7 +41,8 @@ def read_eviction_results(num_vms, num_users, num_tasks, num_cpus_per_vm): time_elapsed_secs = int(end_ts - start_ts) result_dict[baseline]["makespan"] = time_elapsed_secs print( - "Num VMs: {} - Num Users: {} - Num Tasks: {} - Baseline: {} - Makespan: {}s".format( + "Num VMs: {} - Num Users: {} - Num Tasks: {} -" + " Baseline: {} - Makespan: {}s".format( num_vms, num_users, num_tasks, baseline, time_elapsed_secs ) ) @@ -67,11 +70,16 @@ def read_eviction_results(num_vms, num_users, num_tasks, num_cpus_per_vm): start_slot = int(row["StartTimeStamp"] - start_ts) end_slot = int(row["EndTimeStamp"] - start_ts) for ind in range(start_slot, end_slot): - if user_id not in result_dict[baseline]["tasks_per_user_per_ts"]: + if ( + user_id + not in result_dict[baseline]["tasks_per_user_per_ts"] + ): print("User {} not registered in results!".format(user_id)) raise RuntimeError("User not registered!") - result_dict[baseline]["tasks_per_user_per_ts"][ind][user_id] += 1 + result_dict[baseline]["tasks_per_user_per_ts"][ind][ + user_id + ] += 1 return result_dict @@ -103,7 +111,9 @@ def _do_plot_makespan(results, ax, **kwargs): # Add one tick and xlabel per VM size xticks.append(x_offset + len(labels) / 2) xticklabels.append( - "{} VMs\n({} Jobs - {} Users)".format(n_vms, num_tasks[ind], num_users[ind]) + "{} VMs\n({} Jobs - {} Users)".format( + n_vms, num_tasks[ind], num_users[ind] + ) ) # Add spacing between vms @@ -120,8 +130,9 @@ def _do_plot_makespan(results, ax, **kwargs): legend_entries = [ Patch( color=get_color_for_baseline("mpi-migrate", label), - label=get_label_for_baseline("mpi-migrate", label) - ) for label in labels + label=get_label_for_baseline("mpi-migrate", label), + ) + for label in labels ] ax.legend(handles=legend_entries, ncols=2, fontsize=8) @@ -142,13 +153,18 @@ def _do_plot_tasks_per_user(results, ax, **kwargs): baselines = ["slurm", "batch", "granny-migrate"] xs = {} for baseline in baselines: - xs[baseline] = list(results[num_vms][baseline]["tasks_per_user_per_ts"].keys()) + xs[baseline] = list( + results[num_vms][baseline]["tasks_per_user_per_ts"].keys() + ) # For each baseline, for each user id, plot the timeseries of active jobs num_points_spline = 500 for baseline in baselines: for user_id in range(num_users): - ys = [results[num_vms][baseline]["tasks_per_user_per_ts"][x][user_id] for x in xs[baseline]] + ys = [ + results[num_vms][baseline]["tasks_per_user_per_ts"][x][user_id] + for x in xs[baseline] + ] ax.plot( xs[baseline], ys, diff --git a/tasks/util/k8s.py b/tasks/util/k8s.py index 50db835..93f12bc 100644 --- a/tasks/util/k8s.py +++ b/tasks/util/k8s.py @@ -29,5 +29,9 @@ def wait_for_pods(namespace, label, num_expected=1, quiet=False): break if not quiet: - print("{} pods not ready, waiting 
({}/{})".format(namespace, len(true_statuses), num_expected)) + print( + "{} pods not ready, waiting ({}/{})".format( + namespace, len(true_statuses), num_expected + ) + ) sleep(5) diff --git a/tasks/util/makespan.py b/tasks/util/makespan.py index 3177b47..59199a2 100644 --- a/tasks/util/makespan.py +++ b/tasks/util/makespan.py @@ -39,7 +39,12 @@ GRANNY_ELASTIC_BASELINES = ["granny-elastic"] GRANNY_FT_BASELINES = ["granny-ft"] GRANNY_MIGRATE_BASELINES = ["granny-migrate"] -GRANNY_BASELINES = ["granny"] + GRANNY_MIGRATE_BASELINES + GRANNY_FT_BASELINES + GRANNY_ELASTIC_BASELINES +GRANNY_BASELINES = ( + ["granny"] + + GRANNY_MIGRATE_BASELINES + + GRANNY_FT_BASELINES + + GRANNY_ELASTIC_BASELINES +) ALLOWED_BASELINES = NATIVE_BASELINES + GRANNY_BASELINES # Workload/Migration related constants @@ -55,7 +60,9 @@ def init_csv_file(baseline, num_vms, trace_str, num_tasks_per_user=None): csv_name_ic = "makespan_{}_{}_{}_{}".format( IDLE_CORES_FILE_PREFIX, baseline, - num_vms if num_tasks_per_user is None else "{}vms_{}tpusr".format(num_vms, num_tasks_per_user), + num_vms + if num_tasks_per_user is None + else "{}vms_{}tpusr".format(num_vms, num_tasks_per_user), get_trace_ending(trace_str), ) ic_file = join(MAKESPAN_RESULTS_DIR, csv_name_ic) @@ -66,7 +73,9 @@ def init_csv_file(baseline, num_vms, trace_str, num_tasks_per_user=None): csv_name = "makespan_{}_{}_{}_{}".format( EXEC_TASK_INFO_FILE_PREFIX, baseline, - num_vms if num_tasks_per_user is None else "{}vms_{}tpusr".format(num_vms, num_tasks_per_user), + num_vms + if num_tasks_per_user is None + else "{}vms_{}tpusr".format(num_vms, num_tasks_per_user), get_trace_ending(trace_str), ) csv_file = join(MAKESPAN_RESULTS_DIR, csv_name) @@ -80,7 +89,9 @@ def init_csv_file(baseline, num_vms, trace_str, num_tasks_per_user=None): csv_name = "makespan_{}_{}_{}_{}".format( SCHEDULING_INFO_FILE_PREFIX, baseline, - num_vms if num_tasks_per_user is None else "{}vms_{}tpusr".format(num_vms, num_tasks_per_user), + num_vms + if num_tasks_per_user is None + else "{}vms_{}tpusr".format(num_vms, num_tasks_per_user), get_trace_ending(trace_str), ) csv_file = join(MAKESPAN_RESULTS_DIR, csv_name) @@ -92,7 +103,9 @@ def init_csv_file(baseline, num_vms, trace_str, num_tasks_per_user=None): out_file.write(",".join(ip_to_vm) + "\n") else: with open(csv_file, "w") as out_file: - out_file.write("TimeStampSecs,NumIdleVms,NumIdleCpus,NumCrossVmLinks\n") + out_file.write( + "TimeStampSecs,NumIdleVms,NumIdleCpus,NumCrossVmLinks\n" + ) out_file.write # Makespan file @@ -102,7 +115,9 @@ def init_csv_file(baseline, num_vms, trace_str, num_tasks_per_user=None): csv_name = "makespan_{}_{}_{}_{}".format( MAKESPAN_FILE_PREFIX, baseline, - num_vms if num_tasks_per_user is None else "{}vms_{}tpusr".format(num_vms, num_tasks_per_user), + num_vms + if num_tasks_per_user is None + else "{}vms_{}tpusr".format(num_vms, num_tasks_per_user), get_trace_ending(trace_str), ) csv_file = join(MAKESPAN_RESULTS_DIR, csv_name) @@ -110,12 +125,16 @@ def init_csv_file(baseline, num_vms, trace_str, num_tasks_per_user=None): out_file.write("MakespanSecs\n") -def write_line_to_csv(baseline, exp_key, num_vms, num_tasks_per_user, trace_str, *args): +def write_line_to_csv( + baseline, exp_key, num_vms, num_tasks_per_user, trace_str, *args +): if exp_key == IDLE_CORES_FILE_PREFIX: csv_name = "makespan_{}_{}_{}_{}".format( IDLE_CORES_FILE_PREFIX, baseline, - num_vms if num_tasks_per_user is None else "{}vms_{}tpusr".format(num_vms, num_tasks_per_user), + num_vms + if num_tasks_per_user is None + else 
"{}vms_{}tpusr".format(num_vms, num_tasks_per_user), get_trace_ending(trace_str), ) makespan_file = join(MAKESPAN_RESULTS_DIR, csv_name) @@ -125,7 +144,9 @@ def write_line_to_csv(baseline, exp_key, num_vms, num_tasks_per_user, trace_str, csv_name = "makespan_{}_{}_{}_{}".format( EXEC_TASK_INFO_FILE_PREFIX, baseline, - num_vms if num_tasks_per_user is None else "{}vms_{}tpusr".format(num_vms, num_tasks_per_user), + num_vms + if num_tasks_per_user is None + else "{}vms_{}tpusr".format(num_vms, num_tasks_per_user), get_trace_ending(trace_str), ) makespan_file = join(MAKESPAN_RESULTS_DIR, csv_name) @@ -135,7 +156,9 @@ def write_line_to_csv(baseline, exp_key, num_vms, num_tasks_per_user, trace_str, csv_name = "makespan_{}_{}_{}_{}".format( SCHEDULING_INFO_FILE_PREFIX, baseline, - num_vms if num_tasks_per_user is None else "{}vms_{}tpusr".format(num_vms, num_tasks_per_user), + num_vms + if num_tasks_per_user is None + else "{}vms_{}tpusr".format(num_vms, num_tasks_per_user), get_trace_ending(trace_str), ) makespan_file = join(MAKESPAN_RESULTS_DIR, csv_name) @@ -152,7 +175,9 @@ def write_line_to_csv(baseline, exp_key, num_vms, num_tasks_per_user, trace_str, csv_name = "makespan_{}_{}_{}_{}".format( MAKESPAN_FILE_PREFIX, baseline, - num_vms if num_tasks_per_user is None else "{}vms_{}tpusr".format(num_vms, num_tasks_per_user), + num_vms + if num_tasks_per_user is None + else "{}vms_{}tpusr".format(num_vms, num_tasks_per_user), get_trace_ending(trace_str), ) makespan_file = join(MAKESPAN_RESULTS_DIR, csv_name) @@ -282,10 +307,12 @@ def get_idle_core_count_from_task_info( return num_idle_cores_per_time_step + # ---------------------------- # Plotting utilities # ---------------------------- + def read_makespan_results(num_vms, num_tasks, num_cpus_per_vm): workload = "mpi-migrate" @@ -371,7 +398,7 @@ def read_makespan_results(num_vms, num_tasks, num_cpus_per_vm): # ----- sched_info_csv = "makespan_sched-info_{}_{}_{}_{}_{}.csv".format( - baseline, num_vms, workload, num_tasks, num_cpus_per_vm + baseline, num_vms, workload, num_tasks, num_cpus_per_vm ) if baseline not in GRANNY_BASELINES: result_dict[baseline]["task_scheduling"] = {} @@ -379,7 +406,9 @@ def read_makespan_results(num_vms, num_tasks, num_cpus_per_vm): # We identify VMs by numbers, not IPs ip_to_vm = {} vm_to_id = {} - with open(join(MAKESPAN_RESULTS_DIR, sched_info_csv), "r") as sched_fd: + with open( + join(MAKESPAN_RESULTS_DIR, sched_info_csv), "r" + ) as sched_fd: # Process the file line by line, as each line will be different in # length for num, line in enumerate(sched_fd): @@ -466,23 +495,33 @@ def read_makespan_results(num_vms, num_tasks, num_cpus_per_vm): # In addition, for each task in flight, add the tasks's IPs # to the host set - for vm_id in result_dict[baseline]["task_scheduling"][str(int(t))]: + for vm_id in result_dict[baseline]["task_scheduling"][ + str(int(t)) + ]: result_dict[baseline]["ts_idle_vms"][ts].add(vm_id) # Third, express the results as percentages, and the number of # idle VMs as a number (not as a set) for ts in result_dict[baseline]["ts_vcpus"]: result_dict[baseline]["ts_vcpus"][ts] = ( - result_dict[baseline]["ts_vcpus"][ts] / total_available_vcpus + result_dict[baseline]["ts_vcpus"][ts] + / total_available_vcpus ) * 100 - result_dict[baseline]["ts_idle_vms"][ts] = num_vms - len(result_dict[baseline]["ts_idle_vms"][ts]) + result_dict[baseline]["ts_idle_vms"][ts] = num_vms - len( + result_dict[baseline]["ts_idle_vms"][ts] + ) else: # For Granny, the idle vCPUs results are directly available in # the file 
sch_info_csv = read_csv(join(MAKESPAN_RESULTS_DIR, sched_info_csv)) - idle_cpus = (sch_info_csv["NumIdleCpus"] / total_available_vcpus * 100).to_list() - tss = (sch_info_csv["TimeStampSecs"] - sch_info_csv["TimeStampSecs"][0]).to_list() + idle_cpus = ( + sch_info_csv["NumIdleCpus"] / total_available_vcpus * 100 + ).to_list() + tss = ( + sch_info_csv["TimeStampSecs"] + - sch_info_csv["TimeStampSecs"][0] + ).to_list() # Idle vCPUs for (idle_cpu, ts) in zip(idle_cpus, tss): @@ -518,7 +557,9 @@ def read_makespan_results(num_vms, num_tasks, num_cpus_per_vm): continue # Add the accumulated to the total tally - result_dict[baseline]["ts_xvm_links"][ts] += get_xvm_links_from_part(list(sched.values())) + result_dict[baseline]["ts_xvm_links"][ + ts + ] += get_xvm_links_from_part(list(sched.values())) elif baseline == "batch": # Batch baseline is optimal in terms of cross-vm links task_size = task_trace[int(t)].size @@ -679,7 +720,7 @@ def do_makespan_plot(plot_name, results, ax, num_vms, num_tasks): percentiles, ys, label=get_label_for_baseline("mpi-migrate", label), - color=get_color_for_baseline("mpi-migrate", label) + color=get_color_for_baseline("mpi-migrate", label), ) ax.set_xlabel("Job percentile [th]") ax.set_ylabel("Job completion time [s]") @@ -744,7 +785,9 @@ def do_makespan_plot(plot_name, results, ax, num_vms, num_tasks): x_offset = ind * len(labels) + (ind + 1) xs += [x + x_offset for x in range(len(labels))] ys += [results[n_vms][la]["makespan"] for la in labels] - colors += [get_color_for_baseline("mpi-migrate", la) for la in labels] + colors += [ + get_color_for_baseline("mpi-migrate", la) for la in labels + ] # Add one tick and xlabel per VM size xticks.append(x_offset + len(labels) / 2) @@ -766,8 +809,9 @@ def do_makespan_plot(plot_name, results, ax, num_vms, num_tasks): legend_entries = [ Patch( color=get_color_for_baseline("mpi-migrate", label), - label=get_label_for_baseline("mpi-migrate", label) - ) for label in labels + label=get_label_for_baseline("mpi-migrate", label), + ) + for label in labels ] ax.legend(handles=legend_entries, ncols=2, fontsize=8) @@ -857,10 +901,17 @@ def do_makespan_plot(plot_name, results, ax, num_vms, num_tasks): # spl_granny = CubicSpline(xs_granny, ys_granny) # new_xs_granny = linspace(0, max(xs_granny), num=num_points) - xs_granny_migrate = list(results[num_vms]["granny-migrate"]["ts_xvm_links"].keys()) - ys_granny_migrate = [results[num_vms]["granny-migrate"]["ts_xvm_links"][x] for x in xs_granny_migrate] + xs_granny_migrate = list( + results[num_vms]["granny-migrate"]["ts_xvm_links"].keys() + ) + ys_granny_migrate = [ + results[num_vms]["granny-migrate"]["ts_xvm_links"][x] + for x in xs_granny_migrate + ] spl_granny_migrate = CubicSpline(xs_granny_migrate, ys_granny_migrate) - new_xs_granny_migrate = linspace(0, max(xs_granny_migrate), num=num_points) + new_xs_granny_migrate = linspace( + 0, max(xs_granny_migrate), num=num_points + ) ax.plot( xs_slurm, @@ -889,11 +940,7 @@ def do_makespan_plot(plot_name, results, ax, num_vms, num_tasks): color=get_color_for_baseline("mpi-migrate", "granny-migrate"), ) - xlim = max( - xs_batch[-1], - xs_slurm[-1], - new_xs_granny_migrate[-1] - ) + xlim = max(xs_batch[-1], xs_slurm[-1], new_xs_granny_migrate[-1]) ax.set_xlim(left=0, right=xlim) ax.set_ylim(bottom=0) ax.set_xlabel("Time [s]") @@ -913,20 +960,34 @@ def do_makespan_plot(plot_name, results, ax, num_vms, num_tasks): # new_xs_granny = linspace(0, max(xs_granny), num=num_points) # TODO: FIXME: move from granny-migrate to granny-evict! 
- xs_granny_migrate = list(results[num_vms]["granny-migrate"]["ts_idle_vms"].keys()) - ys_granny_migrate = [(results[num_vms]["granny-migrate"]["ts_idle_vms"][x] / num_vms) * 100 for x in xs_granny_migrate] + xs_granny_migrate = list( + results[num_vms]["granny-migrate"]["ts_idle_vms"].keys() + ) + ys_granny_migrate = [ + (results[num_vms]["granny-migrate"]["ts_idle_vms"][x] / num_vms) + * 100 + for x in xs_granny_migrate + ] spl_granny_migrate = CubicSpline(xs_granny_migrate, ys_granny_migrate) - new_xs_granny_migrate = linspace(0, max(xs_granny_migrate), num=num_points) + new_xs_granny_migrate = linspace( + 0, max(xs_granny_migrate), num=num_points + ) ax.plot( xs_slurm, - [(results[num_vms]["slurm"]["ts_idle_vms"][x] / num_vms) * 100 for x in xs_slurm], + [ + (results[num_vms]["slurm"]["ts_idle_vms"][x] / num_vms) * 100 + for x in xs_slurm + ], label=get_label_for_baseline("mpi-migrate", "slurm"), color=get_color_for_baseline("mpi-migrate", "slurm"), ) ax.plot( xs_batch, - [(results[num_vms]["batch"]["ts_idle_vms"][x] / num_vms) * 100 for x in xs_batch], + [ + (results[num_vms]["batch"]["ts_idle_vms"][x] / num_vms) * 100 + for x in xs_batch + ], label=get_label_for_baseline("mpi-migrate", "batch"), color=get_color_for_baseline("mpi-migrate", "batch"), ) @@ -946,11 +1007,7 @@ def do_makespan_plot(plot_name, results, ax, num_vms, num_tasks): color=get_color_for_baseline("mpi-migrate", "granny-migrate"), ) - xlim = max( - xs_batch[-1], - xs_slurm[-1], - new_xs_granny_migrate[-1] - ) + xlim = max(xs_batch[-1], xs_slurm[-1], new_xs_granny_migrate[-1]) ax.set_xlim(left=0, right=xlim) ax.set_ylim(bottom=0, top=100) ax.set_xlabel("Time [s]") @@ -974,11 +1031,20 @@ def do_makespan_plot(plot_name, results, ax, num_vms, num_tasks): for n_vms in num_vms: timestamps = list(results[n_vms][la]["ts_vcpus"].keys()) - total_cpusecs = (timestamps[-1] - timestamps[0]) * num_cpus_per_vm * int(n_vms) + total_cpusecs = ( + (timestamps[-1] - timestamps[0]) + * num_cpus_per_vm + * int(n_vms) + ) cumsum = cum_sum( timestamps, - [res * num_cpus_per_vm * int(n_vms) / 100 for res in list(results[n_vms][la]["ts_vcpus"].values())], + [ + res * num_cpus_per_vm * int(n_vms) / 100 + for res in list( + results[n_vms][la]["ts_vcpus"].values() + ) + ], ) # Record both the total idle CPUsecs and the percentage @@ -988,9 +1054,7 @@ def do_makespan_plot(plot_name, results, ax, num_vms, num_tasks): xticklabels = [] for (n_vms, n_tasks) in zip(num_vms, num_tasks): - xticklabels.append( - "{} VMs\n({} Jobs)".format(n_vms, n_tasks) - ) + xticklabels.append("{} VMs\n({} Jobs)".format(n_vms, n_tasks)) for la in labels: ys = [cumsum_ys[la][n_vms][1] for n_vms in num_vms] ax.plot( @@ -1025,11 +1089,20 @@ def do_makespan_plot(plot_name, results, ax, num_vms, num_tasks): for n_vms in num_vms: timestamps = list(results[n_vms][la]["ts_xvm_links"].keys()) - total_cpusecs = (timestamps[-1] - timestamps[0]) * num_cpus_per_vm * int(n_vms) + total_cpusecs = ( + (timestamps[-1] - timestamps[0]) + * num_cpus_per_vm + * int(n_vms) + ) cumsum = cum_sum( timestamps, - [res for res in list(results[n_vms][la]["ts_xvm_links"].values())], + [ + res + for res in list( + results[n_vms][la]["ts_xvm_links"].values() + ) + ], ) cumsum_ys[la][n_vms] = cumsum @@ -1038,11 +1111,12 @@ def do_makespan_plot(plot_name, results, ax, num_vms, num_tasks): xs = [ind for ind in range(len(num_vms))] xticklabels = [] for (n_vms, n_tasks) in zip(num_vms, num_tasks): - xticklabels.append( - "{} VMs\n({} Jobs)".format(n_vms, n_tasks) - ) + xticklabels.append("{} VMs\n({} 
Jobs)".format(n_vms, n_tasks)) for la in labels: - ys = [cumsum_ys[la][n_vms] / cumsum_ys["batch"][n_vms] for n_vms in num_vms] + ys = [ + cumsum_ys[la][n_vms] / cumsum_ys["batch"][n_vms] + for n_vms in num_vms + ] ax.plot( xs, ys, @@ -1075,10 +1149,15 @@ def do_makespan_plot(plot_name, results, ax, num_vms, num_tasks): # Number of cross-VM links xs += [x + x_offset for x in range(len(labels))] - ys += [list(results[n_vms][la]["ts_xvm_links"].values()) for la in labels] + ys += [ + list(results[n_vms][la]["ts_xvm_links"].values()) + for la in labels + ] # Color and alpha for each box - colors += [get_color_for_baseline("mpi-migrate", la) for la in labels] + colors += [ + get_color_for_baseline("mpi-migrate", la) for la in labels + ] # Add one tick and xlabel per VM size xticks.append(x_offset + len(labels) / 2) @@ -1095,7 +1174,7 @@ def do_makespan_plot(plot_name, results, ax, num_vms, num_tasks): widths=0.5, ) - for (box, color) in zip(bplot['boxes'], colors): + for (box, color) in zip(bplot["boxes"], colors): box.set_facecolor(color) box.set_edgecolor("black") @@ -1107,8 +1186,9 @@ def do_makespan_plot(plot_name, results, ax, num_vms, num_tasks): legend_entries = [ Patch( color=get_color_for_baseline("mpi-migrate", label), - label=get_label_for_baseline("mpi-migrate", label) - ) for label in labels + label=get_label_for_baseline("mpi-migrate", label), + ) + for label in labels ] ax.legend(handles=legend_entries, ncols=2, fontsize=8) elif plot_name == "used_vmsecs": @@ -1128,15 +1208,29 @@ def do_makespan_plot(plot_name, results, ax, num_vms, num_tasks): for n_vms in num_vms: timestamps = list(results[n_vms][la]["ts_idle_vms"].keys()) - # total_cpusecs = (timestamps[-1] - timestamps[0]) * num_cpus_per_vm * int(n_vms) cumsum = cum_sum( timestamps, - [(n_vms - res) for res in list(results[n_vms][la]["ts_idle_vms"].values())], + [ + (n_vms - res) + for res in list( + results[n_vms][la]["ts_idle_vms"].values() + ) + ], ) # TODO: delete me if n_vms == 16: - print(n_vms, la, cumsum, [res for res in list(results[n_vms][la]["ts_idle_vms"].values())]) + print( + n_vms, + la, + cumsum, + [ + res + for res in list( + results[n_vms][la]["ts_idle_vms"].values() + ) + ], + ) cumsum_ys[la][n_vms] = cumsum @@ -1144,9 +1238,7 @@ def do_makespan_plot(plot_name, results, ax, num_vms, num_tasks): xs = [ind for ind in range(len(num_vms))] xticklabels = [] for (n_vms, n_tasks) in zip(num_vms, num_tasks): - xticklabels.append( - "{} VMs\n({} Jobs)".format(n_vms, n_tasks) - ) + xticklabels.append("{} VMs\n({} Jobs)".format(n_vms, n_tasks)) for la in labels: ys = [cumsum_ys[la][n_vms] for n_vms in num_vms] ax.plot( diff --git a/tasks/util/math.py b/tasks/util/math.py index 71cb3cb..237e809 100644 --- a/tasks/util/math.py +++ b/tasks/util/math.py @@ -3,7 +3,11 @@ def cum_sum(ts, values): Perform the cumulative sum of the values (i.e. 
integral) over the time interval defined by ts """ - assert len(ts) == len(values), "Can't CumSum over different sizes!({} != {})".format(len(ts), len(values)) + assert len(ts) == len( + values + ), "Can't CumSum over different sizes!({} != {})".format( + len(ts), len(values) + ) cum_sum = 0 prev_t = ts[0] diff --git a/tasks/util/openmpi.py b/tasks/util/openmpi.py index b04240e..cb1be14 100644 --- a/tasks/util/openmpi.py +++ b/tasks/util/openmpi.py @@ -95,8 +95,7 @@ def run_kubectl_cmd(experiment_name, cmd, capture_stderr=False): def get_native_mpi_pods(experiment_name): # List all pods cmd_out = run_kubectl_cmd( - experiment_name, - "get pods -o wide -l run=faasm-openmpi" + experiment_name, "get pods -o wide -l run=faasm-openmpi" ) # Split output into list of strings @@ -116,7 +115,9 @@ def get_native_mpi_pods(experiment_name): def restart_native_mpi_pod(experiment_name, pod_names): - run_kubectl_cmd(experiment_name, "delete pod {}".format(" ".join(pod_names))) + run_kubectl_cmd( + experiment_name, "delete pod {}".format(" ".join(pod_names)) + ) def get_native_mpi_pods_ip_to_vm(experiment_name): diff --git a/tasks/util/planner.py b/tasks/util/planner.py index 6977aa3..fa8e766 100644 --- a/tasks/util/planner.py +++ b/tasks/util/planner.py @@ -4,8 +4,11 @@ ) from time import sleep + # This method also returns the number of used VMs -def get_num_idle_cpus_from_in_flight_apps(num_vms, num_cpus_per_vm, in_flight_apps): +def get_num_idle_cpus_from_in_flight_apps( + num_vms, num_cpus_per_vm, in_flight_apps +): total_cpus = int(num_vms) * int(num_cpus_per_vm) worker_occupation = {} @@ -25,11 +28,11 @@ def get_num_idle_cpus_from_in_flight_apps(num_vms, num_cpus_per_vm, in_flight_ap def get_num_available_slots_from_in_flight_apps( - num_vms, - num_cpus_per_vm, - user_id=None, - num_evicted_vms=None, - openmp=False, + num_vms, + num_cpus_per_vm, + user_id=None, + num_evicted_vms=None, + openmp=False, ): """ For Granny baselines, we cannot use static knowledge of the @@ -44,14 +47,20 @@ def get_num_available_slots_from_in_flight_apps( available_ips = [host.ip for host in available_hosts.hosts] if len(available_ips) != num_vms: - print("Not enough hosts registered ({}/{}). Retrying...".format( - len(available_ips), - num_vms) + print( + "Not enough hosts registered ({}/{}). Retrying...".format( + len(available_ips), num_vms + ) ) sleep(short_sleep_secs) continue - available_slots = sum([int(host.slots - host.usedSlots) for host in available_hosts.hosts]) + available_slots = sum( + [ + int(host.slots - host.usedSlots) + for host in available_hosts.hosts + ] + ) next_evicted_vm_ips = [] try: @@ -59,7 +68,10 @@ def get_num_available_slots_from_in_flight_apps( except AttributeError: pass - if num_evicted_vms is not None and len(next_evicted_vm_ips) != num_evicted_vms: + if ( + num_evicted_vms is not None + and len(next_evicted_vm_ips) != num_evicted_vms + ): print("Not enough evicted VMs registered. Retrying...") sleep(short_sleep_secs) continue @@ -84,7 +96,11 @@ def get_num_available_slots_from_in_flight_apps( must_hold_back = False for app in in_flight_apps.apps: if any([ip in next_evicted_vm_ips for ip in app.hostIps]): - print("Detected app {} scheduled in to-be evicted VM. Retrying...".format(app.appId)) + print( + "Detected app {} scheduled in to-be evicted VM. 
Retrying...".format( + app.appId + ) + ) must_hold_back = True break @@ -123,16 +139,27 @@ def get_num_available_slots_from_in_flight_apps( if num_vms > len(list(worker_occupation.keys())): return num_cpus_per_vm - return max([num_cpus_per_vm - worker_occupation[ip] for ip in worker_occupation]) + return max( + [ + num_cpus_per_vm - worker_occupation[ip] + for ip in worker_occupation + ] + ) - num_available_slots = (num_vms - len(list(worker_occupation.keys()))) * num_cpus_per_vm + num_available_slots = ( + num_vms - len(list(worker_occupation.keys())) + ) * num_cpus_per_vm for ip in worker_occupation: num_available_slots += num_cpus_per_vm - worker_occupation[ip] # Double-check the number of available slots with our other source of truth if user_id is not None and num_available_slots != available_slots: print( - "WARNING: inconsistency in the number of available slots (in flight: {} - registered: {})".format(num_available_slots, available_slots)) + "WARNING: inconsistency in the number of available slots" + " (in flight: {} - registered: {})".format( + num_available_slots, available_slots + ) + ) sleep(short_sleep_secs) continue @@ -142,7 +169,11 @@ def get_num_available_slots_from_in_flight_apps( # If we have any frozen apps, we want to un-FREEZE them to prevent building # up a buffer in the planner if len(in_flight_apps.frozenApps) > 0: - print("Detected frozen apps, so returning 0 slots: {}".format(in_flight_apps.frozenApps)) + print( + "Detected frozen apps, so returning 0 slots: {}".format( + in_flight_apps.frozenApps + ) + ) return 0 return num_available_slots @@ -160,7 +191,7 @@ def get_xvm_links_from_part(part): count = 0 for ind in range(len(part)): - count += sum(part[0:ind] + part[ind + 1:]) * part[ind] + count += sum(part[0:ind] + part[ind + 1 :]) * part[ind] return int(count / 2) @@ -177,11 +208,11 @@ def get_num_xvm_links_from_in_flight_apps(in_flight_apps): part = list(app_ocupation.values()) # TODO: delete me -# print("DEBUG - App: {} - Occupation: {} - Part: {} - Links: {}".format( -# app.appId, -# app_ocupation, -# part, -# get_xvm_links_from_part(part))) + # print("DEBUG - App: {} - Occupation: {} - Part: {} - Links: {}".format( + # app.appId, + # app_ocupation, + # part, + # get_xvm_links_from_part(part))) total_xvm_links += get_xvm_links_from_part(part) return total_xvm_links diff --git a/tasks/util/plot.py b/tasks/util/plot.py index 63080bd..f8cf8f0 100644 --- a/tasks/util/plot.py +++ b/tasks/util/plot.py @@ -46,7 +46,11 @@ def _do_get_for_baseline(workload, baseline, color=False, label=False): if color: return _PLOT_COLORS[this_label] - raise RuntimeError("Unrecognised baseline ({}) for workload: {}".format(baseline, workload)) + raise RuntimeError( + "Unrecognised baseline ({}) for workload: {}".format( + baseline, workload + ) + ) if workload == "mpi-migrate": if baseline == "granny": @@ -68,7 +72,11 @@ def _do_get_for_baseline(workload, baseline, color=False, label=False): if color: return _PLOT_COLORS[this_label] - raise RuntimeError("Unrecognised baseline ({}) for workload: {}".format(baseline, workload)) + raise RuntimeError( + "Unrecognised baseline ({}) for workload: {}".format( + baseline, workload + ) + ) if workload == "mpi-spot": if baseline == "granny": @@ -84,7 +92,11 @@ def _do_get_for_baseline(workload, baseline, color=False, label=False): if color: return _PLOT_COLORS[this_label] - raise RuntimeError("Unrecognised baseline ({}) for workload: {}".format(baseline, workload)) + raise RuntimeError( + "Unrecognised baseline ({}) for workload: {}".format( 
+ baseline, workload + ) + ) def get_color_for_baseline(workload, baseline): @@ -94,6 +106,7 @@ def get_color_for_baseline(workload, baseline): def get_label_for_baseline(workload, baseline): return _do_get_for_baseline(workload, baseline, label=True) + def save_plot(fig, plot_dir, plot_name): fig.tight_layout() versioned_dir = join(PLOTS_ROOT, get_faasm_version()) diff --git a/tasks/util/spot.py b/tasks/util/spot.py index b4cd6c8..7700867 100644 --- a/tasks/util/spot.py +++ b/tasks/util/spot.py @@ -28,7 +28,9 @@ def read_spot_results(num_vms, num_tasks, num_cpus_per_vm): # ----- makespan_s = results["MakespanSecs"].to_list() - assert len(makespan_s) == 1, "Too many rows: expected 1, got {}!".format(len(makespan_s)) + assert ( + len(makespan_s) == 1 + ), "Too many rows: expected 1, got {}!".format(len(makespan_s)) makespan_s = makespan_s[0] result_dict[baseline]["makespan"] = makespan_s @@ -64,7 +66,8 @@ def _do_plot_makespan(results, ax, **kwargs): x_offset = ind * len(baselines) + (ind + 1) xs += [x + x_offset for x in range(len(baselines))] ys += [ - float(results[n_vms][baseline + "-ft"]["makespan"]) / float(results[n_vms][baseline]["makespan"]) + float(results[n_vms][baseline + "-ft"]["makespan"]) + / float(results[n_vms][baseline]["makespan"]) for baseline in baselines ] colors += [ @@ -74,9 +77,7 @@ def _do_plot_makespan(results, ax, **kwargs): # Add one tick and xlabel per VM size xticks.append(x_offset + len(baselines) / 2) - xticklabels.append( - "{} VMs\n({} Jobs)".format(n_vms, num_tasks[ind]) - ) + xticklabels.append("{} VMs\n({} Jobs)".format(n_vms, num_tasks[ind])) # Add spacing between vms if ind != len(num_vms) - 1: @@ -88,7 +89,7 @@ def _do_plot_makespan(results, ax, **kwargs): ax.set_ylim(bottom=0) if tight: ax.set_ylabel("Slowdown [Spot / No Spot]", fontsize=6) - ax.tick_params(axis='y', labelsize=6) + ax.tick_params(axis="y", labelsize=6) else: ax.set_ylabel("Makespan Slowdown \n [Spot VMs / No Spot VMs]") @@ -96,10 +97,13 @@ def _do_plot_makespan(results, ax, **kwargs): legend_entries = [ Patch( color=get_color_for_baseline("mpi-spot", baseline), - label=get_label_for_baseline("mpi-spot", baseline) - ) for baseline in baselines + label=get_label_for_baseline("mpi-spot", baseline), + ) + for baseline in baselines ] - ax.legend(handles=legend_entries, ncols=1, fontsize=6, loc="lower center") + ax.legend( + handles=legend_entries, ncols=1, fontsize=6, loc="lower center" + ) if tight: ax.set_xticks([]) @@ -142,20 +146,29 @@ def _do_plot_cost(results, ax, **kwargs): for discount in discounts_pcnt: ys[discount] += [ - (float(results[n_vms][baseline + "-ft"]["makespan"]) * (1 - discount / 100) / 3600) * n_vms + ( + float(results[n_vms][baseline + "-ft"]["makespan"]) + * (1 - discount / 100) + / 3600 + ) + * n_vms for baseline in baselines ] if ind != len(num_vms) - 1: ys[discount].append(0) - nospot_ys += [(results[n_vms][baseline]["makespan"] / 3600) * n_vms for baseline in baselines] - colors += [get_color_for_baseline("mpi-spot", baseline) for baseline in baselines] + nospot_ys += [ + (results[n_vms][baseline]["makespan"] / 3600) * n_vms + for baseline in baselines + ] + colors += [ + get_color_for_baseline("mpi-spot", baseline) + for baseline in baselines + ] # Add one tick and xlabel per VM size xticks.append(x_offset + len(baselines) / 2) - xticklabels.append( - "{} VMs\n({} Jobs)".format(n_vms, num_tasks[ind]) - ) + xticklabels.append("{} VMs\n({} Jobs)".format(n_vms, num_tasks[ind])) # Add spacing between vms if ind != len(num_vms) - 1: @@ -167,7 +180,9 @@ def 
_do_plot_cost(results, ax, **kwargs): if ind == 0: ax.bar(xs, ys[discount], color=colors, edgecolor="black", width=1) else: - this_ys = [y - bottom_y for y, bottom_y in zip(ys[discount], bottom_ys)] + this_ys = [ + y - bottom_y for y, bottom_y in zip(ys[discount], bottom_ys) + ] ax.bar( xs, this_ys, @@ -175,7 +190,7 @@ def _do_plot_cost(results, ax, **kwargs): color=colors, edgecolor="black", alpha=float(discount / 100.0), - width=1 + width=1, ) # Add disccount annotation @@ -210,7 +225,7 @@ def _do_plot_cost(results, ax, **kwargs): ax.set_ylim(bottom=0) if tight: ax.set_ylabel("Cost [VM Hours]", fontsize=6) - ax.tick_params(axis='y', labelsize=6) + ax.tick_params(axis="y", labelsize=6) ax.legend(fontsize=6) ax.set_xticks([]) else: diff --git a/tasks/util/trace.py b/tasks/util/trace.py index dc8d78e..1cfa672 100644 --- a/tasks/util/trace.py +++ b/tasks/util/trace.py @@ -5,6 +5,7 @@ MAKESPAN_TRACES_DIR = join(PROJ_ROOT, "tasks", "makespan", "traces") + # Copy this from tasks.makespan.data to prevent a circular import @dataclass class TaskObject: