From edeafc14d87a5d839f4dba482efc7a54cca89592 Mon Sep 17 00:00:00 2001 From: Ashley Pittman Date: Tue, 16 May 2023 12:37:40 +0100 Subject: [PATCH] DAOS-13066 test: Reduce concurrency in NLT (#12112) Reduce concurrency in NLT and adjust the daos_fs_fix test so that it doesn't fail. Signed-off-by: Ashley Pittman --- src/tests/ftest/cart/util/cart_logtest.py | 38 ++++--- utils/node_local_test.py | 127 ++++++++++------------ 2 files changed, 77 insertions(+), 88 deletions(-) diff --git a/src/tests/ftest/cart/util/cart_logtest.py b/src/tests/ftest/cart/util/cart_logtest.py index 744e70eee2f..fa99630b27e 100755 --- a/src/tests/ftest/cart/util/cart_logtest.py +++ b/src/tests/ftest/cart/util/cart_logtest.py @@ -205,11 +205,20 @@ def subtract(self, val): self.__val -= val -# pylint: disable=too-many-statements -# pylint: disable=too-many-locals +# During shutdown ERROR messages that end with these strings are not reported as errors. +SHUTDOWN_RC = ("DER_SHUTDOWN(-2017): 'Service should shut down'", + "DER_NOTLEADER(-2008): 'Not service leader'") + +# Functions that are never reported as errors. +IGNORED_FUNCTIONS = ('sched_watchdog_post', 'rdb_timerd') + + class LogTest(): """Log testing""" + # pylint: disable=too-many-statements + # pylint: disable=too-many-locals + def __init__(self, log_iter, quiet=False): self.quiet = quiet self._li = log_iter @@ -396,28 +405,23 @@ def _check_pid_from_log_file(self, pid, abort_on_warning, leak_wf, show_memleaks # -DER_NOMEM show = False elif line.rpc: - # Ignore the SWIM RPC opcode, as this often sends RPCs - # that fail during shutdown. + # Ignore the SWIM RPC opcode, as this often sends RPCs that fail during + # shutdown. if line.rpc_opcode == '0xfe000000': show = False - # Disable checking for a number of conditions, either - # because these errors/lines are badly formatted or because - # they're intermittent and we don't want noise in the test - # results. + # Disable checking for a number of conditions, either because these errors/lines + # are badly formatted or because they're intermittent and we don't want noise in + # the test results. if line.fac == 'external': show = False - elif show and server_shutdown and (line.get_msg().endswith( - "DER_SHUTDOWN(-2017): 'Service should shut down'") - or line.get_msg().endswith( - "DER_NOTLEADER(-2008): 'Not service leader'")): - show = False - elif show and line.function == 'rdb_stop': + elif show and server_shutdown and any(map(line.get_msg().endswith, + SHUTDOWN_RC)): show = False - elif show and line.function == 'sched_watchdog_post': + elif show and line.function in IGNORED_FUNCTIONS: show = False if show: - # Allow WARNING or ERROR messages, but anything higher - # like assert should trigger a failure. + # Allow WARNING or ERROR messages, but anything higher like assert should + # trigger a failure. if line.level < cart_logparse.LOG_LEVELS['ERR']: show_line(line, 'HIGH', 'error in strict mode') else: diff --git a/utils/node_local_test.py b/utils/node_local_test.py index 4ab4411ee41..6a201b02307 100755 --- a/utils/node_local_test.py +++ b/utils/node_local_test.py @@ -994,43 +994,6 @@ def get_test_pool_obj(self): return self.test_pool -def il_cmd(dfuse, cmd, check_read=True, check_write=True, check_fstat=True): - """Run a command under the interception library - - Do not run valgrind here, not because it's not useful - but the options needed are different. Valgrind handles - linking differently so some memory is wrongly lost that - would be freed in the _fini() function, and a lot of - commands do not free all memory anyway. - """ - my_env = get_base_env() - prefix = f'dnt_dfuse_il_{get_inc_id()}_' - with tempfile.NamedTemporaryFile(prefix=prefix, suffix='.log', delete=False) as log_file: - log_name = log_file.name - my_env['D_LOG_FILE'] = log_name - my_env['LD_PRELOAD'] = join(dfuse.conf['PREFIX'], 'lib64', 'libioil.so') - # pylint: disable=protected-access - my_env['DAOS_AGENT_DRPC_DIR'] = dfuse._daos.agent_dir - my_env['D_IL_REPORT'] = '2' - ret = subprocess.run(cmd, env=my_env, check=False) - print(f'Logged il to {log_name}') - print(ret) - - if dfuse.caching: - check_fstat = False - - try: - log_test(dfuse.conf, log_name, check_read=check_read, check_write=check_write, - check_fstat=check_fstat) - assert ret.returncode == 0 - except NLTestNoFunction as error: - command = ' '.join(cmd) - print(f"ERROR: command '{command}' did not log via {error.function}") - ret.returncode = 1 - - return ret - - class ValgrindHelper(): """Class for running valgrind commands @@ -1305,6 +1268,42 @@ def wait_for_exit(self): self.valgrind.convert_xml() os.rmdir(self.dir) + def il_cmd(self, cmd, check_read=True, check_write=True, check_fstat=True): + """Run a command under the interception library + + Do not run valgrind here, not because it's not useful + but the options needed are different. Valgrind handles + linking differently so some memory is wrongly lost that + would be freed in the _fini() function, and a lot of + commands do not free all memory anyway. + """ + my_env = get_base_env() + prefix = f'dnt_dfuse_il_{get_inc_id()}_' + with tempfile.NamedTemporaryFile(prefix=prefix, suffix='.log', delete=False) as log_file: + log_name = log_file.name + my_env['D_LOG_FILE'] = log_name + my_env['LD_PRELOAD'] = join(self.conf['PREFIX'], 'lib64', 'libioil.so') + my_env['DAOS_AGENT_DRPC_DIR'] = self.conf.agent_dir + my_env['D_IL_REPORT'] = '2' + ret = subprocess.run(cmd, env=my_env, check=False) + print(f'Logged il to {log_name}') + print(ret) + + if self.caching: + check_fstat = False + + try: + log_test(self.conf, log_name, check_read=check_read, check_write=check_write, + check_fstat=check_fstat) + assert ret.returncode == 0 + except NLTestNoFunction as error: + command = ' '.join(cmd) + print(f"ERROR: command '{command}' did not log via {error.function}") + ret.returncode = 1 + + assert ret.returncode == 0, ret + return ret + def assert_file_size_fd(fd, size): """Verify the file size is as expected""" @@ -2189,11 +2188,7 @@ def test_il_cat(self): if self.dfuse.caching: check_fstat = False - rc = il_cmd(self.dfuse, - ['cat', fname], - check_write=False, - check_fstat=check_fstat) - assert rc.returncode == 0 + self.dfuse.il_cmd(['cat', fname], check_write=False, check_fstat=check_fstat) @needs_dfuse_with_opt(caching=False) def test_il(self): @@ -2211,30 +2206,24 @@ def test_il(self): with open(file, 'w') as fd: fd.write('Hello') # Copy it across containers. - ret = il_cmd(self.dfuse, ['cp', file, sub_cont_dir]) - assert ret.returncode == 0 + self.dfuse.il_cmd(['cp', file, sub_cont_dir]) # Copy it within the container. child_dir = join(self.dfuse.dir, 'new_dir') os.mkdir(child_dir) - il_cmd(self.dfuse, ['cp', file, child_dir]) - assert ret.returncode == 0 - + self.dfuse.il_cmd(['cp', file, child_dir]) # Copy something into a container - ret = il_cmd(self.dfuse, ['cp', '/bin/bash', sub_cont_dir], check_read=False) - assert ret.returncode == 0 + self.dfuse.il_cmd(['cp', '/bin/bash', sub_cont_dir], check_read=False) # Read it from within a container - ret = il_cmd(self.dfuse, ['md5sum', join(sub_cont_dir, 'bash')], - check_read=False, check_write=False, check_fstat=False) - assert ret.returncode == 0 - ret = il_cmd(self.dfuse, ['dd', - f'if={join(sub_cont_dir, "bash")}', - f'of={join(sub_cont_dir, "bash_copy")}', - 'iflag=direct', - 'oflag=direct', - 'bs=128k'], - check_fstat=False) - assert ret.returncode == 0 + self.dfuse.il_cmd(['md5sum', join(sub_cont_dir, 'bash')], + check_read=False, check_write=False, check_fstat=False) + self.dfuse.il_cmd(['dd', + f'if={join(sub_cont_dir, "bash")}', + f'of={join(sub_cont_dir, "bash_copy")}', + 'iflag=direct', + 'oflag=direct', + 'bs=128k'], + check_fstat=False) @needs_dfuse def test_xattr(self): @@ -3486,7 +3475,7 @@ def test_daos_fs_fix(self): os.mkdir(dirname) fname1 = join(dirname, 'f1') - with open(fname1, 'w') as fd: + with open(fname1, 'w', encoding='ascii') as fd: fd.write('test1') fname2 = join(dirname, 'f2') with open(fname2, 'w') as fd: @@ -3495,7 +3484,7 @@ def test_daos_fs_fix(self): dirname1 = join(path, 'test_dir/1d1/') os.mkdir(dirname1) fname3 = join(dirname1, 'f3') - with open(fname3, 'w') as fd: + with open(fname3, 'w', encoding='ascii') as fd: fd.write('test3') dirname2 = join(path, 'test_dir/1d2/') os.mkdir(dirname2) @@ -3610,9 +3599,8 @@ def test_daos_fs_fix(self): print(f'rc is {rc}') output = rc.stdout.decode('utf-8') assert check_dfs_tool_output(output, None, '1048576') - with open(fname1, 'rb') as fd: + with open(fname1, 'r', encoding='ascii', errors='ignore') as fd: data = fd.read() - data = data.decode('utf-8-sig').strip() if data != 'test1': raise NLTestFail('/test_dir/f1 data is corrupted') @@ -3622,9 +3610,8 @@ def test_daos_fs_fix(self): print(f'rc is {rc}') output = rc.stdout.decode('utf-8') assert check_dfs_tool_output(output, None, '1048576') - with open(fname3, 'rb') as fd: + with open(fname3, 'r', encoding='ascii', errors='ignore') as fd: data = fd.read() - data = data.decode('utf-8-sig').strip() if data != 'test3': raise NLTestFail('/test_dir/1d1/f3 data is corrupted') @@ -3812,7 +3799,7 @@ def _run_test(ptl=None, function=None, test_cb=None): # long-running tests which dominate the time, so whilst a higher value here would # work there's no benefit in rushing to finish the quicker tests. The long-running # tests are started first. - while len(threads) > 5: + while len(threads) > 4: for thread_id in threads: thread_id.join(timeout=0) if thread_id.is_alive(): @@ -3865,8 +3852,7 @@ def run_tests(dfuse): assert_file_size(ofd, 21) print(os.fstat(ofd.fileno())) ofd.close() - ret = il_cmd(dfuse, ['cat', fname], check_write=False) - assert ret.returncode == 0 + dfuse.il_cmd(['cat', fname], check_write=False) ofd = os.open(fname, os.O_TRUNC) assert_file_size_fd(ofd, 0) os.close(ofd) @@ -4104,8 +4090,7 @@ def create_and_read_via_il(dfuse, path): ofd.flush() assert_file_size(ofd, 12) print(os.fstat(ofd.fileno())) - ret = il_cmd(dfuse, ['cat', fname], check_write=False) - assert ret.returncode == 0 + dfuse.il_cmd(['cat', fname], check_write=False) def run_container_query(conf, path):