diff --git a/docs/api.rst b/docs/api.rst index 423b1e8811..f4044dbc22 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -133,7 +133,7 @@ Runner classes :members: register_message, send_message .. autoclass:: locust.runners.WorkerRunner - :members: register_message, send_message + :members: register_message, send_message, client_id, worker_index Web UI class ============ diff --git a/locust/runners.py b/locust/runners.py index 9cfef4afbb..bdcebb4433 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -443,6 +443,9 @@ class LocalRunner(Runner): Runner for running single process load test """ + # always set to 0 for LocalRunner + worker_index = 0 + def __init__(self, environment) -> None: """ :param environment: Environment instance @@ -659,6 +662,8 @@ def __init__(self, environment, master_bind_host, master_bind_port): self.master_bind_port = master_bind_port self.spawn_rate: float = 0.0 self.spawning_completed = False + self.worker_indexes: dict[str, int] = {} + self.worker_index_max = 0 self.clients = WorkerNodes() try: @@ -700,6 +705,19 @@ def rebalancing_enabled(self) -> bool: bool, self.environment.parsed_options.enable_rebalancing ) + def get_worker_index(self, client_id): + """ + Get the worker index for the specified client ID; + this is a deterministic 0-based ordinal number and guaranteed to not change + while Master is alive. + """ + if client_id in self.worker_indexes: + return self.worker_indexes[client_id] + index = self.worker_index_max + self.worker_indexes[client_id] = index + self.worker_index_max += 1 + return index + @property def user_count(self) -> int: return sum([c.user_count for c in self.clients.values()]) @@ -999,7 +1017,7 @@ def client_listener(self) -> NoReturn: logger.warning( f"A worker ({client_id}) running a different version ({msg.data}) connected, master version is {__version__}" ) - self.send_message("ack", client_id=client_id) + self.send_message("ack", client_id=client_id, data={"index": self.get_worker_index(client_id)}) worker_node_id = msg.node_id self.clients[worker_node_id] = WorkerNode(worker_node_id, heartbeat_liveness=HEARTBEAT_LIVENESS) if self._users_dispatcher is not None: @@ -1121,6 +1139,9 @@ class WorkerRunner(DistributedRunner): take the stats generated by the running users and send back to the :class:`MasterRunner`. """ + # the worker index is set on ACK, if master provided it (masters <= 2.10.2 do not provide it) + worker_index = -1 + def __init__(self, environment: "Environment", master_host: str, master_port: int) -> None: """ :param environment: Environment instance @@ -1245,6 +1266,9 @@ def worker(self) -> NoReturn: logger.error(f"RPCError found when receiving from master: {e}") continue if msg.type == "ack": + # backward-compatible support of masters that do not send a worker index + if msg.data is not None and "index" in msg.data: + self.worker_index = msg.data["index"] self.connection_event.set() elif msg.type == "spawn": self.client.send(Message("spawning", None, self.client_id)) diff --git a/locust/test/test_main.py b/locust/test/test_main.py index 0c47f838a4..469f19163e 100644 --- a/locust/test/test_main.py +++ b/locust/test/test_main.py @@ -1325,3 +1325,91 @@ def t(self): self.assertEqual(0, proc.returncode) self.assertEqual(0, proc_worker.returncode) + + def test_worker_indexes(self): + content = ( + MOCK_LOCUSTFILE_CONTENT + + """ +class AnyUser(HttpUser): + host = "http://127.0.0.1:8089" + wait_time = between(0, 0.1) + @task + def my_task(self): + print("worker index:", self.environment.runner.worker_index) +""" + ) + with mock_locustfile(content=content) as mocked: + master = subprocess.Popen( + [ + "locust", + "-f", + mocked.file_path, + "--headless", + "--master", + "--expect-workers", + "2", + "-t", + "5", + "-u", + "2", + "-L", + "DEBUG", + ], + stdout=PIPE, + stderr=PIPE, + text=True, + ) + proc_worker_1 = subprocess.Popen( + [ + "locust", + "-f", + mocked.file_path, + "--worker", + "-L", + "DEBUG", + ], + stdout=PIPE, + stderr=PIPE, + text=True, + ) + proc_worker_2 = subprocess.Popen( + [ + "locust", + "-f", + mocked.file_path, + "--worker", + "-L", + "DEBUG", + ], + stdout=PIPE, + stderr=PIPE, + text=True, + ) + stdout, stderr = master.communicate() + self.assertNotIn("Traceback", stderr) + self.assertEqual(0, master.returncode) + + stdout_worker_1, stderr_worker_1 = proc_worker_1.communicate() + stdout_worker_2, stderr_worker_2 = proc_worker_2.communicate() + self.assertEqual(0, proc_worker_1.returncode) + self.assertEqual(0, proc_worker_2.returncode) + self.assertNotIn("Traceback", stderr_worker_1) + self.assertNotIn("Traceback", stderr_worker_2) + + PREFIX = "worker index: " + p1 = stdout_worker_1.find(PREFIX) + if p1 == -1: + raise Exception(stdout_worker_1 + stderr_worker_1) + self.assertNotEqual(-1, p1) + p2 = stdout_worker_2.find(PREFIX) + if p2 == -1: + raise Exception(stdout_worker_2 + stderr_worker_2) + self.assertNotEqual(-1, p2) + found = [ + int(stdout_worker_1[p1 + len(PREFIX) :].split("\n")[0]), + int(stdout_worker_2[p1 + len(PREFIX) :].split("\n")[0]), + ] + found.sort() + for i in range(2): + if found[i] != i: + raise Exception(f"expected index {i} but got", found[i]) diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index 460cd75627..aef75fddaa 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -2635,7 +2635,9 @@ def my_task(self): master.start(7, 7) self.assertEqual(10, len(server.outbox)) - num_users = sum(sum(msg.data["user_classes_count"].values()) for _, msg in server.outbox if msg.data) + num_users = sum( + sum(msg.data["user_classes_count"].values()) for _, msg in server.outbox if msg.type != "ack" + ) self.assertEqual(7, num_users, "Total number of locusts that would have been spawned is not 7") @@ -2654,10 +2656,43 @@ def my_task(self): master.start(2, 2) self.assertEqual(10, len(server.outbox)) - num_users = sum(sum(msg.data["user_classes_count"].values()) for _, msg in server.outbox if msg.data) + num_users = sum( + sum(msg.data["user_classes_count"].values()) for _, msg in server.outbox if msg.type != "ack" + ) self.assertEqual(2, num_users, "Total number of locusts that would have been spawned is not 2") + def test_spawn_correct_worker_indexes(self): + """ + Tests that workers would receive a monotonic sequence of ordinal IDs. + """ + + class TestUser(User): + @task + def my_task(self): + pass + + with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: + master = self.get_runner(user_classes=[TestUser]) + + USERS_COUNT = 5 + + for i in range(USERS_COUNT): + server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i)) + + master.start(USERS_COUNT, USERS_COUNT) + self.assertEqual(USERS_COUNT * 2, len(server.outbox)) + + indexes = [] + for _, msg in server.outbox: + if msg.type == "ack": + indexes.append(msg.data["index"]) + self.assertEqual(USERS_COUNT, len(indexes), "Total number of locusts/workers is not 5") + + indexes.sort() + for i in range(USERS_COUNT): + self.assertEqual(indexes[i], i, "Worker index mismatch") + def test_custom_shape_scale_up(self): class MyUser(User): @task @@ -2687,14 +2722,18 @@ def tick(self): sleep(0.5) # Wait for shape_worker to update user_count - num_users = sum(sum(msg.data["user_classes_count"].values()) for _, msg in server.outbox if msg.data) + num_users = sum( + sum(msg.data["user_classes_count"].values()) for _, msg in server.outbox if msg.type != "ack" + ) self.assertEqual( 1, num_users, "Total number of users in first stage of shape test is not 1: %i" % num_users ) # Wait for shape_worker to update user_count again sleep(2) - num_users = sum(sum(msg.data["user_classes_count"].values()) for _, msg in server.outbox if msg.data) + num_users = sum( + sum(msg.data["user_classes_count"].values()) for _, msg in server.outbox if msg.type != "ack" + ) self.assertEqual( 3, num_users, "Total number of users in second stage of shape test is not 3: %i" % num_users ) @@ -2732,7 +2771,9 @@ def tick(self): sleep(0.5) # Wait for shape_worker to update user_count - num_users = sum(sum(msg.data["user_classes_count"].values()) for _, msg in server.outbox if msg.data) + num_users = sum( + sum(msg.data["user_classes_count"].values()) for _, msg in server.outbox if msg.type != "ack" + ) self.assertEqual( 5, num_users, "Total number of users in first stage of shape test is not 5: %i" % num_users ) @@ -2741,7 +2782,7 @@ def tick(self): sleep(2) msgs = defaultdict(dict) for _, msg in server.outbox: - if not msg.data: + if msg.type == "ack": continue msgs[msg.node_id][msg.data["timestamp"]] = sum(msg.data["user_classes_count"].values()) # Count users for the last received messages @@ -2977,6 +3018,7 @@ def test_master_discard_first_client_ready(self): self.assertEqual(1, len(master.clients)) self.assertEqual("ack", server.outbox[0][1].type) self.assertEqual(1, len(server.outbox)) + self.assertEqual(0, server.outbox[0][1].data["index"]) def test_worker_sends_bad_message_to_master(self): """ @@ -3018,7 +3060,7 @@ def tearDown(self): def get_runner(self, client, environment=None, user_classes=None, auto_connect=True): if auto_connect: - client.mocked_send(Message("ack", {}, "dummy_client_id")) + client.mocked_send(Message("ack", {"index": 0}, "dummy_client_id")) if environment is None: environment = self.environment user_classes = user_classes or []