Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAOS-13066 test: Reduce concurrency in NLT #12112

Merged
merged 9 commits into from
May 16, 2023
38 changes: 21 additions & 17 deletions src/tests/ftest/cart/util/cart_logtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,20 @@ def subtract(self, val):
self.__val -= val


# pylint: disable=too-many-statements
# pylint: disable=too-many-locals
# During shutdown ERROR messages that end with these strings are not reported as errors.
SHUTDOWN_RC = ("DER_SHUTDOWN(-2017): 'Service should shut down'",
"DER_NOTLEADER(-2008): 'Not service leader'")

# Functions that are never reported as errors.
IGNORED_FUNCTIONS = ('rdb_stop', 'sched_watchdog_post', 'rdb_timerd')


class LogTest():
"""Log testing"""

# pylint: disable=too-many-statements
# pylint: disable=too-many-locals

def __init__(self, log_iter, quiet=False):
self.quiet = quiet
self._li = log_iter
Expand Down Expand Up @@ -396,28 +405,23 @@ def _check_pid_from_log_file(self, pid, abort_on_warning, leak_wf, show_memleaks
# -DER_NOMEM
show = False
elif line.rpc:
# Ignore the SWIM RPC opcode, as this often sends RPCs
# that fail during shutdown.
# Ignore the SWIM RPC opcode, as this often sends RPCs that fail during
# shutdown.
if line.rpc_opcode == '0xfe000000':
show = False
# Disable checking for a number of conditions, either
# because these errors/lines are badly formatted or because
# they're intermittent and we don't want noise in the test
# results.
# Disable checking for a number of conditions, either because these errors/lines
# are badly formatted or because they're intermittent and we don't want noise in
# the test results.
if line.fac == 'external':
show = False
elif show and server_shutdown and (line.get_msg().endswith(
"DER_SHUTDOWN(-2017): 'Service should shut down'")
or line.get_msg().endswith(
"DER_NOTLEADER(-2008): 'Not service leader'")):
show = False
elif show and line.function == 'rdb_stop':
elif show and server_shutdown and any(map(line.get_msg().endswith,
SHUTDOWN_RC)):
daltonbohning marked this conversation as resolved.
Show resolved Hide resolved
show = False
elif show and line.function == 'sched_watchdog_post':
elif show and line.function in IGNORED_FUNCTIONS:
show = False
if show:
# Allow WARNING or ERROR messages, but anything higher
# like assert should trigger a failure.
# Allow WARNING or ERROR messages, but anything higher like assert should
# trigger a failure.
if line.level < cart_logparse.LOG_LEVELS['ERR']:
show_line(line, 'HIGH', 'error in strict mode')
else:
Expand Down
117 changes: 52 additions & 65 deletions utils/node_local_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -994,43 +994,6 @@ def get_test_pool_obj(self):
return self.test_pool


def il_cmd(dfuse, cmd, check_read=True, check_write=True, check_fstat=True):
"""Run a command under the interception library

Do not run valgrind here, not because it's not useful
but the options needed are different. Valgrind handles
linking differently so some memory is wrongly lost that
would be freed in the _fini() function, and a lot of
commands do not free all memory anyway.
"""
my_env = get_base_env()
prefix = f'dnt_dfuse_il_{get_inc_id()}_'
with tempfile.NamedTemporaryFile(prefix=prefix, suffix='.log', delete=False) as log_file:
log_name = log_file.name
my_env['D_LOG_FILE'] = log_name
my_env['LD_PRELOAD'] = join(dfuse.conf['PREFIX'], 'lib64', 'libioil.so')
# pylint: disable=protected-access
my_env['DAOS_AGENT_DRPC_DIR'] = dfuse._daos.agent_dir
my_env['D_IL_REPORT'] = '2'
ret = subprocess.run(cmd, env=my_env, check=False)
print(f'Logged il to {log_name}')
print(ret)

if dfuse.caching:
check_fstat = False

try:
log_test(dfuse.conf, log_name, check_read=check_read, check_write=check_write,
check_fstat=check_fstat)
assert ret.returncode == 0
except NLTestNoFunction as error:
command = ' '.join(cmd)
print(f"ERROR: command '{command}' did not log via {error.function}")
ret.returncode = 1

return ret


class ValgrindHelper():
"""Class for running valgrind commands

Expand Down Expand Up @@ -1305,6 +1268,42 @@ def wait_for_exit(self):
self.valgrind.convert_xml()
os.rmdir(self.dir)

def il_cmd(self, cmd, check_read=True, check_write=True, check_fstat=True):
"""Run a command under the interception library

Do not run valgrind here, not because it's not useful
but the options needed are different. Valgrind handles
linking differently so some memory is wrongly lost that
would be freed in the _fini() function, and a lot of
commands do not free all memory anyway.
"""
my_env = get_base_env()
prefix = f'dnt_dfuse_il_{get_inc_id()}_'
with tempfile.NamedTemporaryFile(prefix=prefix, suffix='.log', delete=False) as log_file:
log_name = log_file.name
my_env['D_LOG_FILE'] = log_name
my_env['LD_PRELOAD'] = join(self.conf['PREFIX'], 'lib64', 'libioil.so')
my_env['DAOS_AGENT_DRPC_DIR'] = self.conf.agent_dir
my_env['D_IL_REPORT'] = '2'
ret = subprocess.run(cmd, env=my_env, check=False)
print(f'Logged il to {log_name}')
print(ret)

if self.caching:
check_fstat = False

try:
log_test(self.conf, log_name, check_read=check_read, check_write=check_write,
check_fstat=check_fstat)
assert ret.returncode == 0
except NLTestNoFunction as error:
command = ' '.join(cmd)
print(f"ERROR: command '{command}' did not log via {error.function}")
ret.returncode = 1

assert ret.returncode == 0, ret
return ret


def assert_file_size_fd(fd, size):
"""Verify the file size is as expected"""
Expand Down Expand Up @@ -2189,11 +2188,7 @@ def test_il_cat(self):
if self.dfuse.caching:
check_fstat = False

rc = il_cmd(self.dfuse,
['cat', fname],
check_write=False,
check_fstat=check_fstat)
assert rc.returncode == 0
self.dfuse.il_cmd(['cat', fname], check_write=False, check_fstat=check_fstat)

@needs_dfuse_with_opt(caching=False)
def test_il(self):
Expand All @@ -2211,30 +2206,24 @@ def test_il(self):
with open(file, 'w') as fd:
fd.write('Hello')
# Copy it across containers.
ret = il_cmd(self.dfuse, ['cp', file, sub_cont_dir])
assert ret.returncode == 0
self.dfuse.il_cmd(['cp', file, sub_cont_dir])

# Copy it within the container.
child_dir = join(self.dfuse.dir, 'new_dir')
os.mkdir(child_dir)
il_cmd(self.dfuse, ['cp', file, child_dir])
assert ret.returncode == 0

self.dfuse.il_cmd(['cp', file, child_dir])
# Copy something into a container
ret = il_cmd(self.dfuse, ['cp', '/bin/bash', sub_cont_dir], check_read=False)
assert ret.returncode == 0
self.dfuse.il_cmd(['cp', '/bin/bash', sub_cont_dir], check_read=False)
# Read it from within a container
ret = il_cmd(self.dfuse, ['md5sum', join(sub_cont_dir, 'bash')],
check_read=False, check_write=False, check_fstat=False)
assert ret.returncode == 0
ret = il_cmd(self.dfuse, ['dd',
f'if={join(sub_cont_dir, "bash")}',
f'of={join(sub_cont_dir, "bash_copy")}',
'iflag=direct',
'oflag=direct',
'bs=128k'],
check_fstat=False)
assert ret.returncode == 0
self.dfuse.il_cmd(['md5sum', join(sub_cont_dir, 'bash')],
check_read=False, check_write=False, check_fstat=False)
self.dfuse.il_cmd(['dd',
f'if={join(sub_cont_dir, "bash")}',
f'of={join(sub_cont_dir, "bash_copy")}',
'iflag=direct',
'oflag=direct',
'bs=128k'],
check_fstat=False)

@needs_dfuse
def test_xattr(self):
Expand Down Expand Up @@ -3812,7 +3801,7 @@ def _run_test(ptl=None, function=None, test_cb=None):
# long-running tests which dominate the time, so whilst a higher value here would
# work there's no benefit in rushing to finish the quicker tests. The long-running
# tests are started first.
while len(threads) > 5:
while len(threads) > 4:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should 4 be a global or class constant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or a command line variable? I think here is as good as anywhere.

for thread_id in threads:
thread_id.join(timeout=0)
if thread_id.is_alive():
Expand Down Expand Up @@ -3865,8 +3854,7 @@ def run_tests(dfuse):
assert_file_size(ofd, 21)
print(os.fstat(ofd.fileno()))
ofd.close()
ret = il_cmd(dfuse, ['cat', fname], check_write=False)
assert ret.returncode == 0
dfuse.il_cmd(['cat', fname], check_write=False)
ofd = os.open(fname, os.O_TRUNC)
assert_file_size_fd(ofd, 0)
os.close(ofd)
Expand Down Expand Up @@ -4104,8 +4092,7 @@ def create_and_read_via_il(dfuse, path):
ofd.flush()
assert_file_size(ofd, 12)
print(os.fstat(ofd.fileno()))
ret = il_cmd(dfuse, ['cat', fname], check_write=False)
assert ret.returncode == 0
dfuse.il_cmd(['cat', fname], check_write=False)


def run_container_query(conf, path):
Expand Down