Skip to content

Commit

Permalink
TaskStatusService connection lost fix
Browse files Browse the repository at this point in the history
This commit adds error handling to TaskStatusService. When the
connection is lost, it will try to establish a new connection. If the
connection is not possible to renew, the task will send warning message
about new status and remove TaskStatusService from available services.

Reference: avocado-framework#5794
Signed-off-by: Jan Richter <[email protected]>
  • Loading branch information
richtja committed Jan 30, 2024
1 parent fa4d278 commit 268f41c
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 5 deletions.
25 changes: 22 additions & 3 deletions avocado/core/nrunner/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,16 @@ def _create_connection(self):

def post(self, status):
data = json_dumps(status)
self.connection.send(data.encode("ascii") + "\n".encode("ascii"))
try:
self.connection.send(data.encode("ascii") + "\n".encode("ascii"))
except BrokenPipeError:
try:
self._create_connection()
self.connection.send(data.encode("ascii") + "\n".encode("ascii"))
except ConnectionRefusedError:
LOG.warning(f"Connection with {self.uri} has been lost.")
return False
return True

def close(self):
if self.connection is not None:
Expand Down Expand Up @@ -211,12 +220,22 @@ def run(self):
self.setup_output_dir()
runner_klass = self.runnable.pick_runner_class()
runner = runner_klass()
running_status_services = self.status_services
dameged_status_services = []
for status in runner.run(self.runnable):
if status["status"] == "started":
status.update({"output_dir": self.runnable.output_dir})
status.update({"id": self.identifier})
if self.job_id is not None:
status.update({"job_id": self.job_id})
for status_service in self.status_services:
status_service.post(status)
for status_service in running_status_services:
if not status_service.post(status):
dameged_status_services.append(status_service)
if dameged_status_services:
running_status_services = list(
filter(
lambda s: s not in dameged_status_services,
running_status_services,
)
)
yield status
2 changes: 1 addition & 1 deletion selftests/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"nrunner-requirement": 16,
"unit": 667,
"jobs": 11,
"functional-parallel": 299,
"functional-parallel": 300,
"functional-serial": 4,
"optional-plugins": 0,
"optional-plugins-golang": 2,
Expand Down
21 changes: 20 additions & 1 deletion selftests/functional/nrunner.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import os
import sys
import unittest
Expand Down Expand Up @@ -178,7 +179,7 @@ def test_error_existing_stdout_stderr(self):
self.assertEqual(res.exit_status, 0)


class TaskRun(unittest.TestCase):
class TaskRun(TestCaseTmpDir):
def test_noop(self):
res = process.run(
f"{RUNNER} task-run -i XXXno-opXXX -k noop", ignore_status=True
Expand Down Expand Up @@ -265,6 +266,24 @@ def test_recipe_exec_test_3(self):
self.assertEqual(res.exit_status, 0)


class TaskRunStatusService(TestCaseTmpDir, unittest.IsolatedAsyncioTestCase):
@skipUnlessPathExists("/bin/sleep")
@skipUnlessPathExists("/bin/nc")
async def test_task_status_service_lost(self):
nc_path = os.path.join(self.tmpdir.name, "socket")
nc_proc = await asyncio.create_subprocess_shell(f"nc -lU {nc_path}")
await asyncio.sleep(1)
task_proc = await asyncio.create_subprocess_shell(
f"avocado-runner-exec-test task-run -i 1 -u /bin/sleep -a 3 -s {nc_path}",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
await asyncio.sleep(1)
nc_proc.kill()
_, stderr = await task_proc.communicate()
self.assertIn(f"Connection with {nc_path} has been lost.".encode(), stderr)


class ResolveSerializeRun(TestCaseTmpDir):
@skipUnlessPathExists("/bin/true")
def test(self):
Expand Down

0 comments on commit 268f41c

Please sign in to comment.