Skip to content

Commit

Permalink
Merge pull request #1782 from nathan-beam/allow-cross-thread-communic…
Browse files Browse the repository at this point in the history
…ation

Allow cross process communication using custom messages
  • Loading branch information
cyberw authored Jun 23, 2021
2 parents 6ac8257 + a6c9b1b commit 3666efd
Show file tree
Hide file tree
Showing 5 changed files with 352 additions and 1 deletion.
3 changes: 2 additions & 1 deletion docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ Runner classes
.. autoclass:: locust.runners.LocalRunner

.. autoclass:: locust.runners.MasterRunner
:members: register_message, send_message

.. autoclass:: locust.runners.WorkerRunner

:members: register_message, send_message

Web UI class
============
Expand Down
44 changes: 44 additions & 0 deletions docs/running-locust-distributed.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,50 @@ listen to. Defaults to 5557.
Used when starting the master node with ``--headless``. The master node will then wait until X worker
nodes has connected before the test is started.

Communicating across nodes
=============================================

When running Locust in distributed mode, you may want to communicate between master and worker nodes in
order to coordinate data. This can be easily accomplished with custom messages using the built in messaging hooks:

.. code-block:: python
from locust import events
from locust.runners import MasterRunner, WorkerRunner
# Fired when the worker recieves a message of type 'test_users'
def setup_test_users(environment, msg, **kwargs):
for user in msg.data:
print(f"User {user['name']} recieved")
environment.runner.send_message('acknowledge_users', f"Thanks for the {len(msg.data)} users!")
# Fired when the master recieves a message of type 'acknowledge_users'
def on_acknowledge(msg, **kwargs):
print(msg.data)
@events.init.add_listener
def on_locust_init(environment, **_kwargs):
if not isinstance(environment.runner, MasterRunner):
environment.runner.register_message('test_users', setup_test_users)
if not isinstance(environment.runner, WorkerRunner):
environment.runner.register_message('acknowledge_users', on_acknowledge)
@events.test_start.add_listener
def on_test_start(environment, **_kwargs):
if not isinstance(environment.runner, MasterRunner):
users = [
{"name": "User1"},
{"name": "User2"},
{"name": "User3"},
]
environment.runner.send_message('test_users', users)
Note that when running locally (i.e. non-distributed), this functionality will be preserved;
the messages will simply be handled by the same runner that sends them.

A more complete example can be found in the `examples directory <https://github.com/locustio/locust/tree/master/examples>`_ of the Locust
source code.


Running distributed with Docker
=============================================
Expand Down
60 changes: 60 additions & 0 deletions examples/custom_messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from locust import HttpUser, task, events, between
from locust.runners import MasterRunner, WorkerRunner

usernames = []


def setup_test_users(environment, msg, **kwargs):
# Fired when the worker recieves a message of type 'test_users'
usernames.extend(map(lambda u: u["name"], msg.data))
environment.runner.send_message("acknowledge_users", f"Thanks for the {len(msg.data)} users!")


def on_acknowledge(msg, **kwargs):
# Fired when the master recieves a message of type 'acknowledge_users'
print(msg.data)


@events.init.add_listener
def on_locust_init(environment, **_kwargs):
if not isinstance(environment.runner, MasterRunner):
environment.runner.register_message("test_users", setup_test_users)
if not isinstance(environment.runner, WorkerRunner):
environment.runner.register_message("acknowledge_users", on_acknowledge)


@events.test_start.add_listener
def on_test_start(environment, **_kwargs):
# When the test is started, evenly divides list between
# worker nodes to ensure unique data across threads
if not isinstance(environment.runner, WorkerRunner):
users = []
for i in range(environment.runner.target_user_count):
users.append({"name": f"User{i}"})

worker_count = environment.runner.worker_count
chunk_size = int(len(users) / worker_count)

for i, worker in enumerate(environment.runner.clients):
start_index = i * chunk_size

if i + 1 < worker_count:
end_index = start_index + chunk_size
else:
end_index = len(users)

data = users[start_index:end_index]
environment.runner.send_message("test_users", data, worker)


class WebsiteUser(HttpUser):
host = "http://127.0.0.1:8089"
wait_time = between(2, 5)

def __init__(self, parent):
self.username = usernames.pop()
super(WebsiteUser, self).__init__(parent)

@task
def task(self):
print(self.username)
62 changes: 62 additions & 0 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(self, environment):
self.greenlet.spawn(self.monitor_cpu).link_exception(greenlet_exception_handler)
self.exceptions = {}
self.target_user_count = None
self.custom_messages = {}

# set up event listeners for recording requests
def on_request_success(request_type, name, response_time, response_length, **_kwargs):
Expand Down Expand Up @@ -381,6 +382,15 @@ def log_exception(self, node_id, msg, formatted_tb):
row["nodes"].add(node_id)
self.exceptions[key] = row

def register_message(self, msg_type, listener):
"""
Register a listener for a custom message from another node
:param msg_type: The type of the message to listen for
:param listener: The function to execute when the message is received
"""
self.custom_messages[msg_type] = listener


class LocalRunner(Runner):
"""
Expand Down Expand Up @@ -425,6 +435,21 @@ def stop(self):
super().stop()
self.environment.events.test_stop.fire(environment=self.environment)

def send_message(self, msg_type, data=None):
"""
Emulates internodal messaging by calling registered listeners
:param msg_type: The type of the message to emulate sending
:param data: Optional data to include
"""
logger.debug(f"Running locally: sending {msg_type} message to self")
if msg_type in self.custom_messages:
listener = self.custom_messages[msg_type]
msg = Message(msg_type, data, "local")
listener(environment=self.environment, msg=msg)
else:
logger.warning(f"Unknown message type recieved: {msg_type}")


class DistributedRunner(Runner):
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -709,13 +734,35 @@ def client_listener(self):
self.quit()
elif msg.type == "exception":
self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"])
elif msg.type in self.custom_messages:
logger.debug(f"Recieved {msg.type} message from worker {msg.node_id}")
self.custom_messages[msg.type](environment=self.environment, msg=msg)
else:
logger.warning(f"Unknown message type recieved from worker {msg.node_id}: {msg.type}")

self.check_stopped()

@property
def worker_count(self):
return len(self.clients.ready) + len(self.clients.spawning) + len(self.clients.running)

def send_message(self, msg_type, data=None, client_id=None):
"""
Sends a message to attached worker node(s)
:param msg_type: The type of the message to send
:param data: Optional data to send
:param client_id: Optional id of the target worker node.
If None, will send to all attached workers
"""
if client_id:
logger.debug(f"Sending {msg_type} message to client {client_id}")
self.server.send_to_client(Message(msg_type, data, client_id))
else:
for client in self.clients.all:
logger.debug(f"Sending {msg_type} message to client {client.id}")
self.server.send_to_client(Message(msg_type, data, client.id))


class WorkerRunner(DistributedRunner):
"""
Expand Down Expand Up @@ -828,6 +875,11 @@ def worker(self):
self.stop()
self._send_stats() # send a final report, in case there were any samples not yet reported
self.greenlet.kill(block=True)
elif msg.type in self.custom_messages:
logger.debug(f"Recieved {msg.type} message from master")
self.custom_messages[msg.type](environment=self.environment, msg=msg)
else:
logger.warning(f"Unknown message type recieved: {msg.type}")

def stats_reporter(self):
while True:
Expand All @@ -837,6 +889,16 @@ def stats_reporter(self):
logger.error("Temporary connection lost to master server: %s, will retry later." % (e))
gevent.sleep(WORKER_REPORT_INTERVAL)

def send_message(self, msg_type, data=None):
"""
Sends a message to master node
:param msg_type: The type of the message to send
:param data: Optional data to send
"""
logger.debug(f"Sending {msg_type} message to master")
self.client.send(Message(msg_type, data, self.client_id))

def _send_stats(self):
data = {}
self.environment.events.report_to_master.fire(client_id=self.client_id, data=data)
Expand Down
Loading

0 comments on commit 3666efd

Please sign in to comment.