From 8e163d745736a69a468ccde937d1cd0a3db5de35 Mon Sep 17 00:00:00 2001 From: Kenneth Cain Date: Tue, 17 Dec 2024 11:31:33 -0500 Subject: [PATCH 1/8] DAOS-16111 test: new utility daos_sys_logscan Scan a list of engine logfiles to produce a nested dictionary of pools and a sequence of key events such as pool leadership terms, pool map version updates (due to target state changes), rebuild start and progress update events and total rebuild duration. This first version focuses on finding the pool service leader engine log file and producing this information. Future updates to the tool can potentially include finer-grain tracking of operations across all pool storage engine log files. The supporting class LogLine in cart_logparse.py has a tiny change to support this new utility. Signed-off-by: Kenneth Cain --- src/tests/ftest/cart/util/cart_logparse.py | 6 +- src/tests/ftest/cart/util/daos_sys_logscan.py | 293 ++++++++++++++++++ 2 files changed, 296 insertions(+), 3 deletions(-) create mode 100644 src/tests/ftest/cart/util/daos_sys_logscan.py diff --git a/src/tests/ftest/cart/util/cart_logparse.py b/src/tests/ftest/cart/util/cart_logparse.py index 42d6893f01b..0cec97e0eaa 100644 --- a/src/tests/ftest/cart/util/cart_logparse.py +++ b/src/tests/ftest/cart/util/cart_logparse.py @@ -1,5 +1,5 @@ # /* -# * (C) Copyright 2016-2023 Intel Corporation. +# * (C) Copyright 2016-2024 Intel Corporation. # * # * SPDX-License-Identifier: BSD-2-Clause-Patent # */ @@ -104,7 +104,8 @@ def __init__(self, line): except KeyError as error: raise InvalidLogFile(fields[4]) from error - # self.time_stamp = fields[0] + self.time_stamp = fields[0] + self.hostname = fields[1] self._fields = fields[5:] try: if self._fields[1][-2:] == '()': @@ -369,7 +370,6 @@ def free_pointer(self): """Return the memory address freed""" return self.get_field(-1).rstrip('.') - class StateIter(): """Helper class for LogIter to add a state-full iterator. diff --git a/src/tests/ftest/cart/util/daos_sys_logscan.py b/src/tests/ftest/cart/util/daos_sys_logscan.py new file mode 100644 index 00000000000..cff39d50f7b --- /dev/null +++ b/src/tests/ftest/cart/util/daos_sys_logscan.py @@ -0,0 +1,293 @@ +#!/usr/bin/env python3 +# +# (C) Copyright 2024 Intel Corporation +# +# SPDX-License-Identifier: BSD-2-Clause-Patent + +"""Scan daos_engine log files to get a summary of pools activity.""" + +import argparse +import re +import sys +import time +import pprint +from collections import Counter, OrderedDict, defaultdict + +import cart_logparse + +class SysPools(): + """Directory of Pools and Summary Activities Found in Engine Log Files""" + + # TODO + # diagram of nested dictionaries constructed + # system map update events (output outside if pool-specific context) + # SWIM events seen by PS leader? + # add/remove target events on PS leader? + # pool map version changes + # rebuild queued (PS leader) + # rebuild scanning (PS leader, and count of # engines completing scanning, time duration, stalls) + # rebuild pulling (PS leader? count of # of engines starting/finishing pulling, may be a subset, time duration, stalls) + # rebuild number of objects, records and progress made? 
+ # rebuild aborted/errored + # rebuild completed/success + + re_rank_assign = re.compile("ds_mgmt_drpc_set_rank.*set rank to (\d+)") + re_step_up = re.compile("rdb_raft_step_up.*([0-9a-fA-F]{8}).*became leader of term (\d+)") + re_step_down = re.compile("rdb_raft_step_down.*([0-9a-fA-F]{8}).*no longer leader of term (\d+)") + # TODO: update_one_tgt(), update_tgt_down_drain_to_downout() "change Target.*rank (\d+) idx (\d+).*to (\w+)" + # need those functions to print pool UUID. Then here, store a list of target change events in + # tgt_change = {"rank": affected_rank, "tgt_idx": affected_tgtidx, "state": new_tgtstate} + # self._pools[puuid][highest_term[puuid]]["maps"][highest_pmapver[puuid]]["tgt_state_changes"].append(tgt_change) + re_pmap_update = re.compile("ds_pool_tgt_map_update\(\) ([0-9a-fA-F]{8}): updated pool map: version=(\d+)->(\d+) pointer") + #TODO/FIXME rebuild_task_ult() waiting for scheduling time, and waiting for pool map dist needs uniform rb= string. + #re.rebuild_ldr_wait_schedtime = re.compile("rebuild_task_ult\(\).*rebuild task sleep (\d+) second") + #re.rebuild_ldr_wait_pmapdist = re.compile("rebuild_task_ult\(\).*map_dist_ver (\d+) map ver (\d+)") + # uniform rebuild string identifier rb=/// + re_rebuild_ldr_start= re.compile("rebuild_leader_start.*rb=([0-9a-fA-F]{8})/(\d+)/(\d+)/(\w+)$") + re_rebuild_ldr_status = re.compile("rebuild_leader_status_check\(\).*rb=([0-9a-fA-F]{8})/(\d+)/(\d+)/(\w+) \[(\w+)\].*duration=(\d+)") + re_rebuild_ldr_scanning = re.compile("rebuild_leader_status_check\(\).*rb=([0-9a-fA-F]{8})/(\d+)/(\d+)/(\w+) \[scanning\].*duration=(\d+) secs") + re_rebuild_ldr_pulling= re.compile("rebuild_leader_status_check\(\).*rb=([0-9a-fA-F]{8})/(\d+)/(\d+)/(\w+) \[pulling\].*duration=(\d+) secs") + re_rebuild_ldr_completed= re.compile("rebuild_leader_status_check\(\).*rb=([0-9a-fA-F]{8})/(\d+)/(\d+)/(\w+) \[completed\].*duration=(\d+) secs") + re_rebuild_ldr_aborted= re.compile("rebuild_leader_status_check\(\).*rb=([0-9a-fA-F]{8})/(\d+)/(\d+)/(\w+) \[aborted\].*duration=(\d+) secs") + + def __init__(self): + # dictionaries indexed by pool UUID + self._pools = {} + self._highest_pmapver = {} + self._cur_ldr_rank = {} + self._cur_term = {} + + def check_file(self, log_iter): + fname = log_iter.fname + for pid in log_iter.get_pids(): + rank = -1 + for line in log_iter.new_iter(pid=pid): + msg = line.get_msg() + host = line.hostname + datetime = line.time_stamp + # Find engine rank assignment (early in log) + match = self.re_rank_assign.match(msg) + if match: + rank = int(match.group(1)) + print(f"========== rank {rank} logfile {fname} ==========") + continue + + # Find pool term begin (PS leader step_up) + match = self.re_step_up.match(msg) + if match: + puuid = match.group(1) + term = int(match.group(2)) + if puuid not in self._pools: + self._pools[puuid] = {} + self._cur_ldr_rank[puuid]= rank + self._cur_term[puuid] = term + old_term = term - 1 + # if term already exists, error? + if term in self._pools[puuid]: + print(f"WARN: pool {puuid} term {term} already seen!") + # carry over the most recent map version change into the new term, to avoid possible KeyError in rebuild leader status match? 
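+                    # A minimal sketch (hypothetical values, not taken from any log) of the
+                    # nested dictionary this class builds, using the same keys as this file
+                    # (one carried-over map version shown):
+                    #   self._pools[puuid] = {
+                    #       7: {"rank": 3, "begin_time": "12/17-10:01:02.33", "end_time": "",
+                    #           "host": "node1", "pid": 12345, "logfile": "e0.log",
+                    #           "maps": {25: {"carryover": True, "from_ver": 24,
+                    #                         "time": "12/17-10:05:06.78",
+                    #                         "rebuild_gens": {}}}}}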
+ if old_term in self._pools: + #print(f"PROCESS FOUND pool {puuid} BEGIN term {term}: prior term {old_term} exists") + #if self._pools[old_term]["maps"] is not None: + #print(f"PROCESS FOUND pool {puuid} BEGIN term {term}: maps dictionary exists.") + #if (self._pools[old_term]["maps"]) != {}: + #print(f'PROCESS FOUND pool {puuid} BEGIN term {term}: maps dictionary is non-empty, with {len(list(self._pools[puuid][old_term]["maps"].keys()))} keys') + if self._pools and self._pools[puuid][old_term]["maps"] != {}: + last_mapver = list(self._pools[puuid][old_term]["maps"].keys())[-1] + pmap_versions = self._pools[puuid][old_term]["maps"][last_mapver] + pmap_versions["carryover"] = True + #pmap_versions["rebuild_gens"] = {} + else: + pmap_versions = {} + self._pools[puuid][term] = {"rank": rank, "begin_time": datetime, "end_time": "", "host": host, "pid": pid, "logfile": fname, "maps": pmap_versions} + #print(f"{datetime} FOUND pool {puuid} BEGIN\tterm {term} pmap_versions empty: {str(pmap_versions == {})} rank {rank}\t{host}\tPID {pid}\t{fname}") + continue + + # Find pool term end (PS leader step_down) + match = self.re_step_down.match(msg) + if match: + puuid = match.group(1) + term = int(match.group(2)) + if (term != self._cur_term[puuid]): + print(f"WARN: step_down term={term} != cur_term={self._cur_term}") + self._cur_ldr_rank[puuid] = -1 + self._cur_term[puuid] = -1 + self._pools[puuid][term]["end_time"] = datetime + #print(f"{datetime} FOUND pool {puuid} END\tterm {term} rank {rank}\t{host}\tPID {pid}\t{fname}") + continue + + # TODO: find target status updates (precursor to pool map updates) + + # Find pool map updates + # FIXME: but only on the current PS leader engine / term? And carry over latest pmap version from prior term to start with? + match = self.re_pmap_update.match(msg) + if match: + puuid = match.group(1) + from_ver = int(match.group(2)) + to_ver = int(match.group(3)) + # ignore if this engine is not the leader + if puuid not in self._pools or rank != self._cur_ldr_rank[puuid]: + continue + term = self._cur_term[puuid] + self._pools[puuid][term]["maps"][to_ver] = {"carryover": False, "from_ver": from_ver, "time": datetime, "rebuild_gens": {}} + #print(f"FOUND pool {puuid} map update {from_ver}->{to_ver} rank {rank}\t{host}\tPID {pid}\t{fname}") + continue + + # Find rebuild start by the PS leader + match = self.re_rebuild_ldr_start.match(msg) + if match: + puuid = match.group(1) + mapver = int(match.group(2)) + rebuild_gen = int(match.group(3)) + rebuild_op = match.group(4) + if rank != self._cur_ldr_rank[puuid]: + continue + term = self._cur_term[puuid] + if term < 1: + print(f"WARN pool {puuid} I don't know what term it is ({term})!") + # TODO: for now assuming rebuild_gen isn't in the dictionary yet. should we test to be safe? 
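+                    # The uniform identifier has the shape rb=<puuid>/<ver>/<gen>/<op>;
+                    # an example (hypothetical) message re_rebuild_ldr_start would match:
+                    #   rebuild_leader_start() ... rb=8e163d74/25/1/Rebuild
+                    # i.e. pool 8e163d74, map version 25, rebuild generation 1, op "Rebuild".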
+ if rebuild_gen in self._pools[puuid][term]["maps"][mapver]["rebuild_gens"]: + print(f"WARN pool {puuid} term {term} mapver {mapver} already has rebuild_gen {rebuild_gen}!") + # TODO: keep timestamps for overall/scan start, pull start, completed, convert to float and store component durations too + self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen] = {"op": rebuild_op, "start_time": datetime, "time": "xx/xx-xx:xx:xx.xx", "started": True, "scanning": False, "pulling": False, "completed": False, "aborted": False, "duration": 0} + #print(f"{datetime} FOUND rebuild start in term {term}, rb={puuid}/{mapver}/{rebuild_gen}/{rebuild_op} rank {rank}\t{host}\tPID {pid}\t{fname}") + continue + + # Find rebuild status update reported by the PS leader + match = self.re_rebuild_ldr_status.match(msg) + if match: + puuid = match.group(1) + mapver = int(match.group(2)) + rebuild_gen = int(match.group(3)) + rebuild_op = match.group(4) + status = match.group(5) + dur = int(match.group(6)) + if rank != self._cur_ldr_rank[puuid]: + continue + term = self._cur_term[puuid] + if term < 1: + print(f"WARN pool {puuid} I don't know what term it is ({term})!") + if mapver not in self._pools[puuid][term]["maps"]: + print(f"WARN pool {puuid} term {term} mapver {mapver} is not in maps dictionary - creating placeholder") + self._pools[puuid][term]["maps"][mapver] = {"carryover": False, "from_ver": mapver, "time": "xx/xx-xx:xx:xx.xx", "rebuild_gens": {}} + self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen] = {"op": rebuild_op, "start_time": "xx/xx-xx:xx:xx.xx", "time": "xx/xx-xx:xx:xx.xx", "started": True, "scanning": False, "pulling": False, "completed": False, "aborted": False, "duration": 0} + + # TODO: verify rebuild_op == self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["op"]? 
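+                    # Example (hypothetical) message re_rebuild_ldr_status would match:
+                    #   rebuild_leader_status_check() ... rb=8e163d74/25/1/Rebuild [scanning] duration=42 secs
+                    # yielding status "scanning" (group 5) and duration 42 seconds (group 6).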
+ if rebuild_gen in self._pools[puuid][term]["maps"][mapver]["rebuild_gens"]: + existing_op = self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["op"] + if rebuild_op != existing_op: + print(f"WARN rb={puuid}/{mapver}/{rebuild_gen}/{existing_op} != this line's op {rebuild_op}") + if (status == "scanning"): + #print(f'ASSIGN _pools[{puuid}][{term}]["maps"][{mapver}]["rebuild_gens"][{rebuild_gen}]["scanning"] = True : rank {rank}\t{host}\tPID {pid}\t{fname}') + self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["scanning"] = True + elif (status == "pulling"): + #print(f'ASSIGN _pools[{puuid}][{term}]["maps"][{mapver}]["rebuild_gens"][{rebuild_gen}]["pulling"] = True : rank {rank}\t{host}\tPID {pid}\t{fname}') + self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["pulling"] = True + elif (status == "completed"): + #print(f'ASSIGN _pools[{puuid}][{term}]["maps"][{mapver}]["rebuild_gens"][{rebuild_gen}]["completed"] = True : rank {rank}\t{host}\tPID {pid}\t{fname}') + self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["completed"] = True + elif (status == "aborted"): + self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["aborted"] = True + + self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["time"] = datetime + self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["duration"] = dur + #print(f"{datetime} FOUND rebuild UPDATE term={term} rb={puuid}/{mapver}/{rebuild_gen}/{rebuild_op} STATUS={status}, DURATION={dur} seconds rank {rank}\t{host}\tPID {pid}\t{fname}") + continue + + # TODO: look for scan/migrate activity on all pool engines, count and correlate to PS leader activity? + + def print_pools(self): + for puuid in self._pools: + print(f"========== Pool {puuid}:") + for term in self._pools[puuid]: + b = self._pools[puuid][term]["begin_time"] + e = self._pools[puuid][term]["end_time"] + r = self._pools[puuid][term]["rank"] + h = self._pools[puuid][term]["host"] + p = self._pools[puuid][term]["pid"] + f = self._pools[puuid][term]["logfile"] + # Print term begin + print(f"{b} {puuid} BEGIN term {term}\trank {r}\t{h}\tPID {p}\t{f}") + + # Print pool map updates that happened within the term + for mapver in self._pools[puuid][term]["maps"]: + # Print rebuilds + + # TODO: print tgt state changes + #for tgt_change in self._pools[puuid][term]["maps"][mapver]["tgt_state_changes"]: + # print(f"TGT state {tgt_change["state"]}, rank: {tgt_change["rank"]}, idx: {tgt_change["tgt_idx"]}") + t = self._pools[puuid][term]["maps"][mapver]["time"] + from_ver = self._pools[puuid][term]["maps"][mapver]["from_ver"] + print(f"{t} {puuid} MAPVER {from_ver}->{mapver}") + + # Rebuilds + for rebuild_gen in self._pools[puuid][term]["maps"][mapver]["rebuild_gens"]: + rebuild_op = self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["op"] + dur = self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["duration"] + started = self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["started"] + # TODO: scan_done, pull_done booleans + scanning = self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["scanning"] + pulling = self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["pulling"] + completed =self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["completed"] + aborted = self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["aborted"] + st = 
self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["start_time"] + ut = self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["time"] + status = "started" + if aborted: + status = "aborted" + if completed: + status = "completed" + elif pulling: + status = "pulling" + elif scanning: + status = "scanning" + print(f"{st} {puuid} RBSTRT {mapver}/{rebuild_gen}/{rebuild_op}") + updated = scanning or pulling or completed or aborted + if updated: + print(f"{ut} {puuid} RBUPDT {mapver}/{rebuild_gen}/{rebuild_op} {status} {dur} seconds") + + # Print term end (if there is a PS leader step_down) + if e != "": + print(f"{e} {puuid} END term {term}\trank {r}\t{h}\tPID {p}\t{f}") + else: + print(" " * 18 + f"{puuid} END term {term}\trank {r}\t{h}\tPID {p}\t{f}") + + def sort(self): + for puuid in self._pools: + tmp = dict(sorted(self._pools[puuid].items())) + self._pools[puuid] = tmp + # _pools[puuid][term]["maps"] should have been inserted in ascending order already? + +def run(): + """Scan a list of daos_engine logfiles""" + ap = argparse.ArgumentParser() + ap.add_argument('filelist', nargs='+') + args = ap.parse_args() + + out_fname = 'sys_logscan.txt' + out_fd = open(out_fname, 'w') # pylint: disable=consider-using-with + real_stdout = sys.stdout + sys.stdout = out_fd + print(f'Logging to {out_fname}', file=real_stdout) + + sp = SysPools() + + for fname in args.filelist: + if fname.endswith("cart_logtest"): + continue + #print(f"\n========== Engine log: {fname} ==========") + try: + log_iter = cart_logparse.LogIter(fname) + except UnicodeDecodeError: + log_iter = cart_logparse.LogIter(args.file, check_encoding=True) + + if log_iter.file_corrupt: + sys.exit(1) + #print(f"{len(log_iter.get_pids())} PIDs") + sp.check_file(log_iter) + #pprint.pprint(sp._pools) + sp.sort() + sp.print_pools() + +if __name__ == '__main__': + run() From c691881095e897c25519f0ba85e609e4a1e14e2a Mon Sep 17 00:00:00 2001 From: Kenneth Cain Date: Thu, 19 Dec 2024 22:36:04 -0500 Subject: [PATCH 2/8] first refactor, address review feedback Signed-off-by: Kenneth Cain --- src/tests/ftest/cart/util/cart_logparse.py | 1 + src/tests/ftest/cart/util/daos_sys_logscan.py | 473 +++++++++++------- 2 files changed, 301 insertions(+), 173 deletions(-) diff --git a/src/tests/ftest/cart/util/cart_logparse.py b/src/tests/ftest/cart/util/cart_logparse.py index 0cec97e0eaa..b09baec7961 100644 --- a/src/tests/ftest/cart/util/cart_logparse.py +++ b/src/tests/ftest/cart/util/cart_logparse.py @@ -370,6 +370,7 @@ def free_pointer(self): """Return the memory address freed""" return self.get_field(-1).rstrip('.') + class StateIter(): """Helper class for LogIter to add a state-full iterator. diff --git a/src/tests/ftest/cart/util/daos_sys_logscan.py b/src/tests/ftest/cart/util/daos_sys_logscan.py index cff39d50f7b..36c64b259ec 100644 --- a/src/tests/ftest/cart/util/daos_sys_logscan.py +++ b/src/tests/ftest/cart/util/daos_sys_logscan.py @@ -7,13 +7,13 @@ """Scan daos_engine log files to get a summary of pools activity.""" import argparse +import cart_logparse import re import sys -import time -import pprint -from collections import Counter, OrderedDict, defaultdict +#import time +#import pprint +#from collections import Counter, OrderedDict, defaultdict -import cart_logparse class SysPools(): """Directory of Pools and Summary Activities Found in Engine Log Files""" @@ -23,182 +23,300 @@ class SysPools(): # system map update events (output outside if pool-specific context) # SWIM events seen by PS leader? 
# add/remove target events on PS leader? - # pool map version changes # rebuild queued (PS leader) - # rebuild scanning (PS leader, and count of # engines completing scanning, time duration, stalls) - # rebuild pulling (PS leader? count of # of engines starting/finishing pulling, may be a subset, time duration, stalls) + # rebuild scanning (PS leader warn about engine updates, #engines finishing scanning, stalls) + # rebuild pulling (PS leader, #engines/may be a subset, starting/finishing pulling, stalls) # rebuild number of objects, records and progress made? - # rebuild aborted/errored - # rebuild completed/success - - re_rank_assign = re.compile("ds_mgmt_drpc_set_rank.*set rank to (\d+)") - re_step_up = re.compile("rdb_raft_step_up.*([0-9a-fA-F]{8}).*became leader of term (\d+)") - re_step_down = re.compile("rdb_raft_step_down.*([0-9a-fA-F]{8}).*no longer leader of term (\d+)") - # TODO: update_one_tgt(), update_tgt_down_drain_to_downout() "change Target.*rank (\d+) idx (\d+).*to (\w+)" - # need those functions to print pool UUID. Then here, store a list of target change events in - # tgt_change = {"rank": affected_rank, "tgt_idx": affected_tgtidx, "state": new_tgtstate} - # self._pools[puuid][highest_term[puuid]]["maps"][highest_pmapver[puuid]]["tgt_state_changes"].append(tgt_change) - re_pmap_update = re.compile("ds_pool_tgt_map_update\(\) ([0-9a-fA-F]{8}): updated pool map: version=(\d+)->(\d+) pointer") - #TODO/FIXME rebuild_task_ult() waiting for scheduling time, and waiting for pool map dist needs uniform rb= string. - #re.rebuild_ldr_wait_schedtime = re.compile("rebuild_task_ult\(\).*rebuild task sleep (\d+) second") - #re.rebuild_ldr_wait_pmapdist = re.compile("rebuild_task_ult\(\).*map_dist_ver (\d+) map ver (\d+)") + + # Engine rank assignment and pool service leader step_up/down events + re_rank_assign = re.compile(r"ds_mgmt_drpc_set_rank.*set rank to (\d+)") + re_step_up = re.compile(r"rdb_raft_step_up.*([0-9a-fA-F]{8}).*leader of term (\d+)") + re_step_down = re.compile(r"rdb_raft_step_down.*([0-9a-fA-F]{8}).*leader of term (\d+)") + + # TODO: target state change events + # update_one_tgt(), update_tgt_down_drain_to_downout() + # "change Target.*rank (\d+) idx (\d+).*to (\w+)" + # need those functions to print pool UUID. 
Then here, store a list of target change events in + # tgt_change = {"rank": affected_rank, "tgt_idx": affected_tgtidx, "state": new_tgtstate} + # self._pools[puuid][term]["maps"][mapver]["tgt_state_changes"].append(tgt_change) + + # pool map version update events + upd_re = r"ds_pool_tgt_map_update.*([0-9a-fA-F]{8}): updated.*map: version=(\d+)->(\d+) pointer" + re_pmap_update = re.compile(upd_re) + # uniform rebuild string identifier rb=/// - re_rebuild_ldr_start= re.compile("rebuild_leader_start.*rb=([0-9a-fA-F]{8})/(\d+)/(\d+)/(\w+)$") - re_rebuild_ldr_status = re.compile("rebuild_leader_status_check\(\).*rb=([0-9a-fA-F]{8})/(\d+)/(\d+)/(\w+) \[(\w+)\].*duration=(\d+)") - re_rebuild_ldr_scanning = re.compile("rebuild_leader_status_check\(\).*rb=([0-9a-fA-F]{8})/(\d+)/(\d+)/(\w+) \[scanning\].*duration=(\d+) secs") - re_rebuild_ldr_pulling= re.compile("rebuild_leader_status_check\(\).*rb=([0-9a-fA-F]{8})/(\d+)/(\d+)/(\w+) \[pulling\].*duration=(\d+) secs") - re_rebuild_ldr_completed= re.compile("rebuild_leader_status_check\(\).*rb=([0-9a-fA-F]{8})/(\d+)/(\d+)/(\w+) \[completed\].*duration=(\d+) secs") - re_rebuild_ldr_aborted= re.compile("rebuild_leader_status_check\(\).*rb=([0-9a-fA-F]{8})/(\d+)/(\d+)/(\w+) \[aborted\].*duration=(\d+) secs") + rbid_re = r"rb=([0-9a-fA-F]{8})/(\d+)/(\d+)/(\w+)" + + # Rebuild preliminary steps + # TODO/FIXME rebuild_task_ult() waiting for sched time, and map dist needs uniform rb= string. + #re.compile(r"rebuild_task_ult\(\).*rebuild task sleep (\d+) second") + #re.compile(r"rebuild_task_ult\(\).*map_dist_ver (\d+) map ver (\d+)") + + # Rebuild: PS leader engine starting and status checking a given operation + # statuses: "scanning", "pulling", "completed", "aborted" + ldr_start_re = "rebuild_leader_start.*" + rbid_re + "$" + ldr_status_re = r"rebuild_leader_status_check\(\).*" + rbid_re + r" \[(\w+)\].*duration=(\d+)" + re_rebuild_ldr_start = re.compile(ldr_start_re) + re_rebuild_ldr_status = re.compile(ldr_status_re) def __init__(self): # dictionaries indexed by pool UUID self._pools = {} self._highest_pmapver = {} self._cur_ldr_rank = {} + self._cur_ldr_pid = {} self._cur_term = {} - def check_file(self, log_iter): + # filename to rank map + self._file_to_rank = {} + + self._warnings = [] + + def _warn(self, wmsg, fname, line): + full_msg = f"WARN file={fname}, line={line.lineno}: " + wmsg + self._warnings.append(full_msg) + print(full_msg) + + def find_rank(self, log_iter): + print(f"INFO: searching for rank in file {log_iter.fname}") + found = False + for line in log_iter.new_iter(): + # when a rank assignment log line found (engine start) + match = self.re_rank_assign.match(line.get_msg()) + if match: + self._file_to_rank[log_iter.fname] = int(match.group(1)) + found = True + break + + # TODO: what about log rotation (not an engine start scenario)? + return found + + # return log-message, hostname, and date/timestamp components of the line + def get_line_components(self, line): + return line.get_msg(), line.hostname, line.time_stamp + + # is this rank, pid the leader of the pool with uuid puuid? 
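+    # e.g. is_leader("8e163d74", rank=3, pid=12345) -> True only while that engine
+    # (hypothetical rank/PID) holds the pool service leadership for pool 8e163d74.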
+ def is_leader(self, puuid, rank, pid): + if not puuid in self._pools: + return False + if self._cur_ldr_rank[puuid] == rank and self._cur_ldr_pid[puuid] == pid: + return True + return False + + def match_ps_step_up(self, fname, line, pid, rank): + msg, host, datetime = self.get_line_components(line) + match = self.re_step_up.match(msg) + if match: + puuid = match.group(1) + term = int(match.group(2)) + if puuid not in self._pools: + self._pools[puuid] = {} + self._cur_ldr_rank[puuid]= rank + self._cur_ldr_pid[puuid] = pid + self._cur_term[puuid] = term + old_term = term - 1 + # if term already exists, error? + if term in self._pools[puuid]: + self._warn(f"pool {puuid} term {term} already seen!", fname, line) + # carry over most recent map version into the new term, avoid later KeyError + if old_term in self._pools: + if self._pools and self._pools[puuid][old_term]["maps"] != {}: + last_mapver = list(self._pools[puuid][old_term]["maps"].keys())[-1] + pmap_versions = self._pools[puuid][old_term]["maps"][last_mapver] + pmap_versions["carryover"] = True + #pmap_versions["rb_gens"] = {} + else: + pmap_versions = {} + self._pools[puuid][term] = { + "rank": rank, + "begin_time": datetime, + "end_time": "", + "host": host, + "pid": pid, + "logfile": fname, + "maps": pmap_versions + } + # DEBUG + #print(f"{datetime} FOUND pool {puuid} BEGIN\tterm {term} pmap_versions empty: " + # f"{str(pmap_versions == {})} rank {rank}\t{host}\tPID {pid}\t{fname}") + return True + return False + + def match_ps_step_down(self, fname, line, pid, rank): + msg, host, datetime = self.get_line_components(line) + match = self.re_step_down.match(msg) + if match: + puuid = match.group(1) + term = int(match.group(2)) + if (term != self._cur_term[puuid]): + self._warn(f"step_down term={term} != cur_term={self._cur_term}", fname, line) + self._cur_ldr_rank[puuid] = -1 + self._cur_ldr_pid[puuid] = -1 + self._cur_term[puuid] = -1 + self._pools[puuid][term]["end_time"] = datetime + # DEBUG + #print(f"{datetime} FOUND pool {puuid} END\tterm {term} rank {rank}\t{host}\t" + # f"PID {pid}\t{fname}") + return True + return False + + def match_ps_pmap_update(self, fname, line, pid, rank): + msg, host, datetime = self.get_line_components(line) + match = self.re_pmap_update.match(msg) + if match: + puuid = match.group(1) + from_ver = int(match.group(2)) + to_ver = int(match.group(3)) + # ignore if this engine is not the leader + if not self.is_leader(puuid, rank, pid): + return True + term = self._cur_term[puuid] + self._pools[puuid][term]["maps"][to_ver] = { + "carryover": False, + "from_ver": from_ver, + "time": datetime, + "rb_gens": {} + } + # DEBUG + #print(f"FOUND pool {puuid} map update {from_ver}->{to_ver} rank {rank}\t{host}\t" + # f"PID {pid}\t{fname}") + return True + return False + + def get_rb_components(self, match): + return match.group(1), int(match.group(2)), int(match.group(3)), match.group(4) + + def match_ps_rb_start(self, fname, line, pid, rank): + msg, host, datetime = self.get_line_components(line) + match = self.re_rebuild_ldr_start.match(msg) + if match: + puuid, ver, gen, op = self.get_rb_components(match) + if not self.is_leader(puuid, rank, pid): + return True + term = self._cur_term[puuid] + if term < 1: + self._warn(f"pool {puuid} I don't know what term it is ({term})!", fname, line) + return True + if gen in self._pools[puuid][term]["maps"][ver]["rb_gens"]: + self._warn(f"pool {puuid} term {term} ver {ver} already has gen {gen}", fname, line) + # TODO: keep timestamps for overall/scan start, pull start, 
completed + # convert to float and store component durations too + self._pools[puuid][term]["maps"][ver]["rb_gens"][gen] = { + "op": op, + "start_time": datetime, + "time": "xx/xx-xx:xx:xx.xx", + "started": True, + "scanning": False, + "pulling": False, + "completed": False, + "aborted": False, + "duration": 0 + } + # DEBUG + #print(f"{datetime} FOUND rebuild start in term {term}, rb={puuid}/{ver}/{gen}/{op} " + # f"rank {rank}\t{host}\tPID {pid}\t{fname}") + return True + return False + + def match_ps_rb_status(self, fname, line, pid, rank): + msg, host, datetime = self.get_line_components(line) + match = self.re_rebuild_ldr_status.match(msg) + if match: + puuid, ver, gen, op = self.get_rb_components(match) + status = match.group(5) + dur = int(match.group(6)) + if not self.is_leader(puuid, rank, pid): + return True + term = self._cur_term[puuid] + if term < 1: + self._warn(f"pool {puuid} I don't know what term it is ({term})!", fname, line) + return True + if ver not in self._pools[puuid][term]["maps"]: + self._warn(f"pool {puuid} term {term} ver {ver} not in maps - add placeholder", + fname, line) + self._pools[puuid][term]["maps"][ver] = { + "carryover": False, + "from_ver": ver, + "time": "xx/xx-xx:xx:xx.xx", + "rb_gens": {} + } + self._pools[puuid][term]["maps"][ver]["rb_gens"][gen] = { + "op": op, + "start_time": "xx/xx-xx:xx:xx.xx", + "time": "xx/xx-xx:xx:xx.xx", + "started": True, + "scanning": False, + "pulling": False, + "completed": False, + "aborted": False, + "duration": 0 + } + if gen in self._pools[puuid][term]["maps"][ver]["rb_gens"]: + existing_op = self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["op"] + if op != existing_op: + self._warn(f"rb={puuid}/{ver}/{gen}/{existing_op} != line op {op}", fname, line) + if (status == "scanning"): + self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["scanning"] = True + elif (status == "pulling"): + self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["pulling"] = True + elif (status == "completed"): + self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["completed"] = True + elif (status == "aborted"): + self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["aborted"] = True + self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["time"] = datetime + self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["duration"] = dur + # DEBUG + #print(f"{datetime} FOUND rebuild UPDATE term={term} rb={puuid}/{ver}/{gen}/{op} " + # f"STATUS={status}, DUR={dur} seconds rank {rank}\t{host}\tPID {pid}\t{fname}") + return True + return False + + def scan_file(self, log_iter, rank=-1): fname = log_iter.fname + + # Find rank assignment log line for this file. Can't do much without it. 
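+        # Example (hypothetical) engine log message that re_rank_assign matches:
+        #   ds_mgmt_drpc_set_rank() set rank to 2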
+ self._file_to_rank[fname] = rank + if rank == -1 and not self.find_rank(log_iter): + self._warn(f"cannot find rank assignment in log file - skipping", fname, line) + return + rank = self._file_to_rank[fname] + for pid in log_iter.get_pids(): - rank = -1 + print(f"INFO: scanning file {fname} rank {rank}, PID {pid}") for line in log_iter.new_iter(pid=pid): msg = line.get_msg() host = line.hostname datetime = line.time_stamp - # Find engine rank assignment (early in log) - match = self.re_rank_assign.match(msg) - if match: - rank = int(match.group(1)) - print(f"========== rank {rank} logfile {fname} ==========") - continue # Find pool term begin (PS leader step_up) - match = self.re_step_up.match(msg) - if match: - puuid = match.group(1) - term = int(match.group(2)) - if puuid not in self._pools: - self._pools[puuid] = {} - self._cur_ldr_rank[puuid]= rank - self._cur_term[puuid] = term - old_term = term - 1 - # if term already exists, error? - if term in self._pools[puuid]: - print(f"WARN: pool {puuid} term {term} already seen!") - # carry over the most recent map version change into the new term, to avoid possible KeyError in rebuild leader status match? - if old_term in self._pools: - #print(f"PROCESS FOUND pool {puuid} BEGIN term {term}: prior term {old_term} exists") - #if self._pools[old_term]["maps"] is not None: - #print(f"PROCESS FOUND pool {puuid} BEGIN term {term}: maps dictionary exists.") - #if (self._pools[old_term]["maps"]) != {}: - #print(f'PROCESS FOUND pool {puuid} BEGIN term {term}: maps dictionary is non-empty, with {len(list(self._pools[puuid][old_term]["maps"].keys()))} keys') - if self._pools and self._pools[puuid][old_term]["maps"] != {}: - last_mapver = list(self._pools[puuid][old_term]["maps"].keys())[-1] - pmap_versions = self._pools[puuid][old_term]["maps"][last_mapver] - pmap_versions["carryover"] = True - #pmap_versions["rebuild_gens"] = {} - else: - pmap_versions = {} - self._pools[puuid][term] = {"rank": rank, "begin_time": datetime, "end_time": "", "host": host, "pid": pid, "logfile": fname, "maps": pmap_versions} - #print(f"{datetime} FOUND pool {puuid} BEGIN\tterm {term} pmap_versions empty: {str(pmap_versions == {})} rank {rank}\t{host}\tPID {pid}\t{fname}") + if self.match_ps_step_up(fname, line, pid, rank): continue # Find pool term end (PS leader step_down) - match = self.re_step_down.match(msg) - if match: - puuid = match.group(1) - term = int(match.group(2)) - if (term != self._cur_term[puuid]): - print(f"WARN: step_down term={term} != cur_term={self._cur_term}") - self._cur_ldr_rank[puuid] = -1 - self._cur_term[puuid] = -1 - self._pools[puuid][term]["end_time"] = datetime - #print(f"{datetime} FOUND pool {puuid} END\tterm {term} rank {rank}\t{host}\tPID {pid}\t{fname}") + if self.match_ps_step_down(fname, line, pid, rank): continue # TODO: find target status updates (precursor to pool map updates) # Find pool map updates - # FIXME: but only on the current PS leader engine / term? And carry over latest pmap version from prior term to start with? 
- match = self.re_pmap_update.match(msg) - if match: - puuid = match.group(1) - from_ver = int(match.group(2)) - to_ver = int(match.group(3)) - # ignore if this engine is not the leader - if puuid not in self._pools or rank != self._cur_ldr_rank[puuid]: - continue - term = self._cur_term[puuid] - self._pools[puuid][term]["maps"][to_ver] = {"carryover": False, "from_ver": from_ver, "time": datetime, "rebuild_gens": {}} - #print(f"FOUND pool {puuid} map update {from_ver}->{to_ver} rank {rank}\t{host}\tPID {pid}\t{fname}") + if self.match_ps_pmap_update(fname, line, pid, rank): continue # Find rebuild start by the PS leader - match = self.re_rebuild_ldr_start.match(msg) - if match: - puuid = match.group(1) - mapver = int(match.group(2)) - rebuild_gen = int(match.group(3)) - rebuild_op = match.group(4) - if rank != self._cur_ldr_rank[puuid]: - continue - term = self._cur_term[puuid] - if term < 1: - print(f"WARN pool {puuid} I don't know what term it is ({term})!") - # TODO: for now assuming rebuild_gen isn't in the dictionary yet. should we test to be safe? - if rebuild_gen in self._pools[puuid][term]["maps"][mapver]["rebuild_gens"]: - print(f"WARN pool {puuid} term {term} mapver {mapver} already has rebuild_gen {rebuild_gen}!") - # TODO: keep timestamps for overall/scan start, pull start, completed, convert to float and store component durations too - self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen] = {"op": rebuild_op, "start_time": datetime, "time": "xx/xx-xx:xx:xx.xx", "started": True, "scanning": False, "pulling": False, "completed": False, "aborted": False, "duration": 0} - #print(f"{datetime} FOUND rebuild start in term {term}, rb={puuid}/{mapver}/{rebuild_gen}/{rebuild_op} rank {rank}\t{host}\tPID {pid}\t{fname}") + if self.match_ps_rb_start(fname, line, pid, rank): continue - # Find rebuild status update reported by the PS leader - match = self.re_rebuild_ldr_status.match(msg) - if match: - puuid = match.group(1) - mapver = int(match.group(2)) - rebuild_gen = int(match.group(3)) - rebuild_op = match.group(4) - status = match.group(5) - dur = int(match.group(6)) - if rank != self._cur_ldr_rank[puuid]: - continue - term = self._cur_term[puuid] - if term < 1: - print(f"WARN pool {puuid} I don't know what term it is ({term})!") - if mapver not in self._pools[puuid][term]["maps"]: - print(f"WARN pool {puuid} term {term} mapver {mapver} is not in maps dictionary - creating placeholder") - self._pools[puuid][term]["maps"][mapver] = {"carryover": False, "from_ver": mapver, "time": "xx/xx-xx:xx:xx.xx", "rebuild_gens": {}} - self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen] = {"op": rebuild_op, "start_time": "xx/xx-xx:xx:xx.xx", "time": "xx/xx-xx:xx:xx.xx", "started": True, "scanning": False, "pulling": False, "completed": False, "aborted": False, "duration": 0} - - # TODO: verify rebuild_op == self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["op"]? 
- if rebuild_gen in self._pools[puuid][term]["maps"][mapver]["rebuild_gens"]: - existing_op = self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["op"] - if rebuild_op != existing_op: - print(f"WARN rb={puuid}/{mapver}/{rebuild_gen}/{existing_op} != this line's op {rebuild_op}") - if (status == "scanning"): - #print(f'ASSIGN _pools[{puuid}][{term}]["maps"][{mapver}]["rebuild_gens"][{rebuild_gen}]["scanning"] = True : rank {rank}\t{host}\tPID {pid}\t{fname}') - self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["scanning"] = True - elif (status == "pulling"): - #print(f'ASSIGN _pools[{puuid}][{term}]["maps"][{mapver}]["rebuild_gens"][{rebuild_gen}]["pulling"] = True : rank {rank}\t{host}\tPID {pid}\t{fname}') - self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["pulling"] = True - elif (status == "completed"): - #print(f'ASSIGN _pools[{puuid}][{term}]["maps"][{mapver}]["rebuild_gens"][{rebuild_gen}]["completed"] = True : rank {rank}\t{host}\tPID {pid}\t{fname}') - self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["completed"] = True - elif (status == "aborted"): - self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["aborted"] = True - - self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["time"] = datetime - self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["duration"] = dur - #print(f"{datetime} FOUND rebuild UPDATE term={term} rb={puuid}/{mapver}/{rebuild_gen}/{rebuild_op} STATUS={status}, DURATION={dur} seconds rank {rank}\t{host}\tPID {pid}\t{fname}") + if self.match_ps_rb_status(fname, line, pid, rank): continue - # TODO: look for scan/migrate activity on all pool engines, count and correlate to PS leader activity? + # TODO: look for scan/migrate activity on all pool engines, count and correlate + # to PS leader activity? + # TODO: For a PID that is killed, clear any associated cur_ldr_rank / cur_ldr_pid. + # At logfile end, it could be due to engine killed, or could just be log rotation. 
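+        # Typical driving sequence, as run() does below (a sketch; "e0.log" is a
+        # made-up filename):
+        #   sp = SysPools()
+        #   sp.scan_file(cart_logparse.LogIter("e0.log"), rank=0)
+        #   sp.sort()
+        #   sp.print_pools()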
def print_pools(self): for puuid in self._pools: - print(f"========== Pool {puuid}:") + print(f"===== Pool {puuid}:") for term in self._pools[puuid]: b = self._pools[puuid][term]["begin_time"] e = self._pools[puuid][term]["end_time"] @@ -210,41 +328,43 @@ def print_pools(self): print(f"{b} {puuid} BEGIN term {term}\trank {r}\t{h}\tPID {p}\t{f}") # Print pool map updates that happened within the term - for mapver in self._pools[puuid][term]["maps"]: - # Print rebuilds - + for v in self._pools[puuid][term]["maps"]: # TODO: print tgt state changes - #for tgt_change in self._pools[puuid][term]["maps"][mapver]["tgt_state_changes"]: - # print(f"TGT state {tgt_change["state"]}, rank: {tgt_change["rank"]}, idx: {tgt_change["tgt_idx"]}") - t = self._pools[puuid][term]["maps"][mapver]["time"] - from_ver = self._pools[puuid][term]["maps"][mapver]["from_ver"] - print(f"{t} {puuid} MAPVER {from_ver}->{mapver}") + + # Print map updates + t = self._pools[puuid][term]["maps"][v]["time"] + from_ver = self._pools[puuid][term]["maps"][v]["from_ver"] + print(f"{t} {puuid} MAPVER {from_ver}->{v}") - # Rebuilds - for rebuild_gen in self._pools[puuid][term]["maps"][mapver]["rebuild_gens"]: - rebuild_op = self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["op"] - dur = self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["duration"] - started = self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["started"] + # Print rebuilds + for g in self._pools[puuid][term]["maps"][v]["rb_gens"]: + op = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["op"] + dur = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["duration"] + # line len + started = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["started"] # TODO: scan_done, pull_done booleans - scanning = self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["scanning"] - pulling = self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["pulling"] - completed =self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["completed"] - aborted = self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["aborted"] - st = self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["start_time"] - ut = self._pools[puuid][term]["maps"][mapver]["rebuild_gens"][rebuild_gen]["time"] + # line len + scan = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["scanning"] + pull = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["pulling"] + # line len + comp = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["completed"] + abrt = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["aborted"] + # line len + st = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["start_time"] + ut = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["time"] status = "started" - if aborted: + if abrt: status = "aborted" - if completed: + if comp: status = "completed" - elif pulling: + elif pull: status = "pulling" - elif scanning: + elif scan: status = "scanning" - print(f"{st} {puuid} RBSTRT {mapver}/{rebuild_gen}/{rebuild_op}") - updated = scanning or pulling or completed or aborted + print(f"{st} {puuid} RBSTRT {v}/{g}/{op}") + updated = scan or pull or comp or abrt if updated: - print(f"{ut} {puuid} RBUPDT {mapver}/{rebuild_gen}/{rebuild_op} {status} {dur} seconds") + print(f"{ut} {puuid} RBUPDT {v}/{g}/{op} {status} {dur} seconds") # Print term end (if there is a PS leader step_down) if e != "": @@ -270,12 +390,19 @@ def run(): sys.stdout = out_fd print(f'Logging to {out_fname}', 
file=real_stdout) + rank_in_fname_re = re.compile(r"\.rank=(\d+)\.") + sp = SysPools() for fname in args.filelist: if fname.endswith("cart_logtest"): continue - #print(f"\n========== Engine log: {fname} ==========") + + rank = -1 + match = rank_in_fname_re.search(fname) + if match: + rank = int(match.group(1)) + try: log_iter = cart_logparse.LogIter(fname) except UnicodeDecodeError: @@ -283,9 +410,9 @@ def run(): if log_iter.file_corrupt: sys.exit(1) - #print(f"{len(log_iter.get_pids())} PIDs") - sp.check_file(log_iter) - #pprint.pprint(sp._pools) + sp.scan_file(log_iter, rank=rank) + + print(f"\n========== Pools Report ({len(sp._warnings)} warnings during scanning) ==========\n") sp.sort() sp.print_pools() From 5c0afdf2e09b4137a4862bb653dff102b477459a Mon Sep 17 00:00:00 2001 From: Kenneth Cain Date: Fri, 20 Dec 2024 11:19:06 -0500 Subject: [PATCH 3/8] - legacy rebuild start/status_check regexs to match on daos 2.6 logging - fix up some flake8 python linting issues Signed-off-by: Kenneth Cain --- src/tests/ftest/cart/util/daos_sys_logscan.py | 255 +++++++++++++----- 1 file changed, 183 insertions(+), 72 deletions(-) diff --git a/src/tests/ftest/cart/util/daos_sys_logscan.py b/src/tests/ftest/cart/util/daos_sys_logscan.py index 36c64b259ec..189f37520d2 100644 --- a/src/tests/ftest/cart/util/daos_sys_logscan.py +++ b/src/tests/ftest/cart/util/daos_sys_logscan.py @@ -10,9 +10,6 @@ import cart_logparse import re import sys -#import time -#import pprint -#from collections import Counter, OrderedDict, defaultdict class SysPools(): @@ -49,8 +46,8 @@ class SysPools(): # Rebuild preliminary steps # TODO/FIXME rebuild_task_ult() waiting for sched time, and map dist needs uniform rb= string. - #re.compile(r"rebuild_task_ult\(\).*rebuild task sleep (\d+) second") - #re.compile(r"rebuild_task_ult\(\).*map_dist_ver (\d+) map ver (\d+)") + # re.compile(r"rebuild_task_ult\(\).*rebuild task sleep (\d+) second") + # re.compile(r"rebuild_task_ult\(\).*map_dist_ver (\d+) map ver (\d+)") # Rebuild: PS leader engine starting and status checking a given operation # statuses: "scanning", "pulling", "completed", "aborted" @@ -59,6 +56,13 @@ class SysPools(): re_rebuild_ldr_start = re.compile(ldr_start_re) re_rebuild_ldr_status = re.compile(ldr_status_re) + # Legacy rebuild PS leader logging (before uniform rebuild string) + old_ldr_start_re = r"rebuild_leader_start.*([0-9a-fA-F]{8}).*version=(\d+)/(\d+).*op=(\w+)" + old_ldr_status_re = (r"rebuild_leader_status_check\(\) (\w+) \[(\w+)\] \(pool ([0-9a-fA-F]{8}) " + r"leader (\d+) term (\d+).*ver=(\d+),gen (\d+).*duration=(\d+) secs") + re_old_ldr_start = re.compile(old_ldr_start_re) + re_old_ldr_status = re.compile(old_ldr_status_re) + def __init__(self): # dictionaries indexed by pool UUID self._pools = {} @@ -73,7 +77,11 @@ def __init__(self): self._warnings = [] def _warn(self, wmsg, fname, line): - full_msg = f"WARN file={fname}, line={line.lineno}: " + wmsg + full_msg = f"WARN file={fname}" + if line: + full_msg += f", line={line.lineno}" + full_msg += f": {wmsg}" + self._warnings.append(full_msg) print(full_msg) @@ -97,7 +105,7 @@ def get_line_components(self, line): # is this rank, pid the leader of the pool with uuid puuid? 
def is_leader(self, puuid, rank, pid): - if not puuid in self._pools: + if puuid not in self._pools: return False if self._cur_ldr_rank[puuid] == rank and self._cur_ldr_pid[puuid] == pid: return True @@ -111,7 +119,7 @@ def match_ps_step_up(self, fname, line, pid, rank): term = int(match.group(2)) if puuid not in self._pools: self._pools[puuid] = {} - self._cur_ldr_rank[puuid]= rank + self._cur_ldr_rank[puuid] = rank self._cur_ldr_pid[puuid] = pid self._cur_term[puuid] = term old_term = term - 1 @@ -124,7 +132,7 @@ def match_ps_step_up(self, fname, line, pid, rank): last_mapver = list(self._pools[puuid][old_term]["maps"].keys())[-1] pmap_versions = self._pools[puuid][old_term]["maps"][last_mapver] pmap_versions["carryover"] = True - #pmap_versions["rb_gens"] = {} + # pmap_versions["rb_gens"] = {} else: pmap_versions = {} self._pools[puuid][term] = { @@ -135,9 +143,9 @@ def match_ps_step_up(self, fname, line, pid, rank): "pid": pid, "logfile": fname, "maps": pmap_versions - } + } # DEBUG - #print(f"{datetime} FOUND pool {puuid} BEGIN\tterm {term} pmap_versions empty: " + # print(f"{datetime} FOUND pool {puuid} BEGIN\tterm {term} pmap_versions empty: " # f"{str(pmap_versions == {})} rank {rank}\t{host}\tPID {pid}\t{fname}") return True return False @@ -148,14 +156,14 @@ def match_ps_step_down(self, fname, line, pid, rank): if match: puuid = match.group(1) term = int(match.group(2)) - if (term != self._cur_term[puuid]): + if term != self._cur_term[puuid]: self._warn(f"step_down term={term} != cur_term={self._cur_term}", fname, line) self._cur_ldr_rank[puuid] = -1 self._cur_ldr_pid[puuid] = -1 self._cur_term[puuid] = -1 self._pools[puuid][term]["end_time"] = datetime # DEBUG - #print(f"{datetime} FOUND pool {puuid} END\tterm {term} rank {rank}\t{host}\t" + # print(f"{datetime} FOUND pool {puuid} END\tterm {term} rank {rank}\t{host}\t" # f"PID {pid}\t{fname}") return True return False @@ -218,6 +226,42 @@ def match_ps_rb_start(self, fname, line, pid, rank): return True return False + def match_legacy_ps_rb_start(self, fname, line, pid, rank): + #old_ldr_start_re = r"rebuild_leader_start.*([0-9a-fA-F]{8}).*version=(\d+)/(\d+).*op=(\w+)" + msg, host, datetime = self.get_line_components(line) + match = self.re_old_ldr_start.match(msg) + if match: + puuid = match.group(1) + ver = int(match.group(2)) + gen = int(match.group(3)) + op = match.group(4) + if not self.is_leader(puuid, rank, pid): + return True + term = self._cur_term[puuid] + if term < 1: + self._warn(f"pool {puuid} I don't know what term it is ({term})!", fname, line) + return True + if gen in self._pools[puuid][term]["maps"][ver]["rb_gens"]: + self._warn(f"pool {puuid} term {term} ver {ver} already has gen {gen}", fname, line) + # TODO: keep timestamps for overall/scan start, pull start, completed + # convert to float and store component durations too + self._pools[puuid][term]["maps"][ver]["rb_gens"][gen] = { + "op": op, + "start_time": datetime, + "time": "xx/xx-xx:xx:xx.xx", + "started": True, + "scanning": False, + "pulling": False, + "completed": False, + "aborted": False, + "duration": 0 + } + # DEBUG + #print(f"{datetime} FOUND rebuild start in term {term}, rb={puuid}/{ver}/{gen}/{op} " + # f"rank {rank}\t{host}\tPID {pid}\t{fname}") + return True + return False + def match_ps_rb_status(self, fname, line, pid, rank): msg, host, datetime = self.get_line_components(line) match = self.re_rebuild_ldr_status.match(msg) @@ -255,6 +299,67 @@ def match_ps_rb_status(self, fname, line, pid, rank): existing_op = 
self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["op"] if op != existing_op: self._warn(f"rb={puuid}/{ver}/{gen}/{existing_op} != line op {op}", fname, line) + if status == "scanning": + self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["scanning"] = True + elif status == "pulling": + self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["pulling"] = True + elif status == "completed": + self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["completed"] = True + elif status == "aborted": + self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["aborted"] = True + self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["time"] = datetime + self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["duration"] = dur + # DEBUG + # print(f"{datetime} FOUND rebuild UPDATE term={term} rb={puuid}/{ver}/{gen}/{op} " + # f"STATUS={status}, DUR={dur} seconds rank {rank}\t{host}\tPID {pid}\t{fname}") + return True + return False + + def match_legacy_ps_rb_status(self, fname, line, pid, rank): + msg, host, datetime = self.get_line_components(line) + match = self.re_old_ldr_status.match(msg) + if match: + op = match.group(1) + status = match.group(2) + puuid = match.group(3) + log_ldr = int(match.group(4)) + log_term = int(match.group(5)) + ver = int(match.group(6)) + gen = int(match.group(7)) + dur = int(match.group(8)) + if not self.is_leader(puuid, rank, pid): + return True + if rank != log_ldr: + self._warn(f"pool {puuid} my rank {rank} != leader {log_ldr}", fname, line) + term = self._cur_term[puuid] + if term < 1 or term != log_term: + self._warn(f"pool {puuid} I don't know what term it is ({term}), {log_term}!", + fname, line) + return True + if ver not in self._pools[puuid][term]["maps"]: + self._warn(f"pool {puuid} term {term} ver {ver} not in maps - add placeholder", + fname, line) + self._pools[puuid][term]["maps"][ver] = { + "carryover": False, + "from_ver": ver, + "time": "xx/xx-xx:xx:xx.xx", + "rb_gens": {} + } + self._pools[puuid][term]["maps"][ver]["rb_gens"][gen] = { + "op": op, + "start_time": "xx/xx-xx:xx:xx.xx", + "time": "xx/xx-xx:xx:xx.xx", + "started": True, + "scanning": False, + "pulling": False, + "completed": False, + "aborted": False, + "duration": 0 + } + if gen in self._pools[puuid][term]["maps"][ver]["rb_gens"]: + existing_op = self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["op"] + if op != existing_op: + self._warn(f"rb={puuid}/{ver}/{gen}/{existing_op} != line op {op}", fname, line) if (status == "scanning"): self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["scanning"] = True elif (status == "pulling"): @@ -277,7 +382,7 @@ def scan_file(self, log_iter, rank=-1): # Find rank assignment log line for this file. Can't do much without it. self._file_to_rank[fname] = rank if rank == -1 and not self.find_rank(log_iter): - self._warn(f"cannot find rank assignment in log file - skipping", fname, line) + self._warn(f"cannot find rank assignment in log file - skipping", fname) return rank = self._file_to_rank[fname] @@ -306,9 +411,15 @@ def scan_file(self, log_iter, rank=-1): if self.match_ps_rb_start(fname, line, pid, rank): continue + if self.match_legacy_ps_rb_start(fname, line, pid, rank): + continue + if self.match_ps_rb_status(fname, line, pid, rank): continue + if self.match_legacy_ps_rb_status(fname, line, pid, rank): + continue + # TODO: look for scan/migrate activity on all pool engines, count and correlate # to PS leader activity? # TODO: For a PID that is killed, clear any associated cur_ldr_rank / cur_ldr_pid. 
@@ -316,61 +427,61 @@ def scan_file(self, log_iter, rank=-1):
 
     def print_pools(self):
         for puuid in self._pools:
-            print(f"===== Pool {puuid}:")
-            for term in self._pools[puuid]:
-                b = self._pools[puuid][term]["begin_time"]
-                e = self._pools[puuid][term]["end_time"]
-                r = self._pools[puuid][term]["rank"]
-                h = self._pools[puuid][term]["host"]
-                p = self._pools[puuid][term]["pid"]
-                f = self._pools[puuid][term]["logfile"]
-                # Print term begin
-                print(f"{b} {puuid} BEGIN term {term}\trank {r}\t{h}\tPID {p}\t{f}")
-
-                # Print pool map updates that happened within the term
-                for v in self._pools[puuid][term]["maps"]:
-                    # TODO: print tgt state changes
-
-                    # Print map updates
-                    t = self._pools[puuid][term]["maps"][v]["time"]
-                    from_ver = self._pools[puuid][term]["maps"][v]["from_ver"]
-                    print(f"{t} {puuid} MAPVER {from_ver}->{v}")
-
-                    # Print rebuilds
-                    for g in self._pools[puuid][term]["maps"][v]["rb_gens"]:
-                        op = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["op"]
-                        dur = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["duration"]
-                        # line len
-                        started = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["started"]
-                        # TODO: scan_done, pull_done booleans
-                        # line len
-                        scan = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["scanning"]
-                        pull = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["pulling"]
-                        # line len
-                        comp = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["completed"]
-                        abrt = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["aborted"]
-                        # line len
-                        st = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["start_time"]
-                        ut = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["time"]
-                        status = "started"
-                        if abrt:
-                            status = "aborted"
-                        if comp:
-                            status = "completed"
-                        elif pull:
-                            status = "pulling"
-                        elif scan:
-                            status = "scanning"
-                        print(f"{st} {puuid} RBSTRT {v}/{g}/{op}")
-                        updated = scan or pull or comp or abrt
-                        if updated:
-                            print(f"{ut} {puuid} RBUPDT {v}/{g}/{op} {status} {dur} seconds")
-
-                # Print term end (if there is a PS leader step_down)
-                if e != "":
-                    print(f"{e} {puuid} END term {term}\trank {r}\t{h}\tPID {p}\t{f}")
-                else:
-                    print(" " * 18 + f"{puuid} END term {term}\trank {r}\t{h}\tPID {p}\t{f}")
+            print(f"===== Pool {puuid}:")
+            for term in self._pools[puuid]:
+                b = self._pools[puuid][term]["begin_time"]
+                e = self._pools[puuid][term]["end_time"]
+                r = self._pools[puuid][term]["rank"]
+                h = self._pools[puuid][term]["host"]
+                p = self._pools[puuid][term]["pid"]
+                f = self._pools[puuid][term]["logfile"]
+                # Print term begin
+                print(f"{b} {puuid} BEGIN term {term}\trank {r}\t{h}\tPID {p}\t{f}")
+
+                # Print pool map updates that happened within the term
+                for v in self._pools[puuid][term]["maps"]:
+                    # TODO: print tgt state changes
+
+                    # Print map updates
+                    t = self._pools[puuid][term]["maps"][v]["time"]
+                    from_ver = self._pools[puuid][term]["maps"][v]["from_ver"]
+                    print(f"{t} {puuid} MAPVER {from_ver}->{v}")
+
+                    # Print rebuilds
+                    for g in self._pools[puuid][term]["maps"][v]["rb_gens"]:
+                        op = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["op"]
+                        dur = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["duration"]
+                        # line len
+                        started = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["started"]
+                        # TODO: scan_done, pull_done booleans
+                        # line len
+                        scan = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["scanning"]
+                        pull = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["pulling"]
+                        # line len
+                        comp = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["completed"]
+                        abrt = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["aborted"]
+                        # line len
+                        st = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["start_time"]
+                        ut = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["time"]
+                        status = "started"
+                        if abrt:
+                            status = "aborted"
+                        if comp:
+                            status = "completed"
+                        elif pull:
+                            status = "pulling"
+                        elif scan:
+                            status = "scanning"
+                        print(f"{st} {puuid} RBSTRT {v}/{g}/{op}")
+                        updated = scan or pull or comp or abrt
+                        if updated:
+                            print(f"{ut} {puuid} RBUPDT {v}/{g}/{op} {status} {dur} seconds")
+
+                # Print term end (if there is a PS leader step_down)
+                if e != "":
+                    print(f"{e} {puuid} END term {term}\trank {r}\t{h}\tPID {p}\t{f}")
+                else:
+                    print(" " * 18 + f"{puuid} END term {term}\trank {r}\t{h}\tPID {p}\t{f}")
 
     def sort(self):
         for puuid in self._pools:
@@ -381,7 +492,7 @@ def sort(self):
 def run():
     """Scan a list of daos_engine logfiles"""
     ap = argparse.ArgumentParser()
-    ap.add_argument('filelist', nargs='+')
+    ap.add_argument('filelist', nargs='+')
     args = ap.parse_args()
 
     out_fname = 'sys_logscan.txt'
@@ -397,7 +508,7 @@ def run():
     for fname in args.filelist:
        if fname.endswith("cart_logtest"):
            continue
-
+
        rank = -1
        match = rank_in_fname_re.search(fname)
        if match:
@@ -407,7 +518,7 @@ def run():
        try:
            log_iter = cart_logparse.LogIter(fname)
        except UnicodeDecodeError:
            log_iter = cart_logparse.LogIter(args.file, check_encoding=True)
-
+
        if log_iter.file_corrupt:
            sys.exit(1)
        sp.scan_file(log_iter, rank=rank)

From f00b62c3d97596aa54c617a9611b134ba97df0ec Mon Sep 17 00:00:00 2001
From: Kenneth Cain
Date: Fri, 20 Dec 2024 12:00:07 -0500
Subject: [PATCH 4/8] more cleanup

Signed-off-by: Kenneth Cain
---
 src/tests/ftest/cart/util/daos_sys_logscan.py | 103 +++++++++++-------
 1 file changed, 62 insertions(+), 41 deletions(-)

diff --git a/src/tests/ftest/cart/util/daos_sys_logscan.py b/src/tests/ftest/cart/util/daos_sys_logscan.py
index 189f37520d2..25dc445bd53 100644
--- a/src/tests/ftest/cart/util/daos_sys_logscan.py
+++ b/src/tests/ftest/cart/util/daos_sys_logscan.py
@@ -45,7 +45,7 @@ class SysPools():
     rbid_re = r"rb=([0-9a-fA-F]{8})/(\d+)/(\d+)/(\w+)"
 
     # Rebuild preliminary steps
-    # TODO/FIXME rebuild_task_ult() waiting for sched time, and map dist needs uniform rb= string.
+    # TODO/FIXME rebuild_task_ult() wait for scheduling, and map dist needs uniform rb= string.
     # re.compile(r"rebuild_task_ult\(\).*rebuild task sleep (\d+) second")
     # re.compile(r"rebuild_task_ult\(\).*map_dist_ver (\d+) map ver (\d+)")
 
@@ -75,6 +75,9 @@ def __init__(self):
         self._file_to_rank = {}
         self._warnings = []
+        self._check_rb_new_fmt = True
+        self._check_rb_legacy_fmt = True
+        self._debug = False
 
     def _warn(self, wmsg, fname, line):
         full_msg = f"WARN file={fname}"
         if line:
             full_msg += f", line={line.lineno}"
@@ -144,9 +147,9 @@ def match_ps_step_up(self, fname, line, pid, rank):
                 "logfile": fname,
                 "maps": pmap_versions
             }
-            # DEBUG
-            # print(f"{datetime} FOUND pool {puuid} BEGIN\tterm {term} pmap_versions empty: "
-            #       f"{str(pmap_versions == {})} rank {rank}\t{host}\tPID {pid}\t{fname}")
+            if self._debug:
+                print(f"{datetime} FOUND pool {puuid} BEGIN\tterm {term} pmap_versions empty: "
+                      f"{str(pmap_versions == {})} rank {rank}\t{host}\tPID {pid}\t{fname}")
             return True
         return False
 
@@ -162,9 +165,9 @@ def match_ps_step_down(self, fname, line, pid, rank):
             self._cur_ldr_pid[puuid] = -1
             self._cur_term[puuid] = -1
             self._pools[puuid][term]["end_time"] = datetime
-            # DEBUG
-            # print(f"{datetime} FOUND pool {puuid} END\tterm {term} rank {rank}\t{host}\t"
-            #       f"PID {pid}\t{fname}")
+            if self._debug:
+                print(f"{datetime} FOUND pool {puuid} END\tterm {term} rank {rank}\t{host}\t"
+                      f"PID {pid}\t{fname}")
             return True
         return False
 
@@ -184,10 +187,10 @@ def match_ps_pmap_update(self, fname, line, pid, rank):
                 "from_ver": from_ver,
                 "time": datetime,
                 "rb_gens": {}
-                }
-            # DEBUG
-            #print(f"FOUND pool {puuid} map update {from_ver}->{to_ver} rank {rank}\t{host}\t"
-            #      f"PID {pid}\t{fname}")
+            }
+            if self._debug:
+                print(f"FOUND pool {puuid} map update {from_ver}->{to_ver} rank {rank}\t{host}\t"
+                      f"PID {pid}\t{fname}")
             return True
         return False
 
@@ -195,9 +198,14 @@ def get_rb_components(self, match):
         return match.group(1), int(match.group(2)), int(match.group(3)), match.group(4)
 
     def match_ps_rb_start(self, fname, line, pid, rank):
+        # Do not match on new rebuild log format if we found legacy format
+        if not self._check_rb_new_fmt:
+            return False
         msg, host, datetime = self.get_line_components(line)
         match = self.re_rebuild_ldr_start.match(msg)
         if match:
+            # Disable checking for legacy rebuild log format, to save execution time
+            self._check_rb_legacy_fmt = False
             puuid, ver, gen, op = self.get_rb_components(match)
             if not self.is_leader(puuid, rank, pid):
                 return True
@@ -219,18 +227,22 @@ def match_ps_rb_start(self, fname, line, pid, rank):
                 "completed": False,
                 "aborted": False,
                 "duration": 0
-                }
-            # DEBUG
-            #print(f"{datetime} FOUND rebuild start in term {term}, rb={puuid}/{ver}/{gen}/{op} "
-            #      f"rank {rank}\t{host}\tPID {pid}\t{fname}")
+            }
+            if self._debug:
+                print(f"{datetime} FOUND rebuild start in term {term}, rb={puuid}/{ver}/{gen}/{op} "
+                      f"rank {rank}\t{host}\tPID {pid}\t{fname}")
             return True
         return False
 
     def match_legacy_ps_rb_start(self, fname, line, pid, rank):
-        #old_ldr_start_re = r"rebuild_leader_start.*([0-9a-fA-F]{8}).*version=(\d+)/(\d+).*op=(\w+)"
+        # Do not match on legacy rebuild log format if we found new format
+        if not self._check_rb_legacy_fmt:
+            return False
         msg, host, datetime = self.get_line_components(line)
         match = self.re_old_ldr_start.match(msg)
         if match:
+            # Disable checking for new rebuild log format, to save execution time
+            self._check_rb_new_fmt = False
             puuid = match.group(1)
             ver = int(match.group(2))
             gen = int(match.group(3))
@@ -255,17 +267,22 @@ def match_legacy_ps_rb_start(self, fname, line, pid, rank):
                 "completed": False,
                 "aborted": False,
                 "duration": 0
-                }
-            # DEBUG
-            #print(f"{datetime} FOUND rebuild start in term {term}, rb={puuid}/{ver}/{gen}/{op} "
-            #      f"rank {rank}\t{host}\tPID {pid}\t{fname}")
+            }
+            if self._debug:
+                print(f"{datetime} FOUND rebuild start in term {term}, rb={puuid}/{ver}/{gen}/{op} "
+                      f"rank {rank}\t{host}\tPID {pid}\t{fname}")
             return True
         return False
 
     def match_ps_rb_status(self, fname, line, pid, rank):
+        # Do not match on new rebuild log format if we found legacy format
+        if not self._check_rb_new_fmt:
+            return False
         msg, host, datetime = self.get_line_components(line)
         match = self.re_rebuild_ldr_status.match(msg)
         if match:
+            # Disable checking for legacy rebuild log format, to save execution time
+            self._check_rb_legacy_fmt = False
             puuid, ver, gen, op = self.get_rb_components(match)
             status = match.group(5)
             dur = int(match.group(6))
@@ -283,7 +300,7 @@ def match_ps_rb_status(self, fname, line, pid, rank):
                 "from_ver": ver,
                 "time": "xx/xx-xx:xx:xx.xx",
                 "rb_gens": {}
-                }
+            }
             self._pools[puuid][term]["maps"][ver]["rb_gens"][gen] = {
                 "op": op,
                 "start_time": "xx/xx-xx:xx:xx.xx",
@@ -294,7 +311,7 @@ def match_ps_rb_status(self, fname, line, pid, rank):
                 "completed": False,
                 "aborted": False,
                 "duration": 0
-                }
+            }
             if gen in self._pools[puuid][term]["maps"][ver]["rb_gens"]:
                 existing_op = self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["op"]
                 if op != existing_op:
@@ -309,16 +326,21 @@ def match_ps_rb_status(self, fname, line, pid, rank):
                     self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["aborted"] = True
             self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["time"] = datetime
             self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["duration"] = dur
-            # DEBUG
-            # print(f"{datetime} FOUND rebuild UPDATE term={term} rb={puuid}/{ver}/{gen}/{op} "
-            #       f"STATUS={status}, DUR={dur} seconds rank {rank}\t{host}\tPID {pid}\t{fname}")
+            if self._debug:
+                print(f"{datetime} FOUND rebuild UPDATE term={term} rb={puuid}/{ver}/{gen}/{op} "
+                      f"STATUS={status}, DUR={dur} seconds rank {rank}\t{host}\tPID {pid}\t{fname}")
             return True
         return False
 
     def match_legacy_ps_rb_status(self, fname, line, pid, rank):
+        # Do not match on legacy rebuild log format if we found new format
+        if not self._check_rb_legacy_fmt:
+            return False
         msg, host, datetime = self.get_line_components(line)
         match = self.re_old_ldr_status.match(msg)
         if match:
+            # Disable checking for new rebuild log format, to save execution time
+            self._check_rb_new_fmt = False
             op = match.group(1)
             status = match.group(2)
             puuid = match.group(3)
@@ -344,7 +366,7 @@ def match_legacy_ps_rb_status(self, fname, line, pid, rank):
                 "from_ver": ver,
                 "time": "xx/xx-xx:xx:xx.xx",
                 "rb_gens": {}
-                }
+            }
             self._pools[puuid][term]["maps"][ver]["rb_gens"][gen] = {
                 "op": op,
                 "start_time": "xx/xx-xx:xx:xx.xx",
@@ -355,7 +377,7 @@ def match_legacy_ps_rb_status(self, fname, line, pid, rank):
                 "completed": False,
                 "aborted": False,
                 "duration": 0
-                }
+            }
             if gen in self._pools[puuid][term]["maps"][ver]["rb_gens"]:
                 existing_op = self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["op"]
                 if op != existing_op:
@@ -367,12 +389,12 @@ def match_legacy_ps_rb_status(self, fname, line, pid, rank):
             elif (status == "completed"):
                 self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["completed"] = True
             elif (status == "aborted"):
-                self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["aborted"] = True
+                self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["aborted"] = True
             self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["time"] = datetime
             self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["duration"] = dur
-            # DEBUG
-            #print(f"{datetime} FOUND rebuild UPDATE term={term} rb={puuid}/{ver}/{gen}/{op} "
-            #      f"STATUS={status}, DUR={dur} seconds rank {rank}\t{host}\tPID {pid}\t{fname}")
+            if self._debug:
+                print(f"{datetime} FOUND rebuild UPDATE term={term} rb={puuid}/{ver}/{gen}/{op} "
+                      f"STATUS={status}, DUR={dur} seconds rank {rank}\t{host}\tPID {pid}\t{fname}")
             return True
         return False
 
@@ -382,17 +404,13 @@ def scan_file(self, log_iter, rank=-1):
         fname = log_iter.fname
 
         # Find rank assignment log line for this file. Can't do much without it.
         self._file_to_rank[fname] = rank
         if rank == -1 and not self.find_rank(log_iter):
-            self._warn(f"cannot find rank assignment in log file - skipping", fname)
+            self._warn("cannot find rank assignment in log file - skipping", fname)
             return
         rank = self._file_to_rank[fname]
 
         for pid in log_iter.get_pids():
             print(f"INFO: scanning file {fname} rank {rank}, PID {pid}")
             for line in log_iter.new_iter(pid=pid):
-                msg = line.get_msg()
-                host = line.hostname
-                datetime = line.time_stamp
-
                 # Find pool term begin (PS leader step_up)
                 if self.match_ps_step_up(fname, line, pid, rank):
                     continue
@@ -451,10 +469,8 @@ def print_pools(self):
                     for g in self._pools[puuid][term]["maps"][v]["rb_gens"]:
                         op = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["op"]
                         dur = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["duration"]
-                        # line len
-                        started = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["started"]
+                        strt = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["started"]
                         # TODO: scan_done, pull_done booleans
-                        # line len
                         scan = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["scanning"]
                         pull = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["pulling"]
                         # line len
@@ -463,10 +479,13 @@ def print_pools(self):
                         # line len
                         st = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["start_time"]
                         ut = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["time"]
-                        status = "started"
+                        if strt:
+                            status = "started"
+
+                        # status is latest status reached for the given rebuild
                         if abrt:
                             status = "aborted"
-                        if comp:
+                        elif comp:
                             status = "completed"
                         elif pull:
                             status = "pulling"
@@ -489,6 +508,7 @@ def sort(self):
         self._pools[puuid] = tmp
     # _pools[puuid][term]["maps"] should have been inserted in ascending order already?
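An aside on the ordering question raised in the hunk above: sort() depends on Python dicts preserving insertion order (guaranteed since Python 3.7), so rebuilding each pool's term dictionary with dict(sorted(...)) leaves the terms in numeric order, while each term's "maps" entries are trusted to already be in ascending insertion order. A standalone sketch of the idea, not part of the patch, with a made-up pool UUID and data:

    pools = {"0abc1234": {3: "term 3 events", 1: "term 1 events", 2: "term 2 events"}}
    for puuid in pools:
        # sorted() orders the (term, events) pairs by term number; dict() keeps that order
        pools[puuid] = dict(sorted(pools[puuid].items()))
    print(list(pools["0abc1234"]))  # prints [1, 2, 3]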
 
+
 def run():
     """Scan a list of daos_engine logfiles"""
     ap = argparse.ArgumentParser()
@@ -527,5 +547,6 @@ def run():
     sp.sort()
     sp.print_pools()
 
+
 if __name__ == '__main__':
     run()

From 8b08d98876a123bcaf3398c0389a8ca1e9cde5ff Mon Sep 17 00:00:00 2001
From: Kenneth Cain
Date: Fri, 20 Dec 2024 15:25:26 -0500
Subject: [PATCH 5/8] More cleaning up of pylint/flake/rpmbuild warnings and errors

Signed-off-by: Kenneth Cain
---
 src/tests/ftest/cart/util/daos_sys_logscan.py | 103 +++++++++---------
 1 file changed, 49 insertions(+), 54 deletions(-)
 mode change 100644 => 100755 src/tests/ftest/cart/util/daos_sys_logscan.py

diff --git a/src/tests/ftest/cart/util/daos_sys_logscan.py b/src/tests/ftest/cart/util/daos_sys_logscan.py
old mode 100644
new mode 100755
index 25dc445bd53..1734d5578d6
--- a/src/tests/ftest/cart/util/daos_sys_logscan.py
+++ b/src/tests/ftest/cart/util/daos_sys_logscan.py
@@ -7,16 +7,16 @@
 """Scan daos_engine log files to get a summary of pools activity."""
 
 import argparse
-import cart_logparse
 import re
 import sys
 
+import cart_logparse
 
 class SysPools():
     """Directory of Pools and Summary Activities Found in Engine Log Files"""
 
-    # TODO
-    # diagram of nested dictionaries constructed
+    # Future possibilities include:
+    # diagram of nested dictionaries constructed in the comments
     # system map update events (output outside if pool-specific context)
     # SWIM events seen by PS leader?
     # add/remove target events on PS leader?
@@ -30,7 +30,7 @@ class SysPools():
     re_step_up = re.compile(r"rdb_raft_step_up.*([0-9a-fA-F]{8}).*leader of term (\d+)")
     re_step_down = re.compile(r"rdb_raft_step_down.*([0-9a-fA-F]{8}).*leader of term (\d+)")
 
-    # TODO: target state change events
+    # Future possibility: target state change events
     # update_one_tgt(), update_tgt_down_drain_to_downout()
     # "change Target.*rank (\d+) idx (\d+).*to (\w+)"
     # need those functions to print pool UUID. Then here, store a list of target change events in
@@ -44,8 +44,8 @@ class SysPools():
     # uniform rebuild string identifier rb=<pool_uuid>/<rebuild_ver>/<rebuild_gen>/<opstring>
     rbid_re = r"rb=([0-9a-fA-F]{8})/(\d+)/(\d+)/(\w+)"
 
-    # Rebuild preliminary steps
-    # TODO/FIXME rebuild_task_ult() wait for scheduling, and map dist needs uniform rb= string.
+    # Future possibility: match the rebuild preliminary steps
+    # rebuild_task_ult() wait for scheduling, and map dist - both would provide info to match on.
     # re.compile(r"rebuild_task_ult\(\).*rebuild task sleep (\d+) second")
     # re.compile(r"rebuild_task_ult\(\).*map_dist_ver (\d+) map ver (\d+)")
 
@@ -79,7 +79,7 @@ def __init__(self):
         self._check_rb_legacy_fmt = True
         self._debug = False
 
-    def _warn(self, wmsg, fname, line):
+    def _warn(self, wmsg, fname, line=None):
         full_msg = f"WARN file={fname}"
         if line:
             full_msg += f", line={line.lineno}"
@@ -88,7 +88,11 @@ def _warn(self, wmsg, fname, line):
         self._warnings.append(full_msg)
         print(full_msg)
 
-    def find_rank(self, log_iter):
+    def get_warnings(self):
+        """Return all warnings stored when scanning engine log files"""
+        return self._warnings
+
+    def _find_rank(self, log_iter):
         print(f"INFO: searching for rank in file {log_iter.fname}")
         found = False
         for line in log_iter.new_iter():
@@ -99,7 +103,7 @@ def find_rank(self, log_iter):
                 found = True
                 break
 
-        # TODO: what about log rotation (not an engine start scenario)?
+        # Future enhancement: what about log rotation (not an engine start scenario)?
         return found
 
     # return log-message, hostname, and date/timestamp components of the line
@@ -107,14 +111,14 @@ def get_line_components(self, line):
         return line.get_msg(), line.hostname, line.time_stamp
 
     # is this rank, pid the leader of the pool with uuid puuid?
-    def is_leader(self, puuid, rank, pid):
+    def _is_leader(self, puuid, rank, pid):
         if puuid not in self._pools:
             return False
         if self._cur_ldr_rank[puuid] == rank and self._cur_ldr_pid[puuid] == pid:
             return True
         return False
 
-    def match_ps_step_up(self, fname, line, pid, rank):
+    def _match_ps_step_up(self, fname, line, pid, rank):
         msg, host, datetime = self.get_line_components(line)
         match = self.re_step_up.match(msg)
         if match:
@@ -153,7 +157,7 @@ def _match_ps_step_up(self, fname, line, pid, rank):
             return True
         return False
 
-    def match_ps_step_down(self, fname, line, pid, rank):
+    def _match_ps_step_down(self, fname, line, pid, rank):
         msg, host, datetime = self.get_line_components(line)
         match = self.re_step_down.match(msg)
         if match:
@@ -171,7 +175,7 @@ def _match_ps_step_down(self, fname, line, pid, rank):
             return True
         return False
 
-    def match_ps_pmap_update(self, fname, line, pid, rank):
+    def _match_ps_pmap_update(self, fname, line, pid, rank):
         msg, host, datetime = self.get_line_components(line)
         match = self.re_pmap_update.match(msg)
         if match:
@@ -179,7 +183,7 @@ def _match_ps_pmap_update(self, fname, line, pid, rank):
             from_ver = int(match.group(2))
             to_ver = int(match.group(3))
             # ignore if this engine is not the leader
-            if not self.is_leader(puuid, rank, pid):
+            if not self._is_leader(puuid, rank, pid):
                 return True
             term = self._cur_term[puuid]
             self._pools[puuid][term]["maps"][to_ver] = {
@@ -194,10 +198,10 @@ def _match_ps_pmap_update(self, fname, line, pid, rank):
             return True
         return False
 
-    def get_rb_components(self, match):
+    def _get_rb_components(self, match):
         return match.group(1), int(match.group(2)), int(match.group(3)), match.group(4)
 
-    def match_ps_rb_start(self, fname, line, pid, rank):
+    def _match_ps_rb_start(self, fname, line, pid, rank):
         # Do not match on new rebuild log format if we found legacy format
         if not self._check_rb_new_fmt:
             return False
@@ -206,8 +210,8 @@ def _match_ps_rb_start(self, fname, line, pid, rank):
         if match:
             # Disable checking for legacy rebuild log format, to save execution time
             self._check_rb_legacy_fmt = False
-            puuid, ver, gen, op = self.get_rb_components(match)
-            if not self.is_leader(puuid, rank, pid):
+            puuid, ver, gen, op = self._get_rb_components(match)
+            if not self._is_leader(puuid, rank, pid):
                 return True
             term = self._cur_term[puuid]
             if term < 1:
@@ -215,8 +219,7 @@ def _match_ps_rb_start(self, fname, line, pid, rank):
                 return True
             if gen in self._pools[puuid][term]["maps"][ver]["rb_gens"]:
                 self._warn(f"pool {puuid} term {term} ver {ver} already has gen {gen}", fname, line)
-            # TODO: keep timestamps for overall/scan start, pull start, completed
-            #       convert to float and store component durations too
+            # Future possibility: keep timestamps, durations for scan start, pull start, completed
             self._pools[puuid][term]["maps"][ver]["rb_gens"][gen] = {
                 "op": op,
                 "start_time": datetime,
@@ -234,7 +237,7 @@ def _match_ps_rb_start(self, fname, line, pid, rank):
             return True
         return False
 
-    def match_legacy_ps_rb_start(self, fname, line, pid, rank):
+    def _match_legacy_ps_rb_start(self, fname, line, pid, rank):
         # Do not match on legacy rebuild log format if we found new format
         if not self._check_rb_legacy_fmt:
             return False
@@ -247,7 +250,7 @@ def _match_legacy_ps_rb_start(self, fname, line, pid, rank):
             ver = int(match.group(2))
             gen = int(match.group(3))
             op = match.group(4)
-            if not self.is_leader(puuid, rank, pid):
+            if not self._is_leader(puuid, rank, pid):
                 return True
             term = self._cur_term[puuid]
             if term < 1:
@@ -255,8 +258,6 @@ def _match_legacy_ps_rb_start(self, fname, line, pid, rank):
                 return True
             if gen in self._pools[puuid][term]["maps"][ver]["rb_gens"]:
                 self._warn(f"pool {puuid} term {term} ver {ver} already has gen {gen}", fname, line)
-            # TODO: keep timestamps for overall/scan start, pull start, completed
-            #       convert to float and store component durations too
             self._pools[puuid][term]["maps"][ver]["rb_gens"][gen] = {
                 "op": op,
                 "start_time": datetime,
@@ -274,7 +275,7 @@ def _match_legacy_ps_rb_start(self, fname, line, pid, rank):
             return True
         return False
 
-    def match_ps_rb_status(self, fname, line, pid, rank):
+    def _match_ps_rb_status(self, fname, line, pid, rank):
         # Do not match on new rebuild log format if we found legacy format
         if not self._check_rb_new_fmt:
             return False
@@ -283,10 +284,10 @@ def _match_ps_rb_status(self, fname, line, pid, rank):
         if match:
             # Disable checking for legacy rebuild log format, to save execution time
             self._check_rb_legacy_fmt = False
-            puuid, ver, gen, op = self.get_rb_components(match)
+            puuid, ver, gen, op = self._get_rb_components(match)
             status = match.group(5)
             dur = int(match.group(6))
-            if not self.is_leader(puuid, rank, pid):
+            if not self._is_leader(puuid, rank, pid):
                 return True
             term = self._cur_term[puuid]
             if term < 1:
@@ -332,7 +333,7 @@ def _match_ps_rb_status(self, fname, line, pid, rank):
             return True
         return False
 
-    def match_legacy_ps_rb_status(self, fname, line, pid, rank):
+    def _match_legacy_ps_rb_status(self, fname, line, pid, rank):
         # Do not match on legacy rebuild log format if we found new format
         if not self._check_rb_legacy_fmt:
             return False
@@ -349,7 +350,7 @@ def _match_legacy_ps_rb_status(self, fname, line, pid, rank):
             ver = int(match.group(6))
             gen = int(match.group(7))
             dur = int(match.group(8))
-            if not self.is_leader(puuid, rank, pid):
+            if not self._is_leader(puuid, rank, pid):
                 return True
             if rank != log_ldr:
                 self._warn(f"pool {puuid} my rank {rank} != leader {log_ldr}", fname, line)
@@ -382,13 +383,13 @@ def _match_legacy_ps_rb_status(self, fname, line, pid, rank):
                 existing_op = self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["op"]
                 if op != existing_op:
                     self._warn(f"rb={puuid}/{ver}/{gen}/{existing_op} != line op {op}", fname, line)
-            if (status == "scanning"):
+            if status == "scanning":
                 self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["scanning"] = True
-            elif (status == "pulling"):
+            elif status == "pulling":
                 self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["pulling"] = True
-            elif (status == "completed"):
+            elif status == "completed":
                 self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["completed"] = True
-            elif (status == "aborted"):
+            elif status == "aborted":
                 self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["aborted"] = True
             self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["time"] = datetime
             self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["duration"] = dur
@@ -399,11 +400,12 @@ def _match_legacy_ps_rb_status(self, fname, line, pid, rank):
         return False
 
     def scan_file(self, log_iter, rank=-1):
+        """Scan a daos engine log file and insert important pool events into a nested dictionary"""
         fname = log_iter.fname
 
         # Find rank assignment log line for this file. Can't do much without it.
         self._file_to_rank[fname] = rank
-        if rank == -1 and not self.find_rank(log_iter):
+        if rank == -1 and not self._find_rank(log_iter):
             self._warn("cannot find rank assignment in log file - skipping", fname)
             return
         rank = self._file_to_rank[fname]
@@ -412,38 +414,35 @@ def scan_file(self, log_iter, rank=-1):
             print(f"INFO: scanning file {fname} rank {rank}, PID {pid}")
             for line in log_iter.new_iter(pid=pid):
                 # Find pool term begin (PS leader step_up)
-                if self.match_ps_step_up(fname, line, pid, rank):
+                if self._match_ps_step_up(fname, line, pid, rank):
                     continue
 
                 # Find pool term end (PS leader step_down)
-                if self.match_ps_step_down(fname, line, pid, rank):
+                if self._match_ps_step_down(fname, line, pid, rank):
                     continue
 
-                # TODO: find target status updates (precursor to pool map updates)
-
                 # Find pool map updates
-                if self.match_ps_pmap_update(fname, line, pid, rank):
+                if self._match_ps_pmap_update(fname, line, pid, rank):
                     continue
 
                 # Find rebuild start by the PS leader
-                if self.match_ps_rb_start(fname, line, pid, rank):
+                if self._match_ps_rb_start(fname, line, pid, rank):
                     continue
 
-                if self.match_legacy_ps_rb_start(fname, line, pid, rank):
+                if self._match_legacy_ps_rb_start(fname, line, pid, rank):
                     continue
 
-                if self.match_ps_rb_status(fname, line, pid, rank):
+                if self._match_ps_rb_status(fname, line, pid, rank):
                     continue
 
-                if self.match_legacy_ps_rb_status(fname, line, pid, rank):
+                if self._match_legacy_ps_rb_status(fname, line, pid, rank):
                     continue
 
-        # TODO: look for scan/migrate activity on all pool engines, count and correlate
-        #       to PS leader activity?
-        # TODO: For a PID that is killed, clear any associated cur_ldr_rank / cur_ldr_pid.
+        # Future: for a PID that is killed, clear any associated cur_ldr_rank / cur_ldr_pid.
         # At logfile end, it could be due to engine killed, or could just be log rotation.
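The scan_file() dispatch above treats each _match_* helper as consume-or-pass: a helper returns True whenever its regex matched the line, even if the event was then ignored (for example, from an engine that is not the PS leader), so the per-line loop just tries the matchers in order and moves to the next line on the first True. A standalone sketch of that shape, with hypothetical matcher names and log text rather than the tool's real API:

    import re

    RE_UP = re.compile(r"step_up.*term (\d+)")
    RE_DOWN = re.compile(r"step_down.*term (\d+)")

    def match_up(msg):
        found = RE_UP.search(msg)
        if found:
            print(f"term {found.group(1)} began")
            return True  # line consumed
        return False

    def match_down(msg):
        found = RE_DOWN.search(msg)
        if found:
            print(f"term {found.group(1)} ended")
            return True
        return False

    for msg in ("rdb step_up to term 7", "unrelated noise", "rdb step_down in term 7"):
        for matcher in (match_up, match_down):
            if matcher(msg):
                break  # the equivalent of scan_file()'s per-line 'continue'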
 
     def print_pools(self):
+        """Print all pools important events found in a nested dictionary"""
         for puuid in self._pools:
             print(f"===== Pool {puuid}:")
             for term in self._pools[puuid]:
                 b = self._pools[puuid][term]["begin_time"]
@@ -458,7 +457,7 @@ def print_pools(self):
 
                 # Print pool map updates that happened within the term
                 for v in self._pools[puuid][term]["maps"]:
-                    # TODO: print tgt state changes
+                    # Future: print tgt state changes before corresponding map updates
 
                     # Print map updates
                     t = self._pools[puuid][term]["maps"][v]["time"]
@@ -469,20 +468,15 @@ def print_pools(self):
                     for g in self._pools[puuid][term]["maps"][v]["rb_gens"]:
                         op = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["op"]
                         dur = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["duration"]
-                        strt = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["started"]
-                        # TODO: scan_done, pull_done booleans
                         scan = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["scanning"]
                         pull = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["pulling"]
-                        # line len
                         comp = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["completed"]
                         abrt = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["aborted"]
-                        # line len
                         st = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["start_time"]
                         ut = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["time"]
 
-                        if strt:
-                            status = "started"
-
                         # status is latest status reached for the given rebuild
+                        status = "started"
                         if abrt:
                             status = "aborted"
                         elif comp:
@@ -503,6 +497,7 @@ def print_pools(self):
                     print(" " * 18 + f"{puuid} END term {term}\trank {r}\t{h}\tPID {p}\t{f}")
 
     def sort(self):
+        """Sort the nested dictionary of pools by pool service term"""
         for puuid in self._pools:
             tmp = dict(sorted(self._pools[puuid].items()))
             self._pools[puuid] = tmp
@@ -543,7 +538,7 @@ def run():
             sys.exit(1)
         sp.scan_file(log_iter, rank=rank)
 
-    print(f"\n========== Pools Report ({len(sp._warnings)} warnings during scanning) ==========\n")
+    print(f"\n======== Pools Report ({len(sp.get_warnings())} warnings from scanning) ========\n")
 
     sp.sort()
     sp.print_pools()

From 14ae197735b2e166d793e43dff4a4cf3749982d7 Mon Sep 17 00:00:00 2001
From: Kenneth Cain
Date: Fri, 20 Dec 2024 16:22:23 -0500
Subject: [PATCH 6/8] pylint, flake8, rpmlint so so picky

Signed-off-by: Kenneth Cain
---
 src/tests/ftest/cart/util/daos_sys_logscan.py | 21 ++++++++++---------
 utils/rpms/daos.spec                          |  2 +-
 2 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/src/tests/ftest/cart/util/daos_sys_logscan.py b/src/tests/ftest/cart/util/daos_sys_logscan.py
index 1734d5578d6..554113c77ca 100755
--- a/src/tests/ftest/cart/util/daos_sys_logscan.py
+++ b/src/tests/ftest/cart/util/daos_sys_logscan.py
@@ -12,6 +12,7 @@
 
 import cart_logparse
 
+
 class SysPools():
     """Directory of Pools and Summary Activities Found in Engine Log Files"""
 
@@ -107,7 +108,7 @@ def _find_rank(self, log_iter):
         return found
 
     # return log-message, hostname, and date/timestamp components of the line
-    def get_line_components(self, line):
+    def _get_line_components(self, line):
         return line.get_msg(), line.hostname, line.time_stamp
 
     # is this rank, pid the leader of the pool with uuid puuid?
@@ -119,7 +120,7 @@ def _is_leader(self, puuid, rank, pid):
         return False
 
     def _match_ps_step_up(self, fname, line, pid, rank):
-        msg, host, datetime = self.get_line_components(line)
+        msg, host, datetime = self._get_line_components(line)
         match = self.re_step_up.match(msg)
         if match:
             puuid = match.group(1)
@@ -158,7 +159,7 @@ def _match_ps_step_up(self, fname, line, pid, rank):
         return False
 
     def _match_ps_step_down(self, fname, line, pid, rank):
-        msg, host, datetime = self.get_line_components(line)
+        msg, host, datetime = self._get_line_components(line)
         match = self.re_step_down.match(msg)
         if match:
             puuid = match.group(1)
@@ -176,7 +177,7 @@ def _match_ps_step_down(self, fname, line, pid, rank):
         return False
 
     def _match_ps_pmap_update(self, fname, line, pid, rank):
-        msg, host, datetime = self.get_line_components(line)
+        msg, host, datetime = self._get_line_components(line)
         match = self.re_pmap_update.match(msg)
         if match:
             puuid = match.group(1)
@@ -205,7 +206,7 @@ def _match_ps_rb_start(self, fname, line, pid, rank):
         # Do not match on new rebuild log format if we found legacy format
         if not self._check_rb_new_fmt:
             return False
-        msg, host, datetime = self.get_line_components(line)
+        msg, host, datetime = self._get_line_components(line)
         match = self.re_rebuild_ldr_start.match(msg)
         if match:
             # Disable checking for legacy rebuild log format, to save execution time
@@ -241,7 +242,7 @@ def _match_legacy_ps_rb_start(self, fname, line, pid, rank):
         # Do not match on legacy rebuild log format if we found new format
         if not self._check_rb_legacy_fmt:
             return False
-        msg, host, datetime = self.get_line_components(line)
+        msg, host, datetime = self._get_line_components(line)
         match = self.re_old_ldr_start.match(msg)
         if match:
             # Disable checking for new rebuild log format, to save execution time
@@ -279,7 +280,7 @@ def _match_ps_rb_status(self, fname, line, pid, rank):
         # Do not match on new rebuild log format if we found legacy format
         if not self._check_rb_new_fmt:
             return False
-        msg, host, datetime = self.get_line_components(line)
+        msg, host, datetime = self._get_line_components(line)
         match = self.re_rebuild_ldr_status.match(msg)
         if match:
             # Disable checking for legacy rebuild log format, to save execution time
@@ -337,7 +338,7 @@ def _match_legacy_ps_rb_status(self, fname, line, pid, rank):
         # Do not match on legacy rebuild log format if we found new format
         if not self._check_rb_legacy_fmt:
             return False
-        msg, host, datetime = self.get_line_components(line)
+        msg, host, datetime = self._get_line_components(line)
         match = self.re_old_ldr_status.match(msg)
         if match:
             # Disable checking for new rebuild log format, to save execution time
@@ -443,7 +444,7 @@ def scan_file(self, log_iter, rank=-1):
 
     def print_pools(self):
         """Print all pools important events found in a nested dictionary"""
-        for puuid in self._pools:
+        for puuid in self._pools.keys():
             print(f"===== Pool {puuid}:")
             for term in self._pools[puuid]:
                 b = self._pools[puuid][term]["begin_time"]
@@ -498,7 +499,7 @@ def print_pools(self):
 
     def sort(self):
         """Sort the nested dictionary of pools by pool service term"""
-        for puuid in self._pools:
+        for puuid in self._pools.keys():
             tmp = dict(sorted(self._pools[puuid].items()))
             self._pools[puuid] = tmp
     # _pools[puuid][term]["maps"] should have been inserted in ascending order already?
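The .keys() loops added above are rewritten in PATCH 7/8 below using .items() unpacking, which binds each nested dictionary to a short name instead of re-indexing self._pools at every level. A minimal standalone illustration of the two shapes, with made-up data:

    pools = {"0abc1234": {1: {"rank": 0, "host": "node-a"}}}

    # repeated nested indexing, one chain of lookups per access
    for puuid in pools.keys():
        for term in pools[puuid]:
            rank = pools[puuid][term]["rank"]

    # .items() unpacking, one lookup per nesting level
    for puuid, terms in pools.items():
        for term, td in terms.items():
            rank = td["rank"]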
diff --git a/utils/rpms/daos.spec b/utils/rpms/daos.spec
index e9de33b41c7..ec27450f471 100644
--- a/utils/rpms/daos.spec
+++ b/utils/rpms/daos.spec
@@ -366,7 +366,7 @@ install -m 644 utils/systemd/%{agent_svc_name} %{buildroot}/%{_unitdir}
 mkdir -p %{buildroot}/%{conf_dir}/certs/clients
 mv %{buildroot}/%{conf_dir}/bash_completion.d %{buildroot}/%{_sysconfdir}
 # fixup env-script-interpreters
-sed -i -e '1s/env //' %{buildroot}{%{daoshome}/TESTING/ftest/{cart/cart_logtest,config_file_gen,launch,slurm_setup,tags,verify_perms}.py,%{_bindir}/daos_storage_estimator.py}
+sed -i -e '1s/env //' %{buildroot}{%{daoshome}/TESTING/ftest/{cart/cart_logtest,cart/daos_sys_logscan,config_file_gen,launch,slurm_setup,tags,verify_perms}.py,%{_bindir}/daos_storage_estimator.py}
 # shouldn't have source files in a non-devel RPM
 rm -f %{buildroot}%{daoshome}/TESTING/ftest/cart/{test_linkage.cpp,utest_{hlc,portnumber,protocol,swim}.c,wrap_cmocka.h}

From b202795dfed67704dfdf17a0f7f9cde3e4e0a1c6 Mon Sep 17 00:00:00 2001
From: Kenneth Cain
Date: Fri, 20 Dec 2024 17:06:42 -0500
Subject: [PATCH 7/8] More linting fixes, more Pythonic? Who knows.

Signed-off-by: Kenneth Cain
---
 src/tests/ftest/cart/util/daos_sys_logscan.py | 51 ++++++++++---------
 1 file changed, 28 insertions(+), 23 deletions(-)

diff --git a/src/tests/ftest/cart/util/daos_sys_logscan.py b/src/tests/ftest/cart/util/daos_sys_logscan.py
index 554113c77ca..025d39957fd 100755
--- a/src/tests/ftest/cart/util/daos_sys_logscan.py
+++ b/src/tests/ftest/cart/util/daos_sys_logscan.py
@@ -444,37 +444,42 @@ def scan_file(self, log_iter, rank=-1):
 
     def print_pools(self):
         """Print all pools important events found in a nested dictionary"""
-        for puuid in self._pools.keys():
+
+        # pd (pool dictionary): pool UUID -> td
+        # td (term dictionary): term number -> "maps" md
+        # md (map dictionary): pool map version number -> "rb_gens" rd
+        # rd (rebuild dictionary): rebuild generation number -> rebuild operation details
+        for puuid, pd in self._pools.items():
             print(f"===== Pool {puuid}:")
-            for term in self._pools[puuid]:
-                b = self._pools[puuid][term]["begin_time"]
-                e = self._pools[puuid][term]["end_time"]
-                r = self._pools[puuid][term]["rank"]
-                h = self._pools[puuid][term]["host"]
-                p = self._pools[puuid][term]["pid"]
-                f = self._pools[puuid][term]["logfile"]
+            for term, td in pd.items():
+                b = td["begin_time"]
+                e = td["end_time"]
+                r = td["rank"]
+                h = td["host"]
+                p = td["pid"]
+                f = td["logfile"]
                 # Print term begin
                 print(f"{b} {puuid} BEGIN term {term}\trank {r}\t{h}\tPID {p}\t{f}")
 
                 # Print pool map updates that happened within the term
-                for v in self._pools[puuid][term]["maps"]:
+                for v, md in td["maps"].items():
                     # Future: print tgt state changes before corresponding map updates
 
                     # Print map updates
-                    t = self._pools[puuid][term]["maps"][v]["time"]
-                    from_ver = self._pools[puuid][term]["maps"][v]["from_ver"]
+                    t = md["time"]
+                    from_ver = md["from_ver"]
                     print(f"{t} {puuid} MAPVER {from_ver}->{v}")
 
                     # Print rebuilds
-                    for g in self._pools[puuid][term]["maps"][v]["rb_gens"]:
-                        op = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["op"]
-                        dur = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["duration"]
-                        scan = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["scanning"]
-                        pull = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["pulling"]
-                        comp = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["completed"]
-                        abrt = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["aborted"]
-                        st = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["start_time"]
-                        ut = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["time"]
+                    for g, rd in md["rb_gens"].items():
+                        op = rd["op"]
+                        dur = rd["duration"]
+                        scan = rd["scanning"]
+                        pull = rd["pulling"]
+                        comp = rd["completed"]
+                        abrt = rd["aborted"]
+                        st = rd["start_time"]
+                        ut = rd["time"]
 
                         # status is latest status reached for the given rebuild
                         status = "started"
@@ -499,9 +504,9 @@ def print_pools(self):
 
     def sort(self):
         """Sort the nested dictionary of pools by pool service term"""
-        for puuid in self._pools.keys():
-            tmp = dict(sorted(self._pools[puuid].items()))
-            self._pools[puuid] = tmp
+        for puuid, pd in self._pools.items():
+            tmp = dict(sorted(pd.items()))
+            pd = tmp
     # _pools[puuid][term]["maps"] should have been inserted in ascending order already?

From 69dd9d86031b2c22569fc36e4dc275698bab19c0 Mon Sep 17 00:00:00 2001
From: Kenneth Cain
Date: Sat, 21 Dec 2024 10:40:35 -0500
Subject: [PATCH 8/8] linting fixes

Signed-off-by: Kenneth Cain
---
 src/tests/ftest/cart/util/daos_sys_logscan.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/tests/ftest/cart/util/daos_sys_logscan.py b/src/tests/ftest/cart/util/daos_sys_logscan.py
index 025d39957fd..dba6dce3d20 100755
--- a/src/tests/ftest/cart/util/daos_sys_logscan.py
+++ b/src/tests/ftest/cart/util/daos_sys_logscan.py
@@ -448,7 +448,7 @@ def print_pools(self):
         # pd (pool dictionary): pool UUID -> td
         # td (term dictionary): term number -> "maps" md
         # md (map dictionary): pool map version number -> "rb_gens" rd
-        # rd (rebuild dictionary): rebuild generation number -> rebuild operation details 
+        # rd (rebuild dictionary): rebuild generation number -> rebuild operation details
         for puuid, pd in self._pools.items():
             print(f"===== Pool {puuid}:")
@@ -506,7 +506,7 @@ def sort(self):
         """Sort the nested dictionary of pools by pool service term"""
         for puuid, pd in self._pools.items():
             tmp = dict(sorted(pd.items()))
-            pd = tmp
+            self._pools[puuid] = tmp
     # _pools[puuid][term]["maps"] should have been inserted in ascending order already?
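A closing note on the last hunk: it is a functional fix, not just lint cleanup. Assigning tmp to the loop variable pd only rebinds a local name, so the sort() introduced in PATCH 7/8 never actually updated self._pools; writing through self._pools[puuid] does. A standalone demonstration of the pitfall, with made-up data:

    pools = {"0abc1234": {2: "late term", 1: "early term"}}

    for puuid, pd in pools.items():
        pd = dict(sorted(pd.items()))  # rebinds 'pd' only; pools is unchanged
    print(list(pools["0abc1234"]))     # still [2, 1]

    for puuid, pd in pools.items():
        pools[puuid] = dict(sorted(pd.items()))  # writes back into the dictionary
    print(list(pools["0abc1234"]))     # now [1, 2]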