Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send logs from workers to master and improve log viewer tab in the Web UI #2750

Merged
merged 9 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,15 @@ The list of statistics parameters that can be modified is:
+-------------------------------------------+--------------------------------------------------------------------------------------+
| PERCENTILES_TO_STATISTICS | List of response time percentiles in the screen of statistics for UI |
+-------------------------------------------+--------------------------------------------------------------------------------------+

Customization of additional static variables
============================================

This table lists the constants that are set within locust and may be overridden.

+-------------------------------------------+--------------------------------------------------------------------------------------+
| Parameter name | Purpose |
+-------------------------------------------+--------------------------------------------------------------------------------------+
| locust.runners.WORKER_LOG_REPORT_INTERVAL | Interval, in seconds, at which worker logs are reported to the master. Reporting |
| | can be disabled by setting this to a negative number. |
+-------------------------------------------+--------------------------------------------------------------------------------------+
6 changes: 6 additions & 0 deletions locust/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ def __init__(
"""List of the available Tasks per User Classes to pick from in the Task Picker"""
self.dispatcher_class = dispatcher_class
"""A user dispatcher class that decides how users are spawned, default :class:`UsersDispatcher <locust.dispatch.UsersDispatcher>`"""
self.worker_logs: dict[str, list[str]] = {}
"""Captured logs from all connected workers"""

self._remove_user_classes_with_weight_zero()
self._validate_user_class_name_uniqueness()
Expand Down Expand Up @@ -209,6 +211,10 @@ def update_user_class(self, user_settings):
if key == "tasks":
user_class.tasks = [task for task in user_tasks if task.__name__ in value]

def update_worker_logs(self, worker_log_report):
    """
    Record the log lines reported by a single worker.

    ``worker_log_report`` is a mapping. Its ``"worker_id"`` entry selects the
    slot in ``self.worker_logs`` and its ``"logs"`` entry (an empty list when
    absent) replaces whatever was previously stored for that worker. Reports
    whose ``"worker_id"`` is missing or falsy are ignored.
    """
    worker_id = worker_log_report.get("worker_id")
    if not worker_id:
        return

    self.worker_logs[worker_id] = worker_log_report.get("logs", [])

def _filter_tasks_by_tags(self) -> None:
"""
Filter the tasks on all the user_classes recursively, according to the tags and
Expand Down
12 changes: 11 additions & 1 deletion locust/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging.config
import re
import socket
from collections import deque

HOSTNAME = re.sub(r"\..*", "", socket.gethostname())

Expand All @@ -13,7 +14,7 @@
class LogReader(logging.Handler):
    """Logging handler that keeps the most recent formatted records in memory."""

    def __init__(self):
        super().__init__()
        # Bounded buffer: once 500 entries are held, appending drops the oldest.
        self.logs = deque(maxlen=500)

    def emit(self, record):
        """Format ``record`` and append the resulting string to ``self.logs``."""
        formatted = self.format(record)
        self.logs.append(formatted)
Expand Down Expand Up @@ -75,6 +76,15 @@ def setup_logging(loglevel, logfile=None):
logging.config.dictConfig(LOGGING_CONFIG)


def get_logs():
    """
    Return a list copy of the lines held by the "log_reader" handler.

    The handlers attached to ``logging.getLogger("root")`` are searched for one
    whose ``name`` is ``"log_reader"``; the first match wins. An empty list is
    returned when no such handler is attached.
    """
    root_logger = logging.getLogger("root")

    for handler in root_logger.handlers:
        if handler.name == "log_reader":
            # Copy so callers get a snapshot rather than the live buffer.
            return list(handler.logs)

    return []


def greenlet_exception_logger(logger, level=logging.CRITICAL):
"""
Return a function that can be used as argument to Greenlet.link_exception() that will log the
Expand Down
29 changes: 28 additions & 1 deletion locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from . import argument_parser
from .dispatch import UsersDispatcher
from .exception import RPCError, RPCReceiveError, RPCSendError
from .log import greenlet_exception_logger
from .log import get_logs, greenlet_exception_logger
from .rpc import (
Message,
rpc,
Expand All @@ -66,6 +66,7 @@
"missing",
]
WORKER_REPORT_INTERVAL = 3.0
WORKER_LOG_REPORT_INTERVAL = 10
CPU_MONITOR_INTERVAL = 5.0
CPU_WARNING_THRESHOLD = 90
HEARTBEAT_INTERVAL = 1
Expand Down Expand Up @@ -1116,6 +1117,8 @@ def client_listener(self) -> NoReturn:
# a worker finished spawning (this happens multiple times during rampup)
self.clients[msg.node_id].state = STATE_RUNNING
self.clients[msg.node_id].user_classes_count = msg.data["user_classes_count"]
elif msg.type == "logs":
self.environment.update_worker_logs(msg.data)
elif msg.type == "quit":
if msg.node_id in self.clients:
client = self.clients[msg.node_id]
Expand Down Expand Up @@ -1212,6 +1215,7 @@ def __init__(self, environment: Environment, master_host: str, master_port: int)
self.client_id = socket.gethostname() + "_" + uuid4().hex
self.master_host = master_host
self.master_port = master_port
self.logs: list[str] = []
self.worker_cpu_warning_emitted = False
self._users_dispatcher: UsersDispatcher | None = None
self.client = rpc.Client(master_host, master_port, self.client_id)
Expand All @@ -1220,6 +1224,7 @@ def __init__(self, environment: Environment, master_host: str, master_port: int)
self.greenlet.spawn(self.heartbeat).link_exception(greenlet_exception_handler)
self.greenlet.spawn(self.heartbeat_timeout_checker).link_exception(greenlet_exception_handler)
self.greenlet.spawn(self.stats_reporter).link_exception(greenlet_exception_handler)
self.greenlet.spawn(self.logs_reporter).link_exception(greenlet_exception_handler)

# register listener that adds the current number of spawned users to the report that is sent to the master node
def on_report_to_master(client_id: str, data: dict[str, Any]):
Expand Down Expand Up @@ -1417,6 +1422,25 @@ def stats_reporter(self) -> NoReturn:
logger.error(f"Temporary connection lost to master server: {e}, will retry later.")
gevent.sleep(WORKER_REPORT_INTERVAL)

def logs_reporter(self) -> None:
    """
    Periodically forward this worker's captured log lines to the master.

    Does nothing when ``WORKER_LOG_REPORT_INTERVAL`` is negative. Otherwise it
    loops forever, sleeping ``WORKER_LOG_REPORT_INTERVAL`` between iterations:

    * If more than 10 lines were added since the previous iteration, a warning
      is logged, the logs are sent one final time and the loop stops, so this
      worker sends no further logs.
    * Otherwise the full current log list is sent whenever it differs from the
      one seen on the previous iteration.
    """
    if WORKER_LOG_REPORT_INTERVAL < 0:
        return

    while True:
        current_logs = get_logs()

        if (len(current_logs) - len(self.logs)) > 10:
            logger.warning(
                "The worker attempted to send more than 10 log lines in one interval. Further log sending was disabled for this worker."
            )
            # Fetch again rather than reusing current_logs so the warning
            # logged just above is part of the final report.
            self._send_logs(get_logs())
            break
        # Compare contents, not lengths: the log buffer behind get_logs() is
        # bounded (LogReader uses deque(maxlen=500)), so once it is full its
        # length stops growing even though new lines keep replacing old ones.
        # A length comparison would then never trigger another send.
        if current_logs != self.logs:
            self._send_logs(current_logs)

        self.logs = current_logs
        gevent.sleep(WORKER_LOG_REPORT_INTERVAL)

def send_message(self, msg_type: str, data: dict[str, Any] | None = None, client_id: str | None = None) -> None:
"""
Sends a message to master node
Expand All @@ -1433,6 +1457,9 @@ def _send_stats(self) -> None:
self.environment.events.report_to_master.fire(client_id=self.client_id, data=data)
self.client.send(Message("stats", data, self.client_id))

def _send_logs(self, current_logs) -> None:
    """Send ``current_logs`` to the master as a "logs" message tagged with this worker's client id."""
    payload = {"worker_id": self.client_id, "logs": current_logs}
    self.send_message("logs", payload)

def connect_to_master(self):
self.retry += 1
self.client.send(Message("client_ready", __version__, self.client_id))
Expand Down
Loading
Loading