Skip to content

Commit

Permalink
TaskStatusService connection lost fix
Browse files Browse the repository at this point in the history
This commit adds error handling to TaskStatusService. When the
connection is lost, it will try to establish a new connection. If the
connection is not possible to renew, the task will send warning message
about new status and remove TaskStatusService from available services.

Reference: #5794
Signed-off-by: Jan Richter <[email protected]>
Signed-off-by: Cleber Rosa <[email protected]>
  • Loading branch information
richtja committed Jan 31, 2024
1 parent fa4d278 commit 2b23b11
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 4 deletions.
26 changes: 23 additions & 3 deletions avocado/core/nrunner/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,16 @@ def _create_connection(self):

def post(self, status):
data = json_dumps(status)
self.connection.send(data.encode("ascii") + "\n".encode("ascii"))
try:
self.connection.send(data.encode("ascii") + "\n".encode("ascii"))
except BrokenPipeError:
try:
self._create_connection()
self.connection.send(data.encode("ascii") + "\n".encode("ascii"))
except ConnectionRefusedError:
LOG.warning(f"Connection with {self.uri} has been lost.")
return False
return True

def close(self):
if self.connection is not None:
Expand Down Expand Up @@ -211,12 +220,23 @@ def run(self):
self.setup_output_dir()
runner_klass = self.runnable.pick_runner_class()
runner = runner_klass()
running_status_services = self.status_services
damaged_status_services = []
for status in runner.run(self.runnable):
if status["status"] == "started":
status.update({"output_dir": self.runnable.output_dir})
status.update({"id": self.identifier})
if self.job_id is not None:
status.update({"job_id": self.job_id})
for status_service in self.status_services:
status_service.post(status)
for status_service in running_status_services:
if not status_service.post(status):
damaged_status_services.append(status_service)
if damaged_status_services:
running_status_services = list(
filter(
lambda s: s not in damaged_status_services,
running_status_services,
)
)
damaged_status_services.clear()
yield status
2 changes: 1 addition & 1 deletion selftests/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"nrunner-requirement": 16,
"unit": 667,
"jobs": 11,
"functional-parallel": 299,
"functional-parallel": 300,
"functional-serial": 4,
"optional-plugins": 0,
"optional-plugins-golang": 2,
Expand Down
20 changes: 20 additions & 0 deletions selftests/functional/nrunner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import time
import unittest

from avocado.core.job import Job
Expand Down Expand Up @@ -265,6 +266,25 @@ def test_recipe_exec_test_3(self):
self.assertEqual(res.exit_status, 0)


class TaskRunStatusService(TestCaseTmpDir):
@skipUnlessPathExists("/bin/sleep")
@skipUnlessPathExists("/bin/nc")
def test_task_status_service_lost(self):
nc_path = os.path.join(self.tmpdir.name, "socket")
nc_proc = process.SubProcess(f"nc -lU {nc_path}")
nc_proc.start()
task_proc = process.SubProcess(
f"avocado-runner-exec-test task-run -i 1 -u /bin/sleep -a 3 -s {nc_path}"
)
task_proc.start()
time.sleep(1)
nc_proc.kill()
time.sleep(1)
self.assertIn(
f"Connection with {nc_path} has been lost.".encode(), task_proc.get_stderr()
)


class ResolveSerializeRun(TestCaseTmpDir):
@skipUnlessPathExists("/bin/true")
def test(self):
Expand Down

0 comments on commit 2b23b11

Please sign in to comment.