From ddcd577daa6e0400976e71a0fec293a1c947be4f Mon Sep 17 00:00:00 2001 From: Nathan Beam Date: Mon, 7 Jun 2021 12:33:21 -0400 Subject: [PATCH 01/11] Added tests --- locust/test/test_runners.py | 87 +++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index d5b6ae3dba..c9787abd9f 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -1321,7 +1321,50 @@ def test_master_reset_connection(self): self.assertEqual(1, len(master.clients)) master.quit() + def test_custom_message_send(self): + class MyUser(User): + wait_time = constant(1) + + @task + def my_task(self): + pass + + with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: + master = self.get_runner() + for i in range(5): + master.clients[i] = WorkerNode(i) + master.send_message("test_custom_msg", {"test_data": 123}) + + self.assertEqual(5, len(server.outbox)) + for _, msg in server.outbox: + self.assertEqual("test_custom_msg", msg.type) + self.assertEqual(123, msg.data['test_data']) + + def test_custom_message_receive(self): + class MyUser(User): + wait_time = constant(1) + + @task + def my_task(self): + pass + + with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: + test_custom_msg = [False] + test_custom_msg_data = [{}] + + def on_custom_msg(msg, **kw): + test_custom_msg[0] = True + test_custom_msg_data[0] = msg.data + + master = self.get_runner() + master.add_message("test_custom_msg", on_custom_msg) + server.mocked_send( + Message("test_custom_msg", {'test_data': 123}, "dummy_id") + ) + + self.assertTrue(test_custom_msg[0]) + self.assertEqual(123, test_custom_msg_data[0]['test_data']) class TestWorkerRunner(LocustTestCase): def setUp(self): super().setUp() @@ -1470,6 +1513,50 @@ def my_task(self): self.assertEqual(9, len(worker.user_greenlets)) worker.quit() + def test_custom_message_send(self): + class MyUser(User): + wait_time = constant(1) + + @task + def my_task(self): + pass + + with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client: + environment = Environment() + worker = self.get_runner(environment=environment, user_classes=[MyUser]) + client.outbox.clear() + worker.send_message('test_custom_msg', {'test_data': 123}) + self.assertEqual("test_custom_msg", client.outbox[0].type) + self.assertEqual(123, client.outbox[0].data['test_data']) + worker.quit() + + def test_custom_message_receive(self): + class MyUser(User): + wait_time = constant(1) + + @task + def my_task(self): + pass + + with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client: + environment = Environment() + test_custom_msg = [False] + test_custom_msg_data = [{}] + + def on_custom_msg(msg, **kw): + test_custom_msg[0] = True + test_custom_msg_data[0] = msg.data + + worker = self.get_runner(environment=environment, user_classes=[MyUser]) + worker.add_message("test_custom_msg", on_custom_msg) + + client.mocked_send( + Message("test_custom_msg", {'test_data': 123}, "dummy_client_id") + ) + + self.assertTrue(test_custom_msg[0]) + self.assertEqual(123, test_custom_msg_data[0]['test_data']) + worker.quit() class TestMessageSerializing(unittest.TestCase): def test_message_serialize(self): From d509922014b618f70e343b0249409465c5f14a7e Mon Sep 17 00:00:00 2001 From: Nathan Beam Date: Mon, 7 Jun 2021 12:33:34 -0400 Subject: [PATCH 02/11] Added implementation --- locust/runners.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/locust/runners.py b/locust/runners.py index 0b07e4be61..623cc2c055 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -428,9 +428,12 @@ def stop(self): class DistributedRunner(Runner): def __init__(self, *args, **kwargs): + self.custom_messages = {} super().__init__(*args, **kwargs) setup_distributed_stats_event_listeners(self.environment.events, self.stats) + def add_message(self, msg_type, listener): + self.custom_messages[msg_type] = listener class WorkerNode: def __init__(self, id, state=STATE_INIT, heartbeat_liveness=HEARTBEAT_LIVENESS): @@ -709,6 +712,8 @@ def client_listener(self): self.quit() elif msg.type == "exception": self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"]) + elif msg.type in self.custom_messages: + self.custom_messages[msg.type](msg) self.check_stopped() @@ -716,6 +721,12 @@ def client_listener(self): def worker_count(self): return len(self.clients.ready) + len(self.clients.spawning) + len(self.clients.running) + def send_message(self, msg_type, data=None): + for client in self.clients.all: + logger.debug("Sending {msg_type} message to client {client_id}") + self.server.send_to_client(Message(msg_type, data, client.id)) + + class WorkerRunner(DistributedRunner): """ @@ -828,6 +839,9 @@ def worker(self): self.stop() self._send_stats() # send a final report, in case there were any samples not yet reported self.greenlet.kill(block=True) + elif msg.type in self.custom_messages: + logger.debug("Recieved {msg_type} message on from master") + self.custom_messages[msg.type](msg) def stats_reporter(self): while True: @@ -837,6 +851,10 @@ def stats_reporter(self): logger.error("Temporary connection lost to master server: %s, will retry later." % (e)) gevent.sleep(WORKER_REPORT_INTERVAL) + def send_message(self, msg_type, data=None): + logger.debug("Sending {msg_type} message to master") + self.client.send(Message(msg_type, data, self.client_id)) + def _send_stats(self): data = {} self.environment.events.report_to_master.fire(client_id=self.client_id, data=data) From 8b06cc002881d39d41127c37cbe7d13fb2f5c3a9 Mon Sep 17 00:00:00 2001 From: Nathan Beam Date: Mon, 7 Jun 2021 13:07:37 -0400 Subject: [PATCH 03/11] Change method name and added environment param --- locust/runners.py | 6 +++--- locust/test/test_runners.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/locust/runners.py b/locust/runners.py index 623cc2c055..9eab773bd4 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -432,7 +432,7 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) setup_distributed_stats_event_listeners(self.environment.events, self.stats) - def add_message(self, msg_type, listener): + def register_message(self, msg_type, listener): self.custom_messages[msg_type] = listener class WorkerNode: @@ -713,7 +713,7 @@ def client_listener(self): elif msg.type == "exception": self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"]) elif msg.type in self.custom_messages: - self.custom_messages[msg.type](msg) + self.custom_messages[msg.type](environment=self.environment, msg=msg) self.check_stopped() @@ -841,7 +841,7 @@ def worker(self): self.greenlet.kill(block=True) elif msg.type in self.custom_messages: logger.debug("Recieved {msg_type} message on from master") - self.custom_messages[msg.type](msg) + self.custom_messages[msg.type](environment=self.environment, msg=msg) def stats_reporter(self): while True: diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index c9787abd9f..14d15b7700 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -1357,7 +1357,7 @@ def on_custom_msg(msg, **kw): test_custom_msg_data[0] = msg.data master = self.get_runner() - master.add_message("test_custom_msg", on_custom_msg) + master.register_message("test_custom_msg", on_custom_msg) server.mocked_send( Message("test_custom_msg", {'test_data': 123}, "dummy_id") @@ -1548,7 +1548,7 @@ def on_custom_msg(msg, **kw): test_custom_msg_data[0] = msg.data worker = self.get_runner(environment=environment, user_classes=[MyUser]) - worker.add_message("test_custom_msg", on_custom_msg) + worker.register_message("test_custom_msg", on_custom_msg) client.mocked_send( Message("test_custom_msg", {'test_data': 123}, "dummy_client_id") From 3099dbbd7dc5a8aefa5b2f4a1a44f978611e3564 Mon Sep 17 00:00:00 2001 From: Nathan Beam Date: Mon, 7 Jun 2021 13:22:51 -0400 Subject: [PATCH 04/11] Docs --- docs/api.rst | 3 ++- docs/running-locust-distributed.rst | 38 +++++++++++++++++++++++++++++ examples/custom_messages.py | 37 ++++++++++++++++++++++++++++ locust/runners.py | 18 ++++++++++++++ 4 files changed, 95 insertions(+), 1 deletion(-) create mode 100644 examples/custom_messages.py diff --git a/docs/api.rst b/docs/api.rst index 6dc0fce99c..0f4a4ee64a 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -130,9 +130,10 @@ Runner classes .. autoclass:: locust.runners.LocalRunner .. autoclass:: locust.runners.MasterRunner + :members: register_message, send_message .. autoclass:: locust.runners.WorkerRunner - + :members: register_message, send_message Web UI class ============ diff --git a/docs/running-locust-distributed.rst b/docs/running-locust-distributed.rst index 716c3ead5b..d230556652 100644 --- a/docs/running-locust-distributed.rst +++ b/docs/running-locust-distributed.rst @@ -87,6 +87,44 @@ listen to. Defaults to 5557. Used when starting the master node with ``--headless``. The master node will then wait until X worker nodes has connected before the test is started. +Communicating across nodes +============================================= + +When running Locust in distributed mode, you may want to communicate between nodes in order to coordinate +data. This can be easily accomplished with custom messages using the built in messaging hooks: + +.. code-block:: python + + from locust import events + from locust.runners import MasterRunner, WorkerRunner + + # Fired when the worker recieves a message of type 'test_users' + def setup_test_users(environment, msg, **kwargs): + for user in msg.data: + print(f"User {user['name']} recieved") + environment.runner.send_message('acknowledge_users', f"Thanks for the {len(msg.data)} users!") + + # Fired when the master recieves a message of type 'acknowledge_users' + def on_acknowledge(msg, **kwargs): + print(msg.data) + + @events.init.add_listener + def on_locust_init(environment, **_kwargs): + if isinstance(environment.runner, WorkerRunner): + environment.runner.register_message('test_users', setup_test_users) + elif isinstance(environment.runner, MasterRunner): + environment.runner.register_message('acknowledge_users', on_acknowledge) + + @events.test_start.add_listener + def on_test_start(environment, **_kwargs): + if isinstance(environment.runner, MasterRunner): + users = [ + {"name": "User1"}, + {"name": "User2"}, + {"name": "User3"}, + ] + environment.runner.send_message('test_users', users) + Running distributed with Docker ============================================= diff --git a/examples/custom_messages.py b/examples/custom_messages.py new file mode 100644 index 0000000000..7d8e15f784 --- /dev/null +++ b/examples/custom_messages.py @@ -0,0 +1,37 @@ +from locust import HttpUser, task, events, between +from locust.runners import MasterRunner, WorkerRunner + +# Fired when the worker recieves a message of type 'test_users' +def setup_test_users(environment, msg, **kwargs): + for user in msg.data: + print(f"User {user['name']} recieved") + environment.runner.send_message('acknowledge_users', f"Thanks for the {len(msg.data)} users!") + +# Fired when the master recieves a message of type 'acknowledge_users' +def on_acknowledge(msg, **kwargs): + print(msg.data) + +@events.init.add_listener +def on_locust_init(environment, **_kwargs): + if isinstance(environment.runner, WorkerRunner): + environment.runner.register_message('test_users', setup_test_users) + elif isinstance(environment.runner, MasterRunner): + environment.runner.register_message('acknowledge_users', on_acknowledge) + +@events.test_start.add_listener +def on_test_start(environment, **_kwargs): + if isinstance(environment.runner, MasterRunner): + users = [ + {"name": "User1"}, + {"name": "User2"}, + {"name": "User3"}, + ] + environment.runner.send_message('test_users', users) + +class WebsiteUser(HttpUser): + host = "http://127.0.0.1:8089" + wait_time = between(2, 5) + + @task + def task(self): + pass diff --git a/locust/runners.py b/locust/runners.py index 9eab773bd4..95943ba5fe 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -433,6 +433,12 @@ def __init__(self, *args, **kwargs): setup_distributed_stats_event_listeners(self.environment.events, self.stats) def register_message(self, msg_type, listener): + """ + Register a listener for a custom message from another node + + :param msg_type: The type of the message to listen for + :param listener: The function to execute when the message is received + """ self.custom_messages[msg_type] = listener class WorkerNode: @@ -722,6 +728,12 @@ def worker_count(self): return len(self.clients.ready) + len(self.clients.spawning) + len(self.clients.running) def send_message(self, msg_type, data=None): + """ + Sends a message to all attached worker nodes + + :param msg_type: The type of the message to send + :param data: Optional data to send + """ for client in self.clients.all: logger.debug("Sending {msg_type} message to client {client_id}") self.server.send_to_client(Message(msg_type, data, client.id)) @@ -852,6 +864,12 @@ def stats_reporter(self): gevent.sleep(WORKER_REPORT_INTERVAL) def send_message(self, msg_type, data=None): + """ + Sends a message to master node + + :param msg_type: The type of the message to send + :param data: Optional data to send + """ logger.debug("Sending {msg_type} message to master") self.client.send(Message(msg_type, data, self.client_id)) From 4da9a283bfef802c56bd2e11fe1b6cf0cc908187 Mon Sep 17 00:00:00 2001 From: Nathan Beam Date: Mon, 7 Jun 2021 14:37:07 -0400 Subject: [PATCH 05/11] Fix f string --- locust/runners.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/locust/runners.py b/locust/runners.py index 95943ba5fe..abc2cab4d0 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -735,7 +735,7 @@ def send_message(self, msg_type, data=None): :param data: Optional data to send """ for client in self.clients.all: - logger.debug("Sending {msg_type} message to client {client_id}") + logger.debug(f"Sending {msg_type} message to client {client.id}") self.server.send_to_client(Message(msg_type, data, client.id)) From 508a80acdd4e84725f500c889957a1bdccce7b8a Mon Sep 17 00:00:00 2001 From: Nathan Beam Date: Mon, 7 Jun 2021 16:00:47 -0400 Subject: [PATCH 06/11] Update example and fix linter errors --- examples/custom_messages.py | 47 +++++++++++++++++++++++++++---------- locust/runners.py | 18 +++++++++----- locust/test/test_runners.py | 15 +++++++----- 3 files changed, 56 insertions(+), 24 deletions(-) diff --git a/examples/custom_messages.py b/examples/custom_messages.py index 7d8e15f784..fbf1cd5e00 100644 --- a/examples/custom_messages.py +++ b/examples/custom_messages.py @@ -1,16 +1,20 @@ from locust import HttpUser, task, events, between from locust.runners import MasterRunner, WorkerRunner -# Fired when the worker recieves a message of type 'test_users' +usernames = [] + + def setup_test_users(environment, msg, **kwargs): - for user in msg.data: - print(f"User {user['name']} recieved") + # Fired when the worker recieves a message of type 'test_users' + usernames.extend(map(lambda u: u['name'], msg.data)) environment.runner.send_message('acknowledge_users', f"Thanks for the {len(msg.data)} users!") -# Fired when the master recieves a message of type 'acknowledge_users' + def on_acknowledge(msg, **kwargs): + # Fired when the master recieves a message of type 'acknowledge_users' print(msg.data) + @events.init.add_listener def on_locust_init(environment, **_kwargs): if isinstance(environment.runner, WorkerRunner): @@ -18,20 +22,39 @@ def on_locust_init(environment, **_kwargs): elif isinstance(environment.runner, MasterRunner): environment.runner.register_message('acknowledge_users', on_acknowledge) + @events.test_start.add_listener def on_test_start(environment, **_kwargs): + # When the test is started, evenly divides list between + # worker nodes to ensure unique data across threads if isinstance(environment.runner, MasterRunner): - users = [ - {"name": "User1"}, - {"name": "User2"}, - {"name": "User3"}, - ] - environment.runner.send_message('test_users', users) + users = [] + for i in range(environment.runner.target_user_count): + users.append({"name": f"User{i}"}) + + worker_count = environment.runner.worker_count + chunk_size = int(len(users) / worker_count) + + for i, worker in enumerate(environment.runner.clients): + start_index = i * chunk_size + + if i + 1 < worker_count: + end_index = start_index + chunk_size + else: + end_index = len(users) + + data = users[start_index:end_index] + environment.runner.send_message('test_users', data, worker) + class WebsiteUser(HttpUser): host = "http://127.0.0.1:8089" wait_time = between(2, 5) - + + def __init__(self, parent): + self.username = usernames.pop() + super(WebsiteUser, self).__init__(parent) + @task def task(self): - pass + print(self.username) diff --git a/locust/runners.py b/locust/runners.py index abc2cab4d0..5d9eb84de1 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -441,6 +441,7 @@ def register_message(self, msg_type, listener): """ self.custom_messages[msg_type] = listener + class WorkerNode: def __init__(self, id, state=STATE_INIT, heartbeat_liveness=HEARTBEAT_LIVENESS): self.id = id @@ -727,17 +728,22 @@ def client_listener(self): def worker_count(self): return len(self.clients.ready) + len(self.clients.spawning) + len(self.clients.running) - def send_message(self, msg_type, data=None): + def send_message(self, msg_type, data=None, client_id=None): """ - Sends a message to all attached worker nodes + Sends a message to attached worker node(s) :param msg_type: The type of the message to send :param data: Optional data to send + :param client_id: Optional id of the target worker node. + If None, will send to all attached workers """ - for client in self.clients.all: - logger.debug(f"Sending {msg_type} message to client {client.id}") - self.server.send_to_client(Message(msg_type, data, client.id)) - + if client_id: + logger.debug(f"Sending {msg_type} message to client {client_id}") + self.server.send_to_client(Message(msg_type, data, client_id)) + else: + for client in self.clients.all: + logger.debug(f"Sending {msg_type} message to client {client.id}") + self.server.send_to_client(Message(msg_type, data, client.id)) class WorkerRunner(DistributedRunner): diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index 14d15b7700..d3c8060d42 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -1338,7 +1338,7 @@ def my_task(self): self.assertEqual(5, len(server.outbox)) for _, msg in server.outbox: self.assertEqual("test_custom_msg", msg.type) - self.assertEqual(123, msg.data['test_data']) + self.assertEqual(123, msg.data['test_data']) def test_custom_message_receive(self): class MyUser(User): @@ -1355,7 +1355,7 @@ def my_task(self): def on_custom_msg(msg, **kw): test_custom_msg[0] = True test_custom_msg_data[0] = msg.data - + master = self.get_runner() master.register_message("test_custom_msg", on_custom_msg) @@ -1364,7 +1364,9 @@ def on_custom_msg(msg, **kw): ) self.assertTrue(test_custom_msg[0]) - self.assertEqual(123, test_custom_msg_data[0]['test_data']) + self.assertEqual(123, test_custom_msg_data[0]['test_data']) + + class TestWorkerRunner(LocustTestCase): def setUp(self): super().setUp() @@ -1527,7 +1529,7 @@ def my_task(self): client.outbox.clear() worker.send_message('test_custom_msg', {'test_data': 123}) self.assertEqual("test_custom_msg", client.outbox[0].type) - self.assertEqual(123, client.outbox[0].data['test_data']) + self.assertEqual(123, client.outbox[0].data['test_data']) worker.quit() def test_custom_message_receive(self): @@ -1546,7 +1548,7 @@ def my_task(self): def on_custom_msg(msg, **kw): test_custom_msg[0] = True test_custom_msg_data[0] = msg.data - + worker = self.get_runner(environment=environment, user_classes=[MyUser]) worker.register_message("test_custom_msg", on_custom_msg) @@ -1555,9 +1557,10 @@ def on_custom_msg(msg, **kw): ) self.assertTrue(test_custom_msg[0]) - self.assertEqual(123, test_custom_msg_data[0]['test_data']) + self.assertEqual(123, test_custom_msg_data[0]['test_data']) worker.quit() + class TestMessageSerializing(unittest.TestCase): def test_message_serialize(self): msg = Message("client_ready", None, "my_id") From 0361694d34eeb4e03f05aebeaaf6a50f8e5790db Mon Sep 17 00:00:00 2001 From: Nathan Beam Date: Tue, 8 Jun 2021 10:31:15 -0400 Subject: [PATCH 07/11] Fixed formatting errors for custom messages code --- examples/custom_messages.py | 10 +++++----- locust/test/test_runners.py | 18 +++++++----------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/examples/custom_messages.py b/examples/custom_messages.py index fbf1cd5e00..35c300d4cd 100644 --- a/examples/custom_messages.py +++ b/examples/custom_messages.py @@ -6,8 +6,8 @@ def setup_test_users(environment, msg, **kwargs): # Fired when the worker recieves a message of type 'test_users' - usernames.extend(map(lambda u: u['name'], msg.data)) - environment.runner.send_message('acknowledge_users', f"Thanks for the {len(msg.data)} users!") + usernames.extend(map(lambda u: u["name"], msg.data)) + environment.runner.send_message("acknowledge_users", f"Thanks for the {len(msg.data)} users!") def on_acknowledge(msg, **kwargs): @@ -18,9 +18,9 @@ def on_acknowledge(msg, **kwargs): @events.init.add_listener def on_locust_init(environment, **_kwargs): if isinstance(environment.runner, WorkerRunner): - environment.runner.register_message('test_users', setup_test_users) + environment.runner.register_message("test_users", setup_test_users) elif isinstance(environment.runner, MasterRunner): - environment.runner.register_message('acknowledge_users', on_acknowledge) + environment.runner.register_message("acknowledge_users", on_acknowledge) @events.test_start.add_listener @@ -44,7 +44,7 @@ def on_test_start(environment, **_kwargs): end_index = len(users) data = users[start_index:end_index] - environment.runner.send_message('test_users', data, worker) + environment.runner.send_message("test_users", data, worker) class WebsiteUser(HttpUser): diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index d3c8060d42..80f2ad3e42 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -1338,7 +1338,7 @@ def my_task(self): self.assertEqual(5, len(server.outbox)) for _, msg in server.outbox: self.assertEqual("test_custom_msg", msg.type) - self.assertEqual(123, msg.data['test_data']) + self.assertEqual(123, msg.data["test_data"]) def test_custom_message_receive(self): class MyUser(User): @@ -1359,12 +1359,10 @@ def on_custom_msg(msg, **kw): master = self.get_runner() master.register_message("test_custom_msg", on_custom_msg) - server.mocked_send( - Message("test_custom_msg", {'test_data': 123}, "dummy_id") - ) + server.mocked_send(Message("test_custom_msg", {"test_data": 123}, "dummy_id")) self.assertTrue(test_custom_msg[0]) - self.assertEqual(123, test_custom_msg_data[0]['test_data']) + self.assertEqual(123, test_custom_msg_data[0]["test_data"]) class TestWorkerRunner(LocustTestCase): @@ -1527,9 +1525,9 @@ def my_task(self): environment = Environment() worker = self.get_runner(environment=environment, user_classes=[MyUser]) client.outbox.clear() - worker.send_message('test_custom_msg', {'test_data': 123}) + worker.send_message("test_custom_msg", {"test_data": 123}) self.assertEqual("test_custom_msg", client.outbox[0].type) - self.assertEqual(123, client.outbox[0].data['test_data']) + self.assertEqual(123, client.outbox[0].data["test_data"]) worker.quit() def test_custom_message_receive(self): @@ -1552,12 +1550,10 @@ def on_custom_msg(msg, **kw): worker = self.get_runner(environment=environment, user_classes=[MyUser]) worker.register_message("test_custom_msg", on_custom_msg) - client.mocked_send( - Message("test_custom_msg", {'test_data': 123}, "dummy_client_id") - ) + client.mocked_send(Message("test_custom_msg", {"test_data": 123}, "dummy_client_id")) self.assertTrue(test_custom_msg[0]) - self.assertEqual(123, test_custom_msg_data[0]['test_data']) + self.assertEqual(123, test_custom_msg_data[0]["test_data"]) worker.quit() From 3e4580c6d00968a7a553c854094c00cc7f3851dc Mon Sep 17 00:00:00 2001 From: Nathan Beam Date: Mon, 21 Jun 2021 16:11:28 -0400 Subject: [PATCH 08/11] Added custom message logging and tests --- locust/runners.py | 7 +++++- locust/test/test_runners.py | 50 +++++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/locust/runners.py b/locust/runners.py index 5d9eb84de1..dd8eca1145 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -720,7 +720,10 @@ def client_listener(self): elif msg.type == "exception": self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"]) elif msg.type in self.custom_messages: + logger.debug(f"Recieved {msg.type} message from worker {msg.node_id}") self.custom_messages[msg.type](environment=self.environment, msg=msg) + else: + logger.warning(f"Unknown message type recieved from worker {msg.node_id}: {msg.type}") self.check_stopped() @@ -858,8 +861,10 @@ def worker(self): self._send_stats() # send a final report, in case there were any samples not yet reported self.greenlet.kill(block=True) elif msg.type in self.custom_messages: - logger.debug("Recieved {msg_type} message on from master") + logger.debug(f"Recieved {msg.type} message from master") self.custom_messages[msg.type](environment=self.environment, msg=msg) + else: + logger.warning(f"Unknown message type recieved: {msg.type}") def stats_reporter(self): while True: diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index 80f2ad3e42..e150e8bec9 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -1364,6 +1364,30 @@ def on_custom_msg(msg, **kw): self.assertTrue(test_custom_msg[0]) self.assertEqual(123, test_custom_msg_data[0]["test_data"]) + def test_undefined_custom_message_receive(self): + class MyUser(User): + wait_time = constant(1) + + @task + def my_task(self): + pass + + with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: + test_custom_msg = [False] + + def on_custom_msg(msg, **kw): + test_custom_msg[0] = True + + master = self.get_runner() + master.register_message("test_custom_msg", on_custom_msg) + + server.mocked_send(Message("unregistered_custom_msg", {}, "dummy_id")) + + self.assertFalse(test_custom_msg[0]) + self.assertEqual(1, len(self.mocked_log.warning)) + msg = self.mocked_log.warning[0] + self.assertIn("Unknown message type recieved from worker", msg) + class TestWorkerRunner(LocustTestCase): def setUp(self): @@ -1556,6 +1580,32 @@ def on_custom_msg(msg, **kw): self.assertEqual(123, test_custom_msg_data[0]["test_data"]) worker.quit() + def test_undefined_custom_message_receive(self): + class MyUser(User): + wait_time = constant(1) + + @task + def my_task(self): + pass + + with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client: + environment = Environment() + + test_custom_msg = [False] + + def on_custom_msg(msg, **kw): + test_custom_msg[0] = True + + worker = self.get_runner(environment=environment, user_classes=[MyUser]) + worker.register_message("test_custom_msg", on_custom_msg) + + client.mocked_send(Message("unregistered_custom_msg", {}, "dummy_id")) + + self.assertFalse(test_custom_msg[0]) + self.assertEqual(1, len(self.mocked_log.warning)) + msg = self.mocked_log.warning[0] + self.assertIn("Unknown message type recieved", msg) + class TestMessageSerializing(unittest.TestCase): def test_message_serialize(self): From 8ef99724a2cc5e1a667e4bc44b4a511da48fe45e Mon Sep 17 00:00:00 2001 From: Nathan Beam Date: Tue, 22 Jun 2021 21:21:44 -0400 Subject: [PATCH 09/11] Implement custom message emulation on LocalRunner --- locust/runners.py | 37 ++++++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/locust/runners.py b/locust/runners.py index dd8eca1145..8592dbbe9d 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -69,6 +69,7 @@ def __init__(self, environment): self.greenlet.spawn(self.monitor_cpu).link_exception(greenlet_exception_handler) self.exceptions = {} self.target_user_count = None + self.custom_messages = {} # set up event listeners for recording requests def on_request_success(request_type, name, response_time, response_length, **_kwargs): @@ -381,6 +382,15 @@ def log_exception(self, node_id, msg, formatted_tb): row["nodes"].add(node_id) self.exceptions[key] = row + def register_message(self, msg_type, listener): + """ + Register a listener for a custom message from another node + + :param msg_type: The type of the message to listen for + :param listener: The function to execute when the message is received + """ + self.custom_messages[msg_type] = listener + class LocalRunner(Runner): """ @@ -425,22 +435,27 @@ def stop(self): super().stop() self.environment.events.test_stop.fire(environment=self.environment) + def send_message(self, msg_type, data=None): + """ + Emulates internodal messaging by calling registered listeners + + :param msg_type: The type of the message to emulate sending + :param data: Optional data to include + """ + logger.debug(f"Running locally: sending {msg_type} message to self") + if msg_type in self.custom_messages: + listener = self.custom_messages[msg_type] + msg = Message(msg_type, data, "local") + listener(environment=self.environment, msg=msg) + else: + logger.warning(f"Unknown message type recieved: {msg_type}") + class DistributedRunner(Runner): def __init__(self, *args, **kwargs): - self.custom_messages = {} super().__init__(*args, **kwargs) setup_distributed_stats_event_listeners(self.environment.events, self.stats) - def register_message(self, msg_type, listener): - """ - Register a listener for a custom message from another node - - :param msg_type: The type of the message to listen for - :param listener: The function to execute when the message is received - """ - self.custom_messages[msg_type] = listener - class WorkerNode: def __init__(self, id, state=STATE_INIT, heartbeat_liveness=HEARTBEAT_LIVENESS): @@ -881,7 +896,7 @@ def send_message(self, msg_type, data=None): :param msg_type: The type of the message to send :param data: Optional data to send """ - logger.debug("Sending {msg_type} message to master") + logger.debug(f"Sending {msg_type} message to master") self.client.send(Message(msg_type, data, self.client_id)) def _send_stats(self): From c5f1c687bcdf7bc4e11c2f737a9ff18598cd782b Mon Sep 17 00:00:00 2001 From: Nathan Beam Date: Tue, 22 Jun 2021 21:28:18 -0400 Subject: [PATCH 10/11] Added tests for local custom messages --- locust/test/test_runners.py | 48 +++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index e150e8bec9..4d95377a83 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -438,6 +438,54 @@ def my_task(self): user_count = len(runner.user_greenlets) self.assertTrue(user_count == 2, "User count has not decreased correctly to 2, it is : %i" % user_count) + def test_custom_message(self): + class MyUser(User): + wait_time = constant(1) + + @task + def my_task(self): + pass + + test_custom_msg = [False] + test_custom_msg_data = [{}] + + def on_custom_msg(msg, **kw): + test_custom_msg[0] = True + test_custom_msg_data[0] = msg.data + + environment = Environment(user_classes=[MyUser]) + runner = LocalRunner(environment) + + runner.register_message("test_custom_msg", on_custom_msg) + runner.send_message("test_custom_msg", {"test_data": 123}) + + self.assertTrue(test_custom_msg[0]) + self.assertEqual(123, test_custom_msg_data[0]["test_data"]) + + def test_undefined_custom_message(self): + class MyUser(User): + wait_time = constant(1) + + @task + def my_task(self): + pass + + test_custom_msg = [False] + + def on_custom_msg(msg, **kw): + test_custom_msg[0] = True + + environment = Environment(user_classes=[MyUser]) + runner = LocalRunner(environment) + + runner.register_message("test_custom_msg", on_custom_msg) + runner.send_message("test_different_custom_msg") + + self.assertFalse(test_custom_msg[0]) + self.assertEqual(1, len(self.mocked_log.warning)) + msg = self.mocked_log.warning[0] + self.assertIn("Unknown message type recieved", msg) + class TestMasterWorkerRunners(LocustTestCase): def test_distributed_integration_run(self): From a6c9b1bf7c3e690bf4856cf41be82cec476d2c69 Mon Sep 17 00:00:00 2001 From: Nathan Beam Date: Wed, 23 Jun 2021 11:50:08 -0400 Subject: [PATCH 11/11] Updated documentation for custom messages --- docs/running-locust-distributed.rst | 16 +++++++++++----- examples/custom_messages.py | 6 +++--- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/docs/running-locust-distributed.rst b/docs/running-locust-distributed.rst index d230556652..502aa15447 100644 --- a/docs/running-locust-distributed.rst +++ b/docs/running-locust-distributed.rst @@ -90,8 +90,8 @@ nodes has connected before the test is started. Communicating across nodes ============================================= -When running Locust in distributed mode, you may want to communicate between nodes in order to coordinate -data. This can be easily accomplished with custom messages using the built in messaging hooks: +When running Locust in distributed mode, you may want to communicate between master and worker nodes in +order to coordinate data. This can be easily accomplished with custom messages using the built in messaging hooks: .. code-block:: python @@ -110,14 +110,14 @@ data. This can be easily accomplished with custom messages using the built in me @events.init.add_listener def on_locust_init(environment, **_kwargs): - if isinstance(environment.runner, WorkerRunner): + if not isinstance(environment.runner, MasterRunner): environment.runner.register_message('test_users', setup_test_users) - elif isinstance(environment.runner, MasterRunner): + if not isinstance(environment.runner, WorkerRunner): environment.runner.register_message('acknowledge_users', on_acknowledge) @events.test_start.add_listener def on_test_start(environment, **_kwargs): - if isinstance(environment.runner, MasterRunner): + if not isinstance(environment.runner, MasterRunner): users = [ {"name": "User1"}, {"name": "User2"}, @@ -125,6 +125,12 @@ data. This can be easily accomplished with custom messages using the built in me ] environment.runner.send_message('test_users', users) +Note that when running locally (i.e. non-distributed), this functionality will be preserved; +the messages will simply be handled by the same runner that sends them. + +A more complete example can be found in the `examples directory `_ of the Locust +source code. + Running distributed with Docker ============================================= diff --git a/examples/custom_messages.py b/examples/custom_messages.py index 35c300d4cd..3b12124781 100644 --- a/examples/custom_messages.py +++ b/examples/custom_messages.py @@ -17,9 +17,9 @@ def on_acknowledge(msg, **kwargs): @events.init.add_listener def on_locust_init(environment, **_kwargs): - if isinstance(environment.runner, WorkerRunner): + if not isinstance(environment.runner, MasterRunner): environment.runner.register_message("test_users", setup_test_users) - elif isinstance(environment.runner, MasterRunner): + if not isinstance(environment.runner, WorkerRunner): environment.runner.register_message("acknowledge_users", on_acknowledge) @@ -27,7 +27,7 @@ def on_locust_init(environment, **_kwargs): def on_test_start(environment, **_kwargs): # When the test is started, evenly divides list between # worker nodes to ensure unique data across threads - if isinstance(environment.runner, MasterRunner): + if not isinstance(environment.runner, WorkerRunner): users = [] for i in range(environment.runner.target_user_count): users.append({"name": f"User{i}"})