Skip to content

Commit

Permalink
DAOS-13066 test: Reduce concurrency in NLT (#12112)
Browse files Browse the repository at this point in the history
Reduce concurrency in NLT and adjust the daos_fs_fix test so that it doesn't fail.

Signed-off-by: Ashley Pittman <[email protected]>
  • Loading branch information
ashleypittman authored May 16, 2023
1 parent 97de9cf commit edeafc1
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 88 deletions.
38 changes: 21 additions & 17 deletions src/tests/ftest/cart/util/cart_logtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,20 @@ def subtract(self, val):
self.__val -= val


# pylint: disable=too-many-statements
# pylint: disable=too-many-locals
# During shutdown ERROR messages that end with these strings are not reported as errors.
SHUTDOWN_RC = ("DER_SHUTDOWN(-2017): 'Service should shut down'",
"DER_NOTLEADER(-2008): 'Not service leader'")

# Functions that are never reported as errors.
IGNORED_FUNCTIONS = ('sched_watchdog_post', 'rdb_timerd')


class LogTest():
"""Log testing"""

# pylint: disable=too-many-statements
# pylint: disable=too-many-locals

def __init__(self, log_iter, quiet=False):
self.quiet = quiet
self._li = log_iter
Expand Down Expand Up @@ -396,28 +405,23 @@ def _check_pid_from_log_file(self, pid, abort_on_warning, leak_wf, show_memleaks
# -DER_NOMEM
show = False
elif line.rpc:
# Ignore the SWIM RPC opcode, as this often sends RPCs
# that fail during shutdown.
# Ignore the SWIM RPC opcode, as this often sends RPCs that fail during
# shutdown.
if line.rpc_opcode == '0xfe000000':
show = False
# Disable checking for a number of conditions, either
# because these errors/lines are badly formatted or because
# they're intermittent and we don't want noise in the test
# results.
# Disable checking for a number of conditions, either because these errors/lines
# are badly formatted or because they're intermittent and we don't want noise in
# the test results.
if line.fac == 'external':
show = False
elif show and server_shutdown and (line.get_msg().endswith(
"DER_SHUTDOWN(-2017): 'Service should shut down'")
or line.get_msg().endswith(
"DER_NOTLEADER(-2008): 'Not service leader'")):
show = False
elif show and line.function == 'rdb_stop':
elif show and server_shutdown and any(map(line.get_msg().endswith,
SHUTDOWN_RC)):
show = False
elif show and line.function == 'sched_watchdog_post':
elif show and line.function in IGNORED_FUNCTIONS:
show = False
if show:
# Allow WARNING or ERROR messages, but anything higher
# like assert should trigger a failure.
# Allow WARNING or ERROR messages, but anything higher like assert should
# trigger a failure.
if line.level < cart_logparse.LOG_LEVELS['ERR']:
show_line(line, 'HIGH', 'error in strict mode')
else:
Expand Down
127 changes: 56 additions & 71 deletions utils/node_local_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -994,43 +994,6 @@ def get_test_pool_obj(self):
return self.test_pool


def il_cmd(dfuse, cmd, check_read=True, check_write=True, check_fstat=True):
"""Run a command under the interception library
Do not run valgrind here, not because it's not useful
but the options needed are different. Valgrind handles
linking differently so some memory is wrongly lost that
would be freed in the _fini() function, and a lot of
commands do not free all memory anyway.
"""
my_env = get_base_env()
prefix = f'dnt_dfuse_il_{get_inc_id()}_'
with tempfile.NamedTemporaryFile(prefix=prefix, suffix='.log', delete=False) as log_file:
log_name = log_file.name
my_env['D_LOG_FILE'] = log_name
my_env['LD_PRELOAD'] = join(dfuse.conf['PREFIX'], 'lib64', 'libioil.so')
# pylint: disable=protected-access
my_env['DAOS_AGENT_DRPC_DIR'] = dfuse._daos.agent_dir
my_env['D_IL_REPORT'] = '2'
ret = subprocess.run(cmd, env=my_env, check=False)
print(f'Logged il to {log_name}')
print(ret)

if dfuse.caching:
check_fstat = False

try:
log_test(dfuse.conf, log_name, check_read=check_read, check_write=check_write,
check_fstat=check_fstat)
assert ret.returncode == 0
except NLTestNoFunction as error:
command = ' '.join(cmd)
print(f"ERROR: command '{command}' did not log via {error.function}")
ret.returncode = 1

return ret


class ValgrindHelper():
"""Class for running valgrind commands
Expand Down Expand Up @@ -1305,6 +1268,42 @@ def wait_for_exit(self):
self.valgrind.convert_xml()
os.rmdir(self.dir)

def il_cmd(self, cmd, check_read=True, check_write=True, check_fstat=True):
"""Run a command under the interception library
Do not run valgrind here, not because it's not useful
but the options needed are different. Valgrind handles
linking differently so some memory is wrongly lost that
would be freed in the _fini() function, and a lot of
commands do not free all memory anyway.
"""
my_env = get_base_env()
prefix = f'dnt_dfuse_il_{get_inc_id()}_'
with tempfile.NamedTemporaryFile(prefix=prefix, suffix='.log', delete=False) as log_file:
log_name = log_file.name
my_env['D_LOG_FILE'] = log_name
my_env['LD_PRELOAD'] = join(self.conf['PREFIX'], 'lib64', 'libioil.so')
my_env['DAOS_AGENT_DRPC_DIR'] = self.conf.agent_dir
my_env['D_IL_REPORT'] = '2'
ret = subprocess.run(cmd, env=my_env, check=False)
print(f'Logged il to {log_name}')
print(ret)

if self.caching:
check_fstat = False

try:
log_test(self.conf, log_name, check_read=check_read, check_write=check_write,
check_fstat=check_fstat)
assert ret.returncode == 0
except NLTestNoFunction as error:
command = ' '.join(cmd)
print(f"ERROR: command '{command}' did not log via {error.function}")
ret.returncode = 1

assert ret.returncode == 0, ret
return ret


def assert_file_size_fd(fd, size):
"""Verify the file size is as expected"""
Expand Down Expand Up @@ -2189,11 +2188,7 @@ def test_il_cat(self):
if self.dfuse.caching:
check_fstat = False

rc = il_cmd(self.dfuse,
['cat', fname],
check_write=False,
check_fstat=check_fstat)
assert rc.returncode == 0
self.dfuse.il_cmd(['cat', fname], check_write=False, check_fstat=check_fstat)

@needs_dfuse_with_opt(caching=False)
def test_il(self):
Expand All @@ -2211,30 +2206,24 @@ def test_il(self):
with open(file, 'w') as fd:
fd.write('Hello')
# Copy it across containers.
ret = il_cmd(self.dfuse, ['cp', file, sub_cont_dir])
assert ret.returncode == 0
self.dfuse.il_cmd(['cp', file, sub_cont_dir])

# Copy it within the container.
child_dir = join(self.dfuse.dir, 'new_dir')
os.mkdir(child_dir)
il_cmd(self.dfuse, ['cp', file, child_dir])
assert ret.returncode == 0

self.dfuse.il_cmd(['cp', file, child_dir])
# Copy something into a container
ret = il_cmd(self.dfuse, ['cp', '/bin/bash', sub_cont_dir], check_read=False)
assert ret.returncode == 0
self.dfuse.il_cmd(['cp', '/bin/bash', sub_cont_dir], check_read=False)
# Read it from within a container
ret = il_cmd(self.dfuse, ['md5sum', join(sub_cont_dir, 'bash')],
check_read=False, check_write=False, check_fstat=False)
assert ret.returncode == 0
ret = il_cmd(self.dfuse, ['dd',
f'if={join(sub_cont_dir, "bash")}',
f'of={join(sub_cont_dir, "bash_copy")}',
'iflag=direct',
'oflag=direct',
'bs=128k'],
check_fstat=False)
assert ret.returncode == 0
self.dfuse.il_cmd(['md5sum', join(sub_cont_dir, 'bash')],
check_read=False, check_write=False, check_fstat=False)
self.dfuse.il_cmd(['dd',
f'if={join(sub_cont_dir, "bash")}',
f'of={join(sub_cont_dir, "bash_copy")}',
'iflag=direct',
'oflag=direct',
'bs=128k'],
check_fstat=False)

@needs_dfuse
def test_xattr(self):
Expand Down Expand Up @@ -3486,7 +3475,7 @@ def test_daos_fs_fix(self):
os.mkdir(dirname)

fname1 = join(dirname, 'f1')
with open(fname1, 'w') as fd:
with open(fname1, 'w', encoding='ascii') as fd:
fd.write('test1')
fname2 = join(dirname, 'f2')
with open(fname2, 'w') as fd:
Expand All @@ -3495,7 +3484,7 @@ def test_daos_fs_fix(self):
dirname1 = join(path, 'test_dir/1d1/')
os.mkdir(dirname1)
fname3 = join(dirname1, 'f3')
with open(fname3, 'w') as fd:
with open(fname3, 'w', encoding='ascii') as fd:
fd.write('test3')
dirname2 = join(path, 'test_dir/1d2/')
os.mkdir(dirname2)
Expand Down Expand Up @@ -3610,9 +3599,8 @@ def test_daos_fs_fix(self):
print(f'rc is {rc}')
output = rc.stdout.decode('utf-8')
assert check_dfs_tool_output(output, None, '1048576')
with open(fname1, 'rb') as fd:
with open(fname1, 'r', encoding='ascii', errors='ignore') as fd:
data = fd.read()
data = data.decode('utf-8-sig').strip()
if data != 'test1':
raise NLTestFail('/test_dir/f1 data is corrupted')

Expand All @@ -3622,9 +3610,8 @@ def test_daos_fs_fix(self):
print(f'rc is {rc}')
output = rc.stdout.decode('utf-8')
assert check_dfs_tool_output(output, None, '1048576')
with open(fname3, 'rb') as fd:
with open(fname3, 'r', encoding='ascii', errors='ignore') as fd:
data = fd.read()
data = data.decode('utf-8-sig').strip()
if data != 'test3':
raise NLTestFail('/test_dir/1d1/f3 data is corrupted')

Expand Down Expand Up @@ -3812,7 +3799,7 @@ def _run_test(ptl=None, function=None, test_cb=None):
# long-running tests which dominate the time, so whilst a higher value here would
# work there's no benefit in rushing to finish the quicker tests. The long-running
# tests are started first.
while len(threads) > 5:
while len(threads) > 4:
for thread_id in threads:
thread_id.join(timeout=0)
if thread_id.is_alive():
Expand Down Expand Up @@ -3865,8 +3852,7 @@ def run_tests(dfuse):
assert_file_size(ofd, 21)
print(os.fstat(ofd.fileno()))
ofd.close()
ret = il_cmd(dfuse, ['cat', fname], check_write=False)
assert ret.returncode == 0
dfuse.il_cmd(['cat', fname], check_write=False)
ofd = os.open(fname, os.O_TRUNC)
assert_file_size_fd(ofd, 0)
os.close(ofd)
Expand Down Expand Up @@ -4104,8 +4090,7 @@ def create_and_read_via_il(dfuse, path):
ofd.flush()
assert_file_size(ofd, 12)
print(os.fstat(ofd.fileno()))
ret = il_cmd(dfuse, ['cat', fname], check_write=False)
assert ret.returncode == 0
dfuse.il_cmd(['cat', fname], check_write=False)


def run_container_query(conf, path):
Expand Down

0 comments on commit edeafc1

Please sign in to comment.