Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TaskStatusService connection lost fix #5860

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 46 additions & 18 deletions avocado/core/nrunner/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,29 +40,46 @@ class TaskStatusService:

def __init__(self, uri):
self.uri = uri
self.connection = None
self._connection = None

def post(self, status):
@property
def connection(self):
if not self._connection:
self._create_connection()
return self._connection

def _create_connection(self):
"""
Creates connection with `self.uri` based on `socket.create_connection`
"""
if ":" in self.uri:
host, port = self.uri.split(":")
port = int(port)
if self.connection is None:
for _ in range(600):
try:
self.connection = socket.create_connection((host, port))
break
except ConnectionRefusedError as error:
LOG.warning(error)
time.sleep(1)
else:
self.connection = socket.create_connection((host, port))
for _ in range(600):
try:
self._connection = socket.create_connection((host, port))
break
except ConnectionRefusedError as error:
LOG.warning(error)
time.sleep(1)
else:
self._connection = socket.create_connection((host, port))
else:
if self.connection is None:
self.connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.connection.connect(self.uri)
self._connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self._connection.connect(self.uri)

def post(self, status):
data = json_dumps(status)
self.connection.send(data.encode("ascii") + "\n".encode("ascii"))
try:
self.connection.send(data.encode("ascii") + "\n".encode("ascii"))
except BrokenPipeError:
try:
self._create_connection()
self.connection.send(data.encode("ascii") + "\n".encode("ascii"))
except ConnectionRefusedError:
LOG.warning(f"Connection with {self.uri} has been lost.")
return False
return True

def close(self):
if self.connection is not None:
Expand Down Expand Up @@ -203,12 +220,23 @@ def run(self):
self.setup_output_dir()
runner_klass = self.runnable.pick_runner_class()
runner = runner_klass()
running_status_services = self.status_services
damaged_status_services = []
for status in runner.run(self.runnable):
if status["status"] == "started":
status.update({"output_dir": self.runnable.output_dir})
status.update({"id": self.identifier})
if self.job_id is not None:
status.update({"job_id": self.job_id})
for status_service in self.status_services:
status_service.post(status)
for status_service in running_status_services:
if not status_service.post(status):
damaged_status_services.append(status_service)
if damaged_status_services:
running_status_services = list(
filter(
lambda s: s not in damaged_status_services,
running_status_services,
)
)
damaged_status_services.clear()
yield status
2 changes: 1 addition & 1 deletion selftests/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"nrunner-requirement": 16,
"unit": 667,
"jobs": 11,
"functional-parallel": 299,
"functional-parallel": 300,
"functional-serial": 4,
"optional-plugins": 0,
"optional-plugins-golang": 2,
Expand Down
20 changes: 20 additions & 0 deletions selftests/functional/nrunner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import time
import unittest

from avocado.core.job import Job
Expand Down Expand Up @@ -265,6 +266,25 @@ def test_recipe_exec_test_3(self):
self.assertEqual(res.exit_status, 0)


class TaskRunStatusService(TestCaseTmpDir):
@skipUnlessPathExists("/bin/sleep")
@skipUnlessPathExists("/bin/nc")
def test_task_status_service_lost(self):
nc_path = os.path.join(self.tmpdir.name, "socket")
nc_proc = process.SubProcess(f"nc -lU {nc_path}")
nc_proc.start()
task_proc = process.SubProcess(
f"avocado-runner-exec-test task-run -i 1 -u /bin/sleep -a 3 -s {nc_path}"
)
task_proc.start()
time.sleep(1)
nc_proc.kill()
time.sleep(1)
self.assertIn(
f"Connection with {nc_path} has been lost.".encode(), task_proc.get_stderr()
)


class ResolveSerializeRun(TestCaseTmpDir):
@skipUnlessPathExists("/bin/true")
def test(self):
Expand Down
Loading