Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow cross process communication using custom messages #1782

Merged
merged 11 commits into from
Jun 23, 2021
Merged
3 changes: 2 additions & 1 deletion docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ Runner classes
.. autoclass:: locust.runners.LocalRunner

.. autoclass:: locust.runners.MasterRunner
:members: register_message, send_message

.. autoclass:: locust.runners.WorkerRunner

:members: register_message, send_message

Web UI class
============
Expand Down
38 changes: 38 additions & 0 deletions docs/running-locust-distributed.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,44 @@ listen to. Defaults to 5557.
Used when starting the master node with ``--headless``. The master node will then wait until X worker
nodes has connected before the test is started.

Communicating across nodes
=============================================

When running Locust in distributed mode, you may want to communicate between nodes in order to coordinate
nathan-beam marked this conversation as resolved.
Show resolved Hide resolved
data. This can be easily accomplished with custom messages using the built in messaging hooks:

.. code-block:: python

from locust import events
from locust.runners import MasterRunner, WorkerRunner

# Fired when the worker recieves a message of type 'test_users'
def setup_test_users(environment, msg, **kwargs):
for user in msg.data:
print(f"User {user['name']} recieved")
environment.runner.send_message('acknowledge_users', f"Thanks for the {len(msg.data)} users!")

# Fired when the master recieves a message of type 'acknowledge_users'
def on_acknowledge(msg, **kwargs):
print(msg.data)

@events.init.add_listener
def on_locust_init(environment, **_kwargs):
if isinstance(environment.runner, WorkerRunner):
environment.runner.register_message('test_users', setup_test_users)
elif isinstance(environment.runner, MasterRunner):
environment.runner.register_message('acknowledge_users', on_acknowledge)

@events.test_start.add_listener
def on_test_start(environment, **_kwargs):
if isinstance(environment.runner, MasterRunner):
users = [
{"name": "User1"},
{"name": "User2"},
{"name": "User3"},
]
environment.runner.send_message('test_users', users)

nathan-beam marked this conversation as resolved.
Show resolved Hide resolved

Running distributed with Docker
=============================================
Expand Down
37 changes: 37 additions & 0 deletions examples/custom_messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from locust import HttpUser, task, events, between
nathan-beam marked this conversation as resolved.
Show resolved Hide resolved
from locust.runners import MasterRunner, WorkerRunner

# Fired when the worker recieves a message of type 'test_users'
def setup_test_users(environment, msg, **kwargs):
for user in msg.data:
print(f"User {user['name']} recieved")
environment.runner.send_message('acknowledge_users', f"Thanks for the {len(msg.data)} users!")

# Fired when the master recieves a message of type 'acknowledge_users'
def on_acknowledge(msg, **kwargs):
print(msg.data)

@events.init.add_listener
def on_locust_init(environment, **_kwargs):
if isinstance(environment.runner, WorkerRunner):
environment.runner.register_message('test_users', setup_test_users)
elif isinstance(environment.runner, MasterRunner):
environment.runner.register_message('acknowledge_users', on_acknowledge)

@events.test_start.add_listener
def on_test_start(environment, **_kwargs):
if isinstance(environment.runner, MasterRunner):
users = [
{"name": "User1"},
{"name": "User2"},
{"name": "User3"},
]
environment.runner.send_message('test_users', users)
nathan-beam marked this conversation as resolved.
Show resolved Hide resolved

class WebsiteUser(HttpUser):
host = "http://127.0.0.1:8089"
wait_time = between(2, 5)

@task
def task(self):
pass
36 changes: 36 additions & 0 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,18 @@ def stop(self):

class DistributedRunner(Runner):
def __init__(self, *args, **kwargs):
self.custom_messages = {}
super().__init__(*args, **kwargs)
setup_distributed_stats_event_listeners(self.environment.events, self.stats)

def register_message(self, msg_type, listener):
"""
Register a listener for a custom message from another node

:param msg_type: The type of the message to listen for
:param listener: The function to execute when the message is received
"""
self.custom_messages[msg_type] = listener

class WorkerNode:
def __init__(self, id, state=STATE_INIT, heartbeat_liveness=HEARTBEAT_LIVENESS):
Expand Down Expand Up @@ -709,13 +718,27 @@ def client_listener(self):
self.quit()
elif msg.type == "exception":
self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"])
elif msg.type in self.custom_messages:
self.custom_messages[msg.type](environment=self.environment, msg=msg)

self.check_stopped()

@property
def worker_count(self):
return len(self.clients.ready) + len(self.clients.spawning) + len(self.clients.running)

def send_message(self, msg_type, data=None):
"""
Sends a message to all attached worker nodes

:param msg_type: The type of the message to send
:param data: Optional data to send
"""
for client in self.clients.all:
logger.debug("Sending {msg_type} message to client {client_id}")
nathan-beam marked this conversation as resolved.
Show resolved Hide resolved
self.server.send_to_client(Message(msg_type, data, client.id))



class WorkerRunner(DistributedRunner):
"""
Expand Down Expand Up @@ -828,6 +851,9 @@ def worker(self):
self.stop()
self._send_stats() # send a final report, in case there were any samples not yet reported
self.greenlet.kill(block=True)
elif msg.type in self.custom_messages:
logger.debug("Recieved {msg_type} message on from master")
self.custom_messages[msg.type](environment=self.environment, msg=msg)

def stats_reporter(self):
while True:
Expand All @@ -837,6 +863,16 @@ def stats_reporter(self):
logger.error("Temporary connection lost to master server: %s, will retry later." % (e))
gevent.sleep(WORKER_REPORT_INTERVAL)

def send_message(self, msg_type, data=None):
"""
Sends a message to master node

:param msg_type: The type of the message to send
:param data: Optional data to send
"""
logger.debug("Sending {msg_type} message to master")
self.client.send(Message(msg_type, data, self.client_id))

def _send_stats(self):
data = {}
self.environment.events.report_to_master.fire(client_id=self.client_id, data=data)
Expand Down
87 changes: 87 additions & 0 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -1321,7 +1321,50 @@ def test_master_reset_connection(self):
self.assertEqual(1, len(master.clients))
master.quit()

def test_custom_message_send(self):
class MyUser(User):
wait_time = constant(1)

@task
def my_task(self):
pass

with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner()
for i in range(5):
master.clients[i] = WorkerNode(i)
master.send_message("test_custom_msg", {"test_data": 123})

self.assertEqual(5, len(server.outbox))
for _, msg in server.outbox:
self.assertEqual("test_custom_msg", msg.type)
self.assertEqual(123, msg.data['test_data'])

def test_custom_message_receive(self):
class MyUser(User):
wait_time = constant(1)

@task
def my_task(self):
pass

with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
test_custom_msg = [False]
test_custom_msg_data = [{}]

def on_custom_msg(msg, **kw):
test_custom_msg[0] = True
test_custom_msg_data[0] = msg.data

master = self.get_runner()
master.register_message("test_custom_msg", on_custom_msg)

server.mocked_send(
Message("test_custom_msg", {'test_data': 123}, "dummy_id")
)

self.assertTrue(test_custom_msg[0])
self.assertEqual(123, test_custom_msg_data[0]['test_data'])
class TestWorkerRunner(LocustTestCase):
def setUp(self):
super().setUp()
Expand Down Expand Up @@ -1470,6 +1513,50 @@ def my_task(self):
self.assertEqual(9, len(worker.user_greenlets))
worker.quit()

def test_custom_message_send(self):
class MyUser(User):
wait_time = constant(1)

@task
def my_task(self):
pass

with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:
environment = Environment()
worker = self.get_runner(environment=environment, user_classes=[MyUser])
client.outbox.clear()
worker.send_message('test_custom_msg', {'test_data': 123})
self.assertEqual("test_custom_msg", client.outbox[0].type)
self.assertEqual(123, client.outbox[0].data['test_data'])
worker.quit()

def test_custom_message_receive(self):
class MyUser(User):
wait_time = constant(1)

@task
def my_task(self):
pass

with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:
environment = Environment()
test_custom_msg = [False]
test_custom_msg_data = [{}]

def on_custom_msg(msg, **kw):
test_custom_msg[0] = True
test_custom_msg_data[0] = msg.data

worker = self.get_runner(environment=environment, user_classes=[MyUser])
worker.register_message("test_custom_msg", on_custom_msg)

client.mocked_send(
Message("test_custom_msg", {'test_data': 123}, "dummy_client_id")
)

self.assertTrue(test_custom_msg[0])
self.assertEqual(123, test_custom_msg_data[0]['test_data'])
worker.quit()

class TestMessageSerializing(unittest.TestCase):
def test_message_serialize(self):
Expand Down