Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new events for heartbeat and usage monitor #2777

Merged
merged 4 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 36 additions & 11 deletions examples/test_data_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
# 3. Test start
# 4. User start
# 5. Inside a task
# M1. CPU & memory usage
mgor marked this conversation as resolved.
Show resolved Hide resolved
# M2. master sent heartbeat to worker 1-N
# M3. worker 1-N received heartbeat from master
# (M* are repeated as long as locust is running)
# ...
# 6. Test run stopping
# 7. User stop
Expand All @@ -15,17 +19,21 @@
# 10. Locust quit
#
# try it out by running:
# locust -f test_data_management.py --headless -u 2 -t 5
# locust -f test_data_management.py --headless -u 2 -t 5 --processes 2
from __future__ import annotations

from locust import HttpUser, events, task
from locust.env import Environment
from locust.runners import MasterRunner
from locust.user.wait_time import constant

import datetime
from typing import Any

import requests


def timestring():
def timestring() -> str:
now = datetime.datetime.now()
return datetime.datetime.strftime(now, "%m:%S.%f")[:-5]

Expand All @@ -42,17 +50,17 @@ def timestring():


@events.init.add_listener
def init(environment, **_kwargs):
def init(environment: Environment, **_kwargs: Any) -> None:
print("2. Initializing locust, happens after parsing the locustfile but before test start")


@events.quitting.add_listener
def quitting(environment, **_kwargs):
def quitting(environment: Environment, **_kwargs: Any) -> None:
print("9. locust is about to shut down")


@events.test_start.add_listener
def test_start(environment, **_kwargs):
def test_start(environment: Environment, **_kwargs) -> None:
# happens only once in headless runs, but can happen multiple times in web ui-runs
global test_run_specific_data
print("3. Starting test run")
Expand All @@ -64,18 +72,35 @@ def test_start(environment, **_kwargs):
).json()["data"]


@events.heartbeat_sent.add_listener
def heartbeat_sent(client_id: str, timestamp: float) -> None:
print(f"M2. master sent heartbeat to worker {client_id} at {datetime.datetime.fromtimestamp(timestamp)}")


@events.heartbeat_received.add_listener
def heartbeat_received(client_id: str, timestamp: float) -> None:
print(f"M3. worker {client_id} received heartbeat from master at {datetime.datetime.fromtimestamp(timestamp)}")


@events.usage_monitor.add_listener
def usage_monitor(environment: Environment, cpu_usage: float, memory_usage: int) -> None:
# convert from bytes to Mebibytes
memory_usage = memory_usage / 1024 / 1024
print(f"M1. {environment.runner.__class__.__name__}: cpu={cpu_usage}%, memory={memory_usage}M")


@events.quit.add_listener
def quit(exit_code, **kwargs):
def quit(exit_code: int, **kwargs: Any) -> None:
print(f"10. Locust has shut down with code {exit_code}")


@events.test_stopping.add_listener
def test_stopping(environment, **_kwargs):
def test_stopping(environment: Environment, **_kwargs: Any) -> None:
print("6. stopping test run")


@events.test_stop.add_listener
def test_stop(environment, **_kwargs):
def test_stop(environment: Environment, **_kwargs: Any) -> None:
print("8. test run stopped")


Expand All @@ -84,7 +109,7 @@ class MyUser(HttpUser):
wait_time = constant(180) # be nice to postman-echo
first_start = True

def on_start(self):
def on_start(self) -> None:
if MyUser.first_start:
MyUser.first_start = False
# This is useful for similar things as to test_start, but happens in the context of a User
Expand All @@ -101,7 +126,7 @@ def on_start(self):
).json()["data"]

@task
def t(self):
def t(self) -> None:
self.client.get(f"/get?{global_test_data}")
self.client.get(f"/get?{test_run_specific_data}")
self.client.get(f"/get?{self.user_specific_testdata}")
Expand All @@ -114,6 +139,6 @@ def t(self):
).json()["data"]
self.client.get(f"/get?{task_run_specific_testdata}")

def on_stop(self):
def on_stop(self) -> None:
# this is a good place to clean up/release any user-specific test data
print("7. a user was stopped")
32 changes: 32 additions & 0 deletions locust/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,38 @@ class Events:
Fired when the CPU usage exceeds runners.CPU_WARNING_THRESHOLD (90% by default)
"""

heartbeat_sent: EventHook
"""
Fired when a heartbeat is sent by master to a worker.

Event arguments:

:param client_id: worker client id
:param timestamp: time in seconds since the epoch (float) when the event occured
"""

heartbeat_received: EventHook
"""
Fired when a heartbeat is received by a worker from master.

Event arguments:

:param client_id: worker client id
:param timestamp: time in seconds since the epoch (float) when the event occured
"""

usage_monitor: EventHook
"""
Fired every runners.CPU_MONITOR_INTERVAL (5.0 seconds by default) with information about
current CPU and memory usage.

Event arguments:

:param environment: locust environment
:param cpu_usage: current CPU usage in percent
:param memory_usage: current memory usage (RSS) in bytes
"""

def __init__(self):
# For backward compatibility use also values of class attributes
for name, value in vars(type(self)).items():
Expand Down
35 changes: 13 additions & 22 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,9 @@
from abc import abstractmethod
from collections import defaultdict
from collections.abc import Iterator, MutableMapping, ValuesView
from operator import (
itemgetter,
methodcaller,
)
from operator import itemgetter, methodcaller
from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
Callable,
NoReturn,
TypedDict,
cast,
)
from typing import TYPE_CHECKING, Any, Callable, NoReturn, TypedDict, cast
from uuid import uuid4

import gevent
Expand All @@ -40,15 +30,8 @@
from .dispatch import UsersDispatcher
from .exception import RPCError, RPCReceiveError, RPCSendError
from .log import get_logs, greenlet_exception_logger
from .rpc import (
Message,
rpc,
)
from .stats import (
RequestStats,
StatsError,
setup_distributed_stats_event_listeners,
)
from .rpc import Message, rpc
from .stats import RequestStats, StatsError, setup_distributed_stats_event_listeners

if TYPE_CHECKING:
from . import User
Expand Down Expand Up @@ -106,7 +89,7 @@ def __init__(self, environment: Environment) -> None:
self.spawning_greenlet: gevent.Greenlet | None = None
self.shape_greenlet: gevent.Greenlet | None = None
self.shape_last_tick: tuple[int, float] | tuple[int, float, list[type[User]] | None] | None = None
self.current_cpu_usage: int = 0
self.current_cpu_usage: float = 0.0
self.cpu_warning_emitted: bool = False
self.worker_cpu_warning_emitted: bool = False
self.current_memory_usage: int = 0
Expand Down Expand Up @@ -308,6 +291,10 @@ def monitor_cpu_and_memory(self) -> NoReturn:
f"CPU usage above {CPU_WARNING_THRESHOLD}%! This may constrain your throughput and may even give inconsistent response time measurements! See https://docs.locust.io/en/stable/running-distributed.html for how to distribute the load over multiple CPU cores or machines"
)
self.cpu_warning_emitted = True

self.environment.events.usage_monitor.fire(
environment=self.environment, cpu_usage=self.current_cpu_usage, memory_usage=self.current_memory_usage
)
gevent.sleep(CPU_MONITOR_INTERVAL)

@abstractmethod
Expand Down Expand Up @@ -1102,6 +1089,7 @@ def client_listener(self) -> NoReturn:
)
if "current_memory_usage" in msg.data:
c.memory_usage = msg.data["current_memory_usage"]
self.environment.events.heartbeat_sent.fire(client_id=msg.node_id, timestamp=time.time())
self.server.send_to_client(Message("heartbeat", None, msg.node_id))
else:
logging.debug(f"Got heartbeat message from unknown worker {msg.node_id}")
Expand Down Expand Up @@ -1399,6 +1387,9 @@ def worker(self) -> NoReturn:
self.reset_connection()
elif msg.type == "heartbeat":
self.last_heartbeat_timestamp = time.time()
self.environment.events.heartbeat_received.fire(
client_id=msg.node_id, timestamp=self.last_heartbeat_timestamp
)
elif msg.type == "update_user_class":
self.environment.update_user_class(msg.data)
elif msg.type == "spawning_complete":
Expand Down
Loading
Loading