diff --git a/examples/test_data_management.py b/examples/test_data_management.py index f1995562fb..5df36d0f55 100644 --- a/examples/test_data_management.py +++ b/examples/test_data_management.py @@ -6,6 +6,10 @@ # 3. Test start # 4. User start # 5. Inside a task +# M1. CPU & memory usage +# M2. master sent heartbeat to worker 1-N +# M3. worker 1-N received heartbeat from master +# (M* are repeated as long as locust is running) # ... # 6. Test run stopping # 7. User stop @@ -15,17 +19,21 @@ # 10. Locust quit # # try it out by running: -# locust -f test_data_management.py --headless -u 2 -t 5 +# locust -f test_data_management.py --headless -u 2 -t 5 --processes 2 +from __future__ import annotations + from locust import HttpUser, events, task +from locust.env import Environment from locust.runners import MasterRunner from locust.user.wait_time import constant import datetime +from typing import Any import requests -def timestring(): +def timestring() -> str: now = datetime.datetime.now() return datetime.datetime.strftime(now, "%m:%S.%f")[:-5] @@ -42,17 +50,17 @@ def timestring(): @events.init.add_listener -def init(environment, **_kwargs): +def init(environment: Environment, **_kwargs: Any) -> None: print("2. Initializing locust, happens after parsing the locustfile but before test start") @events.quitting.add_listener -def quitting(environment, **_kwargs): +def quitting(environment: Environment, **_kwargs: Any) -> None: print("9. locust is about to shut down") @events.test_start.add_listener -def test_start(environment, **_kwargs): +def test_start(environment: Environment, **_kwargs) -> None: # happens only once in headless runs, but can happen multiple times in web ui-runs global test_run_specific_data print("3. Starting test run") @@ -64,18 +72,35 @@ def test_start(environment, **_kwargs): ).json()["data"] +@events.heartbeat_sent.add_listener +def heartbeat_sent(client_id: str, timestamp: float) -> None: + print(f"M2. master sent heartbeat to worker {client_id} at {datetime.datetime.fromtimestamp(timestamp)}") + + +@events.heartbeat_received.add_listener +def heartbeat_received(client_id: str, timestamp: float) -> None: + print(f"M3. worker {client_id} received heartbeat from master at {datetime.datetime.fromtimestamp(timestamp)}") + + +@events.usage_monitor.add_listener +def usage_monitor(environment: Environment, cpu_usage: float, memory_usage: int) -> None: + # convert from bytes to Mebibytes + memory_usage = memory_usage / 1024 / 1024 + print(f"M1. {environment.runner.__class__.__name__}: cpu={cpu_usage}%, memory={memory_usage}M") + + @events.quit.add_listener -def quit(exit_code, **kwargs): +def quit(exit_code: int, **kwargs: Any) -> None: print(f"10. Locust has shut down with code {exit_code}") @events.test_stopping.add_listener -def test_stopping(environment, **_kwargs): +def test_stopping(environment: Environment, **_kwargs: Any) -> None: print("6. stopping test run") @events.test_stop.add_listener -def test_stop(environment, **_kwargs): +def test_stop(environment: Environment, **_kwargs: Any) -> None: print("8. test run stopped") @@ -84,7 +109,7 @@ class MyUser(HttpUser): wait_time = constant(180) # be nice to postman-echo first_start = True - def on_start(self): + def on_start(self) -> None: if MyUser.first_start: MyUser.first_start = False # This is useful for similar things as to test_start, but happens in the context of a User @@ -101,7 +126,7 @@ def on_start(self): ).json()["data"] @task - def t(self): + def t(self) -> None: self.client.get(f"/get?{global_test_data}") self.client.get(f"/get?{test_run_specific_data}") self.client.get(f"/get?{self.user_specific_testdata}") @@ -114,6 +139,6 @@ def t(self): ).json()["data"] self.client.get(f"/get?{task_run_specific_testdata}") - def on_stop(self): + def on_stop(self) -> None: # this is a good place to clean up/release any user-specific test data print("7. a user was stopped") diff --git a/locust/event.py b/locust/event.py index af281ca2b9..86ec8b482f 100644 --- a/locust/event.py +++ b/locust/event.py @@ -235,6 +235,38 @@ class Events: Fired when the CPU usage exceeds runners.CPU_WARNING_THRESHOLD (90% by default) """ + heartbeat_sent: EventHook + """ + Fired when a heartbeat is sent by master to a worker. + + Event arguments: + + :param client_id: worker client id + :param timestamp: time in seconds since the epoch (float) when the event occured + """ + + heartbeat_received: EventHook + """ + Fired when a heartbeat is received by a worker from master. + + Event arguments: + + :param client_id: worker client id + :param timestamp: time in seconds since the epoch (float) when the event occured + """ + + usage_monitor: EventHook + """ + Fired every runners.CPU_MONITOR_INTERVAL (5.0 seconds by default) with information about + current CPU and memory usage. + + Event arguments: + + :param environment: locust environment + :param cpu_usage: current CPU usage in percent + :param memory_usage: current memory usage (RSS) in bytes + """ + def __init__(self): # For backward compatibility use also values of class attributes for name, value in vars(type(self)).items(): diff --git a/locust/runners.py b/locust/runners.py index 4d85da3f64..31d741217e 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -15,19 +15,9 @@ from abc import abstractmethod from collections import defaultdict from collections.abc import Iterator, MutableMapping, ValuesView -from operator import ( - itemgetter, - methodcaller, -) +from operator import itemgetter, methodcaller from types import TracebackType -from typing import ( - TYPE_CHECKING, - Any, - Callable, - NoReturn, - TypedDict, - cast, -) +from typing import TYPE_CHECKING, Any, Callable, NoReturn, TypedDict, cast from uuid import uuid4 import gevent @@ -40,15 +30,8 @@ from .dispatch import UsersDispatcher from .exception import RPCError, RPCReceiveError, RPCSendError from .log import get_logs, greenlet_exception_logger -from .rpc import ( - Message, - rpc, -) -from .stats import ( - RequestStats, - StatsError, - setup_distributed_stats_event_listeners, -) +from .rpc import Message, rpc +from .stats import RequestStats, StatsError, setup_distributed_stats_event_listeners if TYPE_CHECKING: from . import User @@ -106,7 +89,7 @@ def __init__(self, environment: Environment) -> None: self.spawning_greenlet: gevent.Greenlet | None = None self.shape_greenlet: gevent.Greenlet | None = None self.shape_last_tick: tuple[int, float] | tuple[int, float, list[type[User]] | None] | None = None - self.current_cpu_usage: int = 0 + self.current_cpu_usage: float = 0.0 self.cpu_warning_emitted: bool = False self.worker_cpu_warning_emitted: bool = False self.current_memory_usage: int = 0 @@ -308,6 +291,10 @@ def monitor_cpu_and_memory(self) -> NoReturn: f"CPU usage above {CPU_WARNING_THRESHOLD}%! This may constrain your throughput and may even give inconsistent response time measurements! See https://docs.locust.io/en/stable/running-distributed.html for how to distribute the load over multiple CPU cores or machines" ) self.cpu_warning_emitted = True + + self.environment.events.usage_monitor.fire( + environment=self.environment, cpu_usage=self.current_cpu_usage, memory_usage=self.current_memory_usage + ) gevent.sleep(CPU_MONITOR_INTERVAL) @abstractmethod @@ -1102,6 +1089,7 @@ def client_listener(self) -> NoReturn: ) if "current_memory_usage" in msg.data: c.memory_usage = msg.data["current_memory_usage"] + self.environment.events.heartbeat_sent.fire(client_id=msg.node_id, timestamp=time.time()) self.server.send_to_client(Message("heartbeat", None, msg.node_id)) else: logging.debug(f"Got heartbeat message from unknown worker {msg.node_id}") @@ -1399,6 +1387,9 @@ def worker(self) -> NoReturn: self.reset_connection() elif msg.type == "heartbeat": self.last_heartbeat_timestamp = time.time() + self.environment.events.heartbeat_received.fire( + client_id=msg.node_id, timestamp=self.last_heartbeat_timestamp + ) elif msg.type == "update_user_class": self.environment.update_user_class(msg.data) elif msg.type == "spawning_complete": diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index b101c8d89a..62afe0abef 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -1,12 +1,7 @@ from __future__ import annotations import locust -from locust import ( - LoadTestShape, - __version__, - constant, - runners, -) +from locust import LoadTestShape, __version__, constant, runners from locust.argument_parser import parse_options from locust.dispatch import UsersDispatcher from locust.env import Environment @@ -26,11 +21,7 @@ WorkerRunner, ) from locust.stats import RequestStats -from locust.user import ( - TaskSet, - User, - task, -) +from locust.user import TaskSet, User, task import json import logging @@ -2136,6 +2127,149 @@ def on_test_start(*_, **__): self.assertEqual(test_start_exec_count, 1) + def test_heartbeat_event(self) -> None: + """ + Tests that heartbeat event is fired during a test + """ + + class TestUser(User): + wait_time = constant(0.1) + + @task + def noop(self) -> None: + pass + + with mock.patch("locust.runners.HEARTBEAT_INTERVAL", new=1): + # start a Master runner + master_env = Environment(user_classes=[TestUser]) + worker_connect_events = [] + timestamp_start: list[float] = [time.time() + 3600.0] + + def on_connect(client_id: str) -> None: + worker_connect_events.append(client_id) + timestamp_start[0] = time.time() + + master_env.events.worker_connect.add_listener(on_connect) + master = master_env.create_master_runner("*", 0) + sleep(0) + worker_env = Environment(user_classes=[TestUser]) + worker: WorkerRunner = worker_env.create_worker_runner("127.0.0.1", master.server.port) + + with ( + mock.patch.object( + worker.environment.events.heartbeat_received, + "fire", + wraps=worker.environment.events.heartbeat_received.fire, + ) as worker_heartbeat_received_mock, + mock.patch.object( + master.environment.events.heartbeat_sent, + "fire", + wraps=master.environment.events.heartbeat_sent.fire, + ) as master_heartbeat_sent_mock, + ): + # give workers time to connect + sleep(0.1) + # issue start command that should trigger TestUsers to be spawned in the Workers + master.start(2, spawn_rate=2) + sleep(0.1) + # check that worker nodes have started locusts + self.assertEqual(2, worker.user_count) + + # give time for nodes to send and receive 5 heartbeats, HEARTBEAT_INTERVAL mocked to 1 second, so + # sleep 5 seconds - 1 second that represents the overhead from connecting + sleep(5 - 1) + master.quit() + + # make sure users are killed + self.assertEqual(0, worker.user_count) + # make sure events happened correctly + self.assertIn(worker.client_id, worker_connect_events) + + timestamp_stop = time.time() + + self.assertEqual(worker_heartbeat_received_mock.call_count, 5) + self.assertEqual(master_heartbeat_sent_mock.call_count, 5) + + for call_args, call_kwargs in [ + *worker_heartbeat_received_mock.call_args_list, + *master_heartbeat_sent_mock.call_args_list, + ]: + self.assertEqual(call_args, ()) # args + self.assertEqual(call_kwargs, {"client_id": worker.client_id, "timestamp": mock.ANY}) # kwargs + self.assertGreaterEqual(call_kwargs["timestamp"], timestamp_start[0]) + self.assertLessEqual(call_kwargs["timestamp"], timestamp_stop) + + def test_usage_monitor_event(self) -> None: + """ + Tests that usage_monitor event is fired during a test + """ + + class TestUser(User): + wait_time = constant(0.1) + + @task + def noop(self) -> None: + pass + + with mock.patch("locust.runners.CPU_MONITOR_INTERVAL", new=1): + # start a Master runner + master_env = Environment(user_classes=[TestUser]) + worker_connect_events = [] + + def on_connect(client_id: str) -> None: + worker_connect_events.append(client_id) + + master_env.events.worker_connect.add_listener(on_connect) + master = master_env.create_master_runner("*", 0) + sleep(0) + worker_env = Environment(user_classes=[TestUser]) + worker: WorkerRunner = worker_env.create_worker_runner("127.0.0.1", master.server.port) + + with ( + mock.patch.object( + worker.environment.events.usage_monitor, "fire", wraps=worker.environment.events.usage_monitor.fire + ) as worker_usage_monitor_mock, + mock.patch.object( + master.environment.events.usage_monitor, "fire", wraps=master.environment.events.usage_monitor.fire + ) as master_usage_monitor_mock, + ): + # give workers time to connect + sleep(0.1) + # issue start command that should trigger TestUsers to be spawned in the Workers + master.start(2, spawn_rate=2) + sleep(0.1) + # check that worker nodes have started locusts + self.assertEqual(2, worker.user_count) + + # give time for nodes to send 5 usage_monitor events, CPU_MONITOR_INTERVAL mocked to 1 second, so + # sleep 5 seconds + sleep(5) + master.quit() + + # make sure users are killed + self.assertEqual(0, worker.user_count) + # make sure events happened correctly + self.assertIn(worker.client_id, worker_connect_events) + + self.assertEqual(worker_usage_monitor_mock.call_count, 5) + self.assertEqual(master_usage_monitor_mock.call_count, 5) + + for call_args, call_kwargs in master_usage_monitor_mock: + self.assertEqual(call_args, ()) # args + self.assertEqual( + call_kwargs, {"environment": master_env, "cpu_usage": mock.ANY, "memory_usage": mock.ANY} + ) # kwargs + self.assertTrue(isinstance(call_kwargs["cpu_usage"], float)) + self.assertTrue(isinstance(call_kwargs["memory_usage"], int)) + + for call_args, call_kwargs in worker_usage_monitor_mock: + self.assertEqual(call_args, ()) # args + self.assertEqual( + call_kwargs, {"environment": worker_env, "cpu_usage": mock.ANY, "memory_usage": mock.ANY} + ) # kwargs + self.assertTrue(isinstance(call_kwargs["cpu_usage"], float)) + self.assertTrue(isinstance(call_kwargs["memory_usage"], int)) + class TestMasterRunner(LocustRunnerTestCase): def setUp(self):