Fix timeout and abort / Pipeline updates
Blanca-Fuentes committed Jan 23, 2025
1 parent 08f4524 commit f8b5e9e
Showing 4 changed files with 74 additions and 48 deletions.
21 changes: 5 additions & 16 deletions reframe/frontend/executors/__init__.py
@@ -555,28 +555,17 @@ def abort(self, cause=None):
         exc.__cause__ = cause
         try:
             # The abort can also happen during a compile job
-            if self.check.build_job:
-                self.check.build_job.cancel()
+            if not self.zombie and (self.check.job or self.check.build_job):
+                if self.check.build_job:
+                    self.check.build_job.cancel()
+                else:
+                    self.check.job.cancel()
         except JobNotStartedError:
             self.fail((type(exc), exc, None), 'on_task_abort')
         except BaseException:
             self.fail()
         else:
             self.fail((type(exc), exc, None), 'on_task_abort')
-
-        try:
-            if not self.zombie and self.check.job:
-                self.check.job.cancel()
-        except JobNotStartedError:
-            if not self.failed:
-                self.fail((type(exc), exc, None), 'on_task_abort')
-        except BaseException:
-            if not self.failed:
-                self.fail()
-        else:
-            if not self.failed:
-                self.fail((type(exc), exc, None), 'on_task_abort')
 
         self._aborted = True
 
     def info(self):
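The executors change collapses what used to be two cancellation passes (build job first, then run job) into one try block that cancels whichever job exists, so the failure bookkeeping runs exactly once. Below is a minimal, self-contained sketch of the resulting control flow; Job, JobNotStartedError and the simplified fail() are stand-ins for ReFrame's real classes, not its API.

    class JobNotStartedError(Exception):
        '''Raised when cancelling a job that was never submitted.'''


    class Job:
        def cancel(self):
            pass


    class TaskSketch:
        def __init__(self, build_job=None, job=None, zombie=False):
            self.build_job = build_job
            self.job = job
            self.zombie = zombie
            self.failed = False

        def fail(self, reason=None):
            self.failed = True

        def abort(self):
            # Single cancellation path: prefer the build job if one exists,
            # otherwise cancel the run job; every outcome marks the task
            # as failed, mirroring the hunk above.
            try:
                if not self.zombie and (self.job or self.build_job):
                    if self.build_job:
                        self.build_job.cancel()
                    else:
                        self.job.cancel()
            except JobNotStartedError:
                self.fail('on_task_abort')
            except BaseException:
                self.fail()
            else:
                self.fail('on_task_abort')


    # A task aborted mid-compile cancels only its build job:
    task = TaskSketch(build_job=Job())
    task.abort()
    assert task.failed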
43 changes: 41 additions & 2 deletions reframe/frontend/executors/policies.py
@@ -359,7 +359,9 @@ def __init__(self):
         self._max_jobs = {
             '_rfm_local': rt.runtime().get_option('systems/0/max_local_jobs')
         }
-
+        self._pipeline_statistics = rt.runtime().get_option(
+            'general/0/dump_pipeline_progress'
+        )
         # Tasks that have finished, but have not performed their cleanup phase
         self._retired_tasks = []
         self.task_listeners.append(self)
@@ -424,13 +426,18 @@ async def _runcase(self, case, task):
         self._max_jobs.setdefault(partition.fullname, partition.max_jobs)
 
         self._task_index[case] = task
+
+        if self._pipeline_statistics:
+            self._init_pipeline_progress(len(self._current_tasks))
+
         try:
             # Do not run test if any of its dependencies has failed
             # NOTE: Restored dependencies are not in the task_index
             if any(self._task_index[c].failed
                    for c in case.deps if c in self._task_index):
                 raise TaskDependencyError('dependencies failed')
 
+
             if any(self._task_index[c].skipped
                    for c in case.deps if c in self._task_index):
@@ -465,12 +472,19 @@ async def _runcase(self, case, task):
                              task.testcase.environ,
                              sched_flex_alloc_nodes=self.sched_flex_alloc_nodes,
                              sched_options=self.sched_options)
+
+            if self._pipeline_statistics:
+                self._update_pipeline_progress('startup', 'ready_compile', 1)
+
             partname = _get_partition_name(task, phase='build')
             max_jobs = self._max_jobs[partname]
             while len(self._partition_tasks[partname])+1 > max_jobs:
                 await asyncio.sleep(2)
             self._partition_tasks[partname].add(task)
             await task.compile()
+            if self._pipeline_statistics:
+                self._update_pipeline_progress('ready_compile', 'compiling', 1)
+
             # If RunOnly, no polling for run jobs
             if task.check.build_job:
                 # Pick the right scheduler
@@ -494,20 +508,25 @@ async def _runcase(self, case, task):
                     if task.compile_complete():
                         break
                     else:
                         # yield control to another task to give it the chance to check their status
                         await asyncio.sleep(0)
                     # We need to check the timeout inside the while loop
                     if self.timeout_expired():
                         raise RunSessionTimeout(
                             'maximum session duration exceeded'
                         )
             await task.compile_wait()
+            if self._pipeline_statistics:
+                self._update_pipeline_progress('compiling', 'ready_run', 1)
             self._partition_tasks[partname].remove(task)
             partname = _get_partition_name(task, phase='run')
             max_jobs = self._max_jobs[partname]
             while len(self._partition_tasks[partname])+1 > max_jobs:
                 await asyncio.sleep(2)
             self._partition_tasks[partname].add(task)
             await task.run()
+            if self._pipeline_statistics:
+                self._update_pipeline_progress('ready_run', 'running', 1)
             # If CompileOnly, no polling for run jobs
             if task.check.job:
                 # Pick the right scheduler
@@ -538,15 +557,33 @@ async def _runcase(self, case, task):
                             'maximum session duration exceeded'
                         )
             await task.run_wait()
+            if self._pipeline_statistics:
+                self._update_pipeline_progress('running', 'completing', 1)
             self._partition_tasks[partname].remove(task)
             if not self.skip_sanity_check:
                 task.sanity()
 
             if not self.skip_performance_check:
                 task.performance()
 
-            self._retired_tasks.append(task)
             task.finalize()
+            self._retired_tasks.append(task)
+
+            if self._pipeline_statistics:
+                self._update_pipeline_progress('completing', 'retired', 1)
+
+            if self._pipeline_statistics:
+                num_retired = len(self._retired_tasks)
+            _cleanup_all(self._retired_tasks, not self.keep_stage_files)
+            if self._pipeline_statistics:
+                num_retired_actual = num_retired - len(self._retired_tasks)
+
+                # Some tests might not be cleaned up because they are
+                # waiting for dependencies or because their dependencies
+                # have failed.
+                self._update_pipeline_progress(
+                    'retired', 'completed', num_retired_actual
+                )
 
         except TaskExit:
             self._current_tasks.remove(task)
@@ -737,6 +774,8 @@ def on_task_success(self, task):
     def _exit(self):
         # Clean up all remaining tasks
         _cleanup_all(self._retired_tasks, not self.keep_stage_files)
+        if self._pipeline_statistics:
+            self._dump_pipeline_progress('pipeline-progress.json')
 
     def execute(self, testcases):
         try:
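The diff shows only the call sites; the bodies of _init_pipeline_progress, _update_pipeline_progress and _dump_pipeline_progress live elsewhere in the file. A hypothetical sketch of the bookkeeping they imply, assuming each pipeline stage keeps a timestamped history of its task count that is dumped as JSON at exit:

    import json
    import time

    # Stage names as they appear in the _update_pipeline_progress() calls above
    PIPELINE_STAGES = ('startup', 'ready_compile', 'compiling', 'ready_run',
                       'running', 'completing', 'retired', 'completed')


    class PipelineProgressMixin:
        '''Hypothetical stand-in for the policy's pipeline bookkeeping.'''

        def _init_pipeline_progress(self, num_tasks):
            # Every task starts in the 'startup' stage
            self._pipeline_progress = {s: [] for s in PIPELINE_STAGES}
            self._pipeline_progress['startup'].append((num_tasks, time.time()))

        def _update_pipeline_progress(self, old_state, new_state, num_tasks):
            # Move num_tasks tasks from old_state to new_state, appending a
            # timestamped snapshot of each counter
            now = time.time()
            for state, delta in ((old_state, -num_tasks),
                                 (new_state, num_tasks)):
                history = self._pipeline_progress[state]
                last = history[-1][0] if history else 0
                history.append((last + delta, now))

        def _dump_pipeline_progress(self, filename):
            with open(filename, 'w') as fp:
                json.dump(self._pipeline_progress, fp, indent=2)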
2 changes: 1 addition & 1 deletion reframe/schemas/config.json
@@ -673,7 +673,7 @@
         "systems*/sched_options/sched_access_in_submit": false,
         "systems*/sched_options/ssh_hosts": [],
         "systems*/sched_options/ignore_reqnodenotavail": false,
-        "systems*/sched_options/job_submit_timeout": 100,
+        "systems*/sched_options/job_submit_timeout": 60,
         "systems*/sched_options/resubmit_on_errors": [],
         "systems*/sched_options/unqualified_hostnames": false,
         "systems*/sched_options/use_nodes_option": false
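This restores the schema default for job_submit_timeout, the number of seconds ReFrame waits on a job submission command, to 60. Sites with slow schedulers can still override it per system; a sketch, assuming the usual Python site-configuration format and a hypothetical system name:

    site_configuration = {
        'systems': [
            {
                'name': 'mycluster',        # hypothetical system
                'hostnames': [r'login\d+'],
                'sched_options': {
                    # Allow up to two minutes per submission instead of
                    # the 60-second default set in the schema
                    'job_submit_timeout': 120
                },
                'partitions': [
                    # ...
                ]
            }
        ]
    }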
56 changes: 27 additions & 29 deletions reframe/utility/osext.py
@@ -415,36 +415,34 @@ async def run_command(cmd, check=False, timeout=None, **kwargs):
        times out.
     '''
     try:
-        from reframe.core.logging import getlogger
-        getlogger().debug(f"START: {cmd} launching at {datetime.datetime.now()}")
-        ## Synchronous command launch ----------------------------------------
-        # t_start = time.time()
-        # proc = run_command_process(cmd, start_new_session=True, **kwargs)
-        # t_start = time.time()
-        ## Asynchronous command launch ----------------------------------------
-        proc = await run_command_asyncio(cmd, start_new_session=False, **kwargs)
-        await asyncio.sleep(0)
-        ## Asynchronous command communication ----------------------------------------
-        t_start = time.time()
-        proc_stdout, proc_stderr = await asyncio.wait_for(
-            proc.communicate(), timeout=timeout
-        )
-        ## Synchronous command communication ----------------------------------------
-        # t_end = time.time()
-        # getlogger().debug(f"INFO: {cmd} launch command took: {t_end-t_start}")
-        # t_start = time.time()
-        # loop = asyncio.get_event_loop()
-        # try:
-        #     proc_stdout, proc_stderr = await loop.run_in_executor(None, proc.communicate, timeout)
-        # except concurrent.futures._base.CancelledError:
-        #     raise KeyboardInterrupt
-        t_end = time.time()
-        getlogger().debug(f"INFO: {cmd} command took: {t_end-t_start}")
-    except asyncio.TimeoutError as e:
+        if timeout:
+            cmd = f'time timeout {timeout} ' + cmd
+        from reframe.core.logging import getlogger
+        getlogger().debug(f"START: {cmd} launching at {datetime.datetime.now()}")
+        ## Synchronous command launch ----------------------------------------
+        # t_start = time.time()
+        # proc = run_command_process(cmd, start_new_session=True, **kwargs)
+        # t_start = time.time()
+        ## Asynchronous command launch ----------------------------------------
+        t_start = time.time()
+        proc = await run_command_asyncio(cmd, start_new_session=False, **kwargs)
+        # await asyncio.sleep(0)
+        ## Asynchronous command communication ----------------------------------------
+        proc_stdout, proc_stderr = await proc.communicate()
+        ## Synchronous command communication ----------------------------------------
+        # t_end = time.time()
+        # getlogger().debug(f"INFO: {cmd} launch command took: {t_end-t_start}")
+        # t_start = time.time()
+        # loop = asyncio.get_event_loop()
+        # try:
+        #     proc_stdout, proc_stderr = await loop.run_in_executor(None, proc.communicate, timeout)
+        # except concurrent.futures._base.CancelledError:
+        #     raise KeyboardInterrupt
+        t_end = time.time()
+        getlogger().debug(f"INFO: {cmd} command took: {t_end-t_start}")
+
+        if proc.returncode == 124:
-        from reframe.core.logging import getlogger
-        t_end = time.time()
-        getlogger().debug(f"INFO: {cmd} command took: {t_end-t_start}")
             try:
                 # Get the process with psutil because we need to cancel the group
                 p = psutil.Process(proc.pid)
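The timeout fix changes strategy: instead of wrapping proc.communicate() in asyncio.wait_for(), the command itself is prefixed with coreutils' timeout, which kills the child after the given number of seconds and exits with status 124, and the caller then reaps whatever is left of the process group with psutil. A standalone sketch of the same pattern, assuming a POSIX system with coreutils installed; run_with_timeout and the plain create_subprocess_shell call are illustrative, not this module's API:

    import asyncio

    import psutil


    async def run_with_timeout(cmd, timeout=None):
        # Delegate the timeout to coreutils' `timeout`: it terminates the
        # command after `timeout` seconds and exits with status 124.
        if timeout:
            cmd = f'timeout {timeout} ' + cmd

        proc = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode == 124:
            # Best-effort cleanup of any children that survived the timeout;
            # the wrapper itself has already exited, so NoSuchProcess is
            # the common case here.
            try:
                for child in psutil.Process(proc.pid).children(recursive=True):
                    child.kill()
            except psutil.NoSuchProcess:
                pass

            raise TimeoutError(f'command {cmd!r} timed out after {timeout}s')

        return proc.returncode, stdout.decode(), stderr.decode()

Exit status 124 is reserved by timeout for a command that timed out; 125, 126 and 127 signal a failure of timeout itself, a command that cannot be invoked, and a command that was not found, respectively.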
