Skip to content

Commit

Permalink
more cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Kenneth Cain <[email protected]>
  • Loading branch information
kccain committed Dec 20, 2024
1 parent 5c0afdf commit f00b62c
Showing 1 changed file with 62 additions and 41 deletions.
103 changes: 62 additions & 41 deletions src/tests/ftest/cart/util/daos_sys_logscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class SysPools():
rbid_re = r"rb=([0-9a-fA-F]{8})/(\d+)/(\d+)/(\w+)"

# Rebuild preliminary steps
# TODO/FIXME rebuild_task_ult() waiting for sched time, and map dist needs uniform rb= string.
# TODO/FIXME rebuild_task_ult() wait for scheduling, and map dist needs uniform rb= string.
# re.compile(r"rebuild_task_ult\(\).*rebuild task sleep (\d+) second")
# re.compile(r"rebuild_task_ult\(\).*map_dist_ver (\d+) map ver (\d+)")

Expand Down Expand Up @@ -75,6 +75,9 @@ def __init__(self):
self._file_to_rank = {}

self._warnings = []
self._check_rb_new_fmt = True
self._check_rb_legacy_fmt = True
self._debug = False

def _warn(self, wmsg, fname, line):
full_msg = f"WARN file={fname}"
Expand Down Expand Up @@ -144,9 +147,9 @@ def match_ps_step_up(self, fname, line, pid, rank):
"logfile": fname,
"maps": pmap_versions
}
# DEBUG
# print(f"{datetime} FOUND pool {puuid} BEGIN\tterm {term} pmap_versions empty: "
# f"{str(pmap_versions == {})} rank {rank}\t{host}\tPID {pid}\t{fname}")
if self._debug:
print(f"{datetime} FOUND pool {puuid} BEGIN\tterm {term} pmap_versions empty: "
f"{str(pmap_versions == {})} rank {rank}\t{host}\tPID {pid}\t{fname}")
return True
return False

Expand All @@ -162,9 +165,9 @@ def match_ps_step_down(self, fname, line, pid, rank):
self._cur_ldr_pid[puuid] = -1
self._cur_term[puuid] = -1
self._pools[puuid][term]["end_time"] = datetime
# DEBUG
# print(f"{datetime} FOUND pool {puuid} END\tterm {term} rank {rank}\t{host}\t"
# f"PID {pid}\t{fname}")
if self._debug:
print(f"{datetime} FOUND pool {puuid} END\tterm {term} rank {rank}\t{host}\t"
f"PID {pid}\t{fname}")
return True
return False

Expand All @@ -184,20 +187,25 @@ def match_ps_pmap_update(self, fname, line, pid, rank):
"from_ver": from_ver,
"time": datetime,
"rb_gens": {}
}
# DEBUG
#print(f"FOUND pool {puuid} map update {from_ver}->{to_ver} rank {rank}\t{host}\t"
# f"PID {pid}\t{fname}")
}
if self._debug:
print(f"FOUND pool {puuid} map update {from_ver}->{to_ver} rank {rank}\t{host}\t"
f"PID {pid}\t{fname}")
return True
return False

def get_rb_components(self, match):
    """Unpack an rb= regex match into its components.

    Args:
        match (re.Match): match object produced by a rebuild-identifier
            pattern with four groups: puuid, map version, generation, op.

    Returns:
        tuple: (puuid str, version int, generation int, op str).
    """
    puuid, ver, gen, op = match.group(1, 2, 3, 4)
    return puuid, int(ver), int(gen), op

def match_ps_rb_start(self, fname, line, pid, rank):
# Do not match on new rebuild log format if we found legacy format
if not self._check_rb_new_fmt:
return False
msg, host, datetime = self.get_line_components(line)
match = self.re_rebuild_ldr_start.match(msg)
if match:
# Disable checking for legacy rebuild log format, to save execution time
self._check_rb_legacy_fmt = False
puuid, ver, gen, op = self.get_rb_components(match)
if not self.is_leader(puuid, rank, pid):
return True
Expand All @@ -219,18 +227,22 @@ def match_ps_rb_start(self, fname, line, pid, rank):
"completed": False,
"aborted": False,
"duration": 0
}
# DEBUG
#print(f"{datetime} FOUND rebuild start in term {term}, rb={puuid}/{ver}/{gen}/{op} "
# f"rank {rank}\t{host}\tPID {pid}\t{fname}")
}
if self._debug:
print(f"{datetime} FOUND rebuild start in term {term}, rb={puuid}/{ver}/{gen}/{op} "
f"rank {rank}\t{host}\tPID {pid}\t{fname}")
return True
return False

def match_legacy_ps_rb_start(self, fname, line, pid, rank):
#old_ldr_start_re = r"rebuild_leader_start.*([0-9a-fA-F]{8}).*version=(\d+)/(\d+).*op=(\w+)"
# Do not match on legacy rebuild log format if we found new format
if not self._check_rb_legacy_fmt:
return False
msg, host, datetime = self.get_line_components(line)
match = self.re_old_ldr_start.match(msg)
if match:
# Disable checking for new rebuild log format, to save execution time
self._check_rb_new_fmt = False
puuid = match.group(1)
ver = int(match.group(2))
gen = int(match.group(3))
Expand All @@ -255,17 +267,22 @@ def match_legacy_ps_rb_start(self, fname, line, pid, rank):
"completed": False,
"aborted": False,
"duration": 0
}
# DEBUG
#print(f"{datetime} FOUND rebuild start in term {term}, rb={puuid}/{ver}/{gen}/{op} "
# f"rank {rank}\t{host}\tPID {pid}\t{fname}")
}
if self._debug:
print(f"{datetime} FOUND rebuild start in term {term}, rb={puuid}/{ver}/{gen}/{op} "
f"rank {rank}\t{host}\tPID {pid}\t{fname}")
return True
return False

def match_ps_rb_status(self, fname, line, pid, rank):
# Do not match on new rebuild log format if we found legacy format
if not self._check_rb_new_fmt:
return False
msg, host, datetime = self.get_line_components(line)
match = self.re_rebuild_ldr_status.match(msg)
if match:
# Disable checking for legacy rebuild log format, to save execution time
self._check_rb_legacy_fmt = False
puuid, ver, gen, op = self.get_rb_components(match)
status = match.group(5)
dur = int(match.group(6))
Expand All @@ -283,7 +300,7 @@ def match_ps_rb_status(self, fname, line, pid, rank):
"from_ver": ver,
"time": "xx/xx-xx:xx:xx.xx",
"rb_gens": {}
}
}
self._pools[puuid][term]["maps"][ver]["rb_gens"][gen] = {
"op": op,
"start_time": "xx/xx-xx:xx:xx.xx",
Expand All @@ -294,7 +311,7 @@ def match_ps_rb_status(self, fname, line, pid, rank):
"completed": False,
"aborted": False,
"duration": 0
}
}
if gen in self._pools[puuid][term]["maps"][ver]["rb_gens"]:
existing_op = self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["op"]
if op != existing_op:
Expand All @@ -309,16 +326,21 @@ def match_ps_rb_status(self, fname, line, pid, rank):
self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["aborted"] = True
self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["time"] = datetime
self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["duration"] = dur
# DEBUG
# print(f"{datetime} FOUND rebuild UPDATE term={term} rb={puuid}/{ver}/{gen}/{op} "
# f"STATUS={status}, DUR={dur} seconds rank {rank}\t{host}\tPID {pid}\t{fname}")
if self._debug:
print(f"{datetime} FOUND rebuild UPDATE term={term} rb={puuid}/{ver}/{gen}/{op} "
f"STATUS={status}, DUR={dur} seconds rank {rank}\t{host}\tPID {pid}\t{fname}")
return True
return False

def match_legacy_ps_rb_status(self, fname, line, pid, rank):
# Do not match on legacy rebuild log format if we found new format
if not self._check_rb_legacy_fmt:
return False
msg, host, datetime = self.get_line_components(line)
match = self.re_old_ldr_status.match(msg)
if match:
# Disable checking for new rebuild log format, to save execution time
self._check_rb_new_fmt = False
op = match.group(1)
status = match.group(2)
puuid = match.group(3)
Expand All @@ -344,7 +366,7 @@ def match_legacy_ps_rb_status(self, fname, line, pid, rank):
"from_ver": ver,
"time": "xx/xx-xx:xx:xx.xx",
"rb_gens": {}
}
}
self._pools[puuid][term]["maps"][ver]["rb_gens"][gen] = {
"op": op,
"start_time": "xx/xx-xx:xx:xx.xx",
Expand All @@ -355,7 +377,7 @@ def match_legacy_ps_rb_status(self, fname, line, pid, rank):
"completed": False,
"aborted": False,
"duration": 0
}
}
if gen in self._pools[puuid][term]["maps"][ver]["rb_gens"]:
existing_op = self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["op"]
if op != existing_op:
Expand All @@ -367,12 +389,12 @@ def match_legacy_ps_rb_status(self, fname, line, pid, rank):
elif (status == "completed"):

Check warning on line 389 in src/tests/ftest/cart/util/daos_sys_logscan.py

View workflow job for this annotation

GitHub Actions / Pylint check

superfluous-parens, Unnecessary parens after 'elif' keyword
self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["completed"] = True
elif (status == "aborted"):

Check warning on line 391 in src/tests/ftest/cart/util/daos_sys_logscan.py

View workflow job for this annotation

GitHub Actions / Pylint check

superfluous-parens, Unnecessary parens after 'elif' keyword
self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["aborted"] = True
self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["aborted"] = True
self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["time"] = datetime
self._pools[puuid][term]["maps"][ver]["rb_gens"][gen]["duration"] = dur
# DEBUG
#print(f"{datetime} FOUND rebuild UPDATE term={term} rb={puuid}/{ver}/{gen}/{op} "
# f"STATUS={status}, DUR={dur} seconds rank {rank}\t{host}\tPID {pid}\t{fname}")
if self._debug:
print(f"{datetime} FOUND rebuild UPDATE term={term} rb={puuid}/{ver}/{gen}/{op} "
f"STATUS={status}, DUR={dur} seconds rank {rank}\t{host}\tPID {pid}\t{fname}")
return True
return False

Expand All @@ -382,17 +404,13 @@ def scan_file(self, log_iter, rank=-1):
# Find rank assignment log line for this file. Can't do much without it.
self._file_to_rank[fname] = rank
if rank == -1 and not self.find_rank(log_iter):
self._warn(f"cannot find rank assignment in log file - skipping", fname)
self._warn("cannot find rank assignment in log file - skipping", fname)

Check failure on line 407 in src/tests/ftest/cart/util/daos_sys_logscan.py

View workflow job for this annotation

GitHub Actions / Pylint check

no-value-for-parameter, No value for argument 'line' in method call
return
rank = self._file_to_rank[fname]

for pid in log_iter.get_pids():
print(f"INFO: scanning file {fname} rank {rank}, PID {pid}")
for line in log_iter.new_iter(pid=pid):
msg = line.get_msg()
host = line.hostname
datetime = line.time_stamp

# Find pool term begin (PS leader step_up)
if self.match_ps_step_up(fname, line, pid, rank):
continue
Expand Down Expand Up @@ -451,10 +469,8 @@ def print_pools(self):
for g in self._pools[puuid][term]["maps"][v]["rb_gens"]:
op = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["op"]
dur = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["duration"]
# line len
started = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["started"]
strt = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["started"]
# TODO: scan_done, pull_done booleans
# line len
scan = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["scanning"]
pull = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["pulling"]
# line len
Expand All @@ -463,10 +479,13 @@ def print_pools(self):
# line len
st = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["start_time"]
ut = self._pools[puuid][term]["maps"][v]["rb_gens"][g]["time"]
status = "started"
if strt:
status = "started"

# status is latest status reached for the given rebuild
if abrt:
status = "aborted"
if comp:
elif comp:
status = "completed"
elif pull:
status = "pulling"
Expand All @@ -489,6 +508,7 @@ def sort(self):
self._pools[puuid] = tmp
# _pools[puuid][term]["maps"] should have been inserted in ascending order already?


def run():
"""Scan a list of daos_engine logfiles"""
ap = argparse.ArgumentParser()
Expand Down Expand Up @@ -527,5 +547,6 @@ def run():
sp.sort()
sp.print_pools()


# Script entry point: run the daos_engine logfile scanner when invoked
# directly (not on import).
if __name__ == '__main__':
    run()

0 comments on commit f00b62c

Please sign in to comment.