From 01162ac90ca22c5f0abaff11819addc67b9e9bdf Mon Sep 17 00:00:00 2001
From: Maxence Boutet
Date: Tue, 27 Jul 2021 20:32:35 -0400
Subject: [PATCH 01/10] Address some of the comments from PR #1809

---
 locust/dispatch.py |  1 -
 locust/runners.py  | 10 ++--------
 2 files changed, 2 insertions(+), 9 deletions(-)

diff --git a/locust/dispatch.py b/locust/dispatch.py
index 9b4ec019a7..a188ae9dd0 100644
--- a/locust/dispatch.py
+++ b/locust/dispatch.py
@@ -236,7 +236,6 @@ def _ramp_down(self) -> Dict[str, Dict[str, int]]:
             if self._current_user_count == 0 or self._current_user_count <= current_user_count_target:
                 return self._users_on_workers

-    # TODO: Test this
     def _distribute_users(
         self, target_user_count: int
     ) -> Tuple[dict, Generator[str, None, None], typing.Iterator["WorkerNode"], List[Tuple["WorkerNode", str]]]:
diff --git a/locust/runners.py b/locust/runners.py
index c8fc297299..617d39fe71 100644
--- a/locust/runners.py
+++ b/locust/runners.py
@@ -314,7 +314,7 @@ def start(self, user_count: int, spawn_rate: float, wait: bool = False):
                 worker_nodes=[self._local_worker_node], user_classes=self.user_classes
             )

-        logger.info("Ramping to %d users using a %.2f spawn rate" % (user_count, spawn_rate))
+        logger.info("Ramping to %d users at a rate of %g per second" % (user_count, spawn_rate))

         self._users_dispatcher.new_dispatch(user_count, spawn_rate)

@@ -694,8 +694,6 @@ def start(self, user_count: int, spawn_rate: float, **kwargs) -> None:
         self._users_dispatcher.new_dispatch(target_user_count=user_count, spawn_rate=spawn_rate)

         try:
-            dispatched_users = None
-
             for dispatched_users in self._users_dispatcher:
                 dispatch_greenlets = Group()
                 for worker_node_id, worker_user_classes_count in dispatched_users.items():
@@ -724,7 +722,6 @@ def start(self, user_count: int, spawn_rate: float, **kwargs) -> None:
                     "Currently spawned users: %s" % _format_user_classes_count_for_log(self.reported_user_classes_count)
                 )

-            assert dispatched_users is not None
            self.target_user_classes_count = _aggregate_dispatched_users(dispatched_users)

         except KeyboardInterrupt:
@@ -771,12 +768,9 @@ def _wait_for_workers_report_after_ramp_up(self) -> float:
         if match is None:
             assert float(locust_wait_for_workers_report_after_ramp_up) >= 0
             return float(locust_wait_for_workers_report_after_ramp_up)
-
-        if match is not None:
+        else:
             return float(match.group("coeff")) * WORKER_REPORT_INTERVAL

-        assert False, "not supposed to reach that"
-
     def stop(self, send_stop_to_client: bool = True):
         if self.state not in [STATE_INIT, STATE_STOPPED, STATE_STOPPING]:
             logger.debug("Stopping...")

From a88fc198426d8c0b95f3c67efac0d87f8c3d3c29 Mon Sep 17 00:00:00 2001
From: Maxence Boutet
Date: Tue, 27 Jul 2021 20:34:56 -0400
Subject: [PATCH 02/10] Update test for new log message

---
 locust/test/test_main.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/locust/test/test_main.py b/locust/test/test_main.py
index 74cef09552..9b40881854 100644
--- a/locust/test/test_main.py
+++ b/locust/test/test_main.py
@@ -380,13 +380,13 @@ def t(self):
         output = proc.communicate()[0].decode("utf-8")
         stdin.close()

-        self.assertIn("Ramping to 1 users using a 100.00 spawn rate", output)
+        self.assertIn("Ramping to 1 users at a rate of 100.00 per second", output)
         self.assertIn('All users spawned: {"UserSubclass": 1} (1 total users)', output)
-        self.assertIn("Ramping to 11 users using a 100.00 spawn rate", output)
+        self.assertIn("Ramping to 11 users at a rate of 100.00 per second", output)
         self.assertIn('All users spawned: {"UserSubclass": 11} (11 total users)', output)
-        self.assertIn("Ramping to 10 users using a 100.00 spawn rate", output)
+        self.assertIn("Ramping to 10 users at a rate of 100.00 per second", output)
         self.assertIn('All users spawned: {"UserSubclass": 10} (10 total users)', output)
-        self.assertIn("Ramping to 0 users using a 100.00 spawn rate", output)
+        self.assertIn("Ramping to 0 users at a rate of 100.00 per second", output)
         self.assertIn('All users spawned: {"UserSubclass": 0} (0 total users)', output)
         self.assertIn("Test task is running", output)
         self.assertIn("Shutting down (exit code 0), bye.", output)

From 532aec98098180b98f9dd0fd76e8e22a23ac5fca Mon Sep 17 00:00:00 2001
From: Maxence Boutet
Date: Tue, 27 Jul 2021 20:42:55 -0400
Subject: [PATCH 03/10] Add comment in benchmark file to clarify what it is

---
 benchmarks/dispatch.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/benchmarks/dispatch.py b/benchmarks/dispatch.py
index 8c1cc00c7c..f88a4e9643 100644
--- a/benchmarks/dispatch.py
+++ b/benchmarks/dispatch.py
@@ -1,3 +1,10 @@
+"""
+This file contains a benchmark to validate the performance of Locust itself.
+More precisely, the performance of the `UsersDispatcher` class which is responsible
+for calculating the distribution of users on each worker. This benchmark is to be used
+by people working on Locust's development.
+"""
+
 import itertools
 import statistics
 import time

From 94b12c7329a2341c2a983912d38572f26782e989 Mon Sep 17 00:00:00 2001
From: Maxence Boutet
Date: Tue, 27 Jul 2021 20:44:55 -0400
Subject: [PATCH 04/10] Remove outdated warning about global spawn rate

---
 locust/runners.py | 12 ------------
 1 file changed, 12 deletions(-)

diff --git a/locust/runners.py b/locust/runners.py
index 617d39fe71..1c380dbf9a 100644
--- a/locust/runners.py
+++ b/locust/runners.py
@@ -670,18 +670,6 @@ def start(self, user_count: int, spawn_rate: float, **kwargs) -> None:
                 "Your selected spawn rate is very high (>100/worker), and this is known to sometimes cause issues. Do you really need to ramp up that fast?"
             )

-        # Since https://github.com/locustio/locust/pull/1621, the master is responsible for dispatching and controlling
-        # the total spawn rate which is more CPU intensive for the master. The number 200 is a little arbitrary as the computational
-        # load on the master greatly depends on the number of workers and the number of user classes. For instance,
-        # 5 user classes and 5 workers can easily do 200/s. However, 200/s with 50 workers and 20 user classes will likely make the
-        # dispatch very slow because of the required computations. I (@mboutet) doubt that many Locust's users are
-        # spawning that rapidly. If so, then they'll likely open issues on GitHub in which case I'll (@mboutet) take a look.
-        if spawn_rate > 200:
-            logger.warning(
-                "Your selected total spawn rate is quite high (>200), and this is known to sometimes cause performance issues on the master. "
-                "Do you really need to ramp up that fast? If so and if encountering performance issues on the master, free to open an issue."
-            )
-
         if self.state != STATE_RUNNING and self.state != STATE_SPAWNING:
             self.stats.clear_all()
             self.exceptions = {}

From 00e8a44f67d5a039595eaf4c0eab181654d55d5e Mon Sep 17 00:00:00 2001
From: Maxence Boutet
Date: Tue, 27 Jul 2021 20:54:51 -0400
Subject: [PATCH 05/10] Update log message to include decimals for spawn rate

---
 locust/runners.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/locust/runners.py b/locust/runners.py
index 1c380dbf9a..161d8f82a8 100644
--- a/locust/runners.py
+++ b/locust/runners.py
@@ -314,7 +314,7 @@ def start(self, user_count: int, spawn_rate: float, wait: bool = False):
                 worker_nodes=[self._local_worker_node], user_classes=self.user_classes
             )

-        logger.info("Ramping to %d users at a rate of %g per second" % (user_count, spawn_rate))
+        logger.info("Ramping to %d users at a rate of %.2f per second" % (user_count, spawn_rate))

         self._users_dispatcher.new_dispatch(user_count, spawn_rate)

From 1a084a5358b383a01358c5a74016d94227fb7622 Mon Sep 17 00:00:00 2001
From: Maxence Boutet
Date: Wed, 28 Jul 2021 10:58:35 -0400
Subject: [PATCH 06/10] Remove outdated TODO

---
 locust/dispatch.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/locust/dispatch.py b/locust/dispatch.py
index a188ae9dd0..8bc3341d78 100644
--- a/locust/dispatch.py
+++ b/locust/dispatch.py
@@ -185,8 +185,6 @@ def _prepare_rebalance(self) -> None:
         self._worker_node_generator = worker_gen
         self._active_users = active_users

-        # TODO: What to do with self._initial_users_on_workers?
-
         self._rebalance = True

     @contextlib.contextmanager

From 77e3b0e22664ac5860d16cc294752ed8bdd82fb5 Mon Sep 17 00:00:00 2001
From: Maxence Boutet
Date: Wed, 28 Jul 2021 11:00:52 -0400
Subject: [PATCH 07/10] Document `_user_gen` function

---
 locust/dispatch.py | 25 ++++++++++++++++++++++---
 1 file changed, 22 insertions(+), 3 deletions(-)

diff --git a/locust/dispatch.py b/locust/dispatch.py
index 8bc3341d78..36b9015f4f 100644
--- a/locust/dispatch.py
+++ b/locust/dispatch.py
@@ -264,13 +264,32 @@ def _distribute_users(
         return users_on_workers, user_gen, worker_gen, active_users

     def _user_gen(self) -> Generator[str, None, None]:
-        # TODO: Explain why we do round(10 * user_class.weight / min_weight)
+        """
+        This method generates users according to their weights using
+        a smooth weighted round-robin algorithm implemented by https://github.com/linnik/roundrobin.
+
+        For example, given users A, B with weights 5 and 1 respectively, this algorithm
+        will yield AAABAAAAABAA. The smooth aspect of this algorithm is what makes it possible
+        to maintain the distribution during ramp-up and ramp-down. If we were to use a normal
+        weighted round-robin algorithm, we'd get AAAAABAAAAAB, which would make the distribution
+        less accurate during ramp-up/down.
+        """
+        # Normalize the weights so that the smallest weight will be equal to "target_min_weight".
+        # The value "2" was experimentally determined because it gave a better distribution,
+        # especially when dealing with weights that are close to each other, e.g. 1.5, 2, 2.4, etc.
+        target_min_weight = 2
         min_weight = min(u.weight for u in self._user_classes)
         normalized_weights = [
-            (user_class.__name__, round(2 * user_class.weight / min_weight)) for user_class in self._user_classes
+            (user_class.__name__, round(target_min_weight * user_class.weight / min_weight))
+            for user_class in self._user_classes
         ]
         gen = smooth(normalized_weights)
-        # TODO: Explain `generation_length_to_get_proper_distribution`
+        # Instead of calling `gen()` for each user, we cycle through a generator of fixed length
+        # `generation_length_to_get_proper_distribution`. Doing so greatly improves performance because
+        # we only ever need to call `gen()` a relatively small number of times. The length of this generator
+        # is chosen as the sum of the normalized weights. So, for users A, B, C with weights 2, 5, 6, the length
+        # is 2 + 5 + 6 = 13, which would yield the distribution `CBACBCBCBCABC` that gets repeated over and over
+        # until the target user count is reached.
         generation_length_to_get_proper_distribution = sum(
             normalized_weight[1] for normalized_weight in normalized_weights
         )

From 9099184d5efdf7a2a998b339ba650f39c7a67f23 Mon Sep 17 00:00:00 2001
From: Maxence Boutet
Date: Wed, 28 Jul 2021 14:01:15 -0400
Subject: [PATCH 08/10] Rename `_ramp_up` and `_ramp_down` methods

---
 locust/dispatch.py | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/locust/dispatch.py b/locust/dispatch.py
index 36b9015f4f..356647c876 100644
--- a/locust/dispatch.py
+++ b/locust/dispatch.py
@@ -128,14 +128,14 @@ def _dispatcher(self) -> Generator[Dict[str, Dict[str, int]], None, None]:

         while self._current_user_count < self._target_user_count:
             with self._wait_between_dispatch_iteration_context():
-                yield self._ramp_up()
+                yield self._add_users_on_workers()
                 if self._rebalance:
                     self._rebalance = False
                     yield self._users_on_workers

         while self._current_user_count > self._target_user_count:
             with self._wait_between_dispatch_iteration_context():
-                yield self._ramp_down()
+                yield self._remove_users_from_workers()
                 if self._rebalance:
                     self._rebalance = False
                     yield self._users_on_workers
@@ -208,7 +208,11 @@ def _wait_between_dispatch_iteration_context(self) -> None:
             sleep_duration = max(0.0, self._wait_between_dispatch - delta)
             gevent.sleep(sleep_duration)

-    def _ramp_up(self) -> Dict[str, Dict[str, int]]:
+    def _add_users_on_workers(self) -> Dict[str, Dict[str, int]]:
+        """Add users on the workers until the target number of users is reached for the current dispatch iteration
+
+        :return: The users that we want to run on the workers
+        """
         current_user_count_target = min(
             self._current_user_count + self._user_count_per_dispatch_iteration, self._target_user_count
         )
@@ -220,7 +224,11 @@ def _ramp_up(self) -> Dict[str, Dict[str, int]]:
             if self._current_user_count >= current_user_count_target:
                 return self._users_on_workers

-    def _ramp_down(self) -> Dict[str, Dict[str, int]]:
+    def _remove_users_from_workers(self) -> Dict[str, Dict[str, int]]:
+        """Remove users from the workers until the target number of users is reached for the current dispatch iteration
+
+        :return: The users that we want to run on the workers
+        """
         current_user_count_target = max(
             self._current_user_count - self._user_count_per_dispatch_iteration, self._target_user_count
         )

From 06b420889f0655d885492cf21ff869dc4d0dc1a2 Mon Sep 17 00:00:00 2001
From: Maxence Boutet
Date: Wed, 28 Jul 2021 14:25:14 -0400
Subject: [PATCH 09/10] Use `worker_node` in `remove_worker` for consistency

I also added comments and docstrings to explain what's
going on in some places.
---
 locust/dispatch.py           | 31 ++++++++++++++++++++++++++++---
 locust/runners.py            |  8 +++++---
 locust/test/test_dispatch.py | 18 +++++++++---------
 3 files changed, 42 insertions(+), 15 deletions(-)

diff --git a/locust/dispatch.py b/locust/dispatch.py
index 356647c876..dbd6aaa0ea 100644
--- a/locust/dispatch.py
+++ b/locust/dispatch.py
@@ -144,6 +144,8 @@ def _dispatcher(self) -> Generator[Dict[str, Dict[str, int]], None, None]:

     def new_dispatch(self, target_user_count: int, spawn_rate: float) -> None:
         """
+        Initialize a new dispatch cycle.
+
         :param target_user_count: The desired user count at the end of the dispatch cycle
         :param spawn_rate: The spawn rate
         """
@@ -166,24 +168,47 @@ def new_dispatch(self, target_user_count: int, spawn_rate: float) -> None:

         self._dispatch_iteration_durations.clear()

     def add_worker(self, worker_node: "WorkerNode") -> None:
+        """
+        This method is to be called when a new worker connects to the master. When
+        a new worker is added, the users dispatcher will flag that a rebalance is required
+        and ensure that the next dispatch iteration redistributes the users
+        on the new pool of workers.
+
+        :param worker_node: The worker node to add.
+        """
         self._worker_nodes.append(worker_node)
         self._worker_nodes = sorted(self._worker_nodes, key=lambda w: w.id)
         self._prepare_rebalance()

-    def remove_worker(self, worker_node_id: str) -> None:
-        self._worker_nodes = [w for w in self._worker_nodes if w.id != worker_node_id]
+    def remove_worker(self, worker_node: "WorkerNode") -> None:
+        """
+        This method is similar to the above `add_worker`. When a worker disconnects
+        (because of e.g. network failure, worker failure, etc.), this method will ensure that the next
+        dispatch iteration redistributes the users on the remaining workers.
+
+        :param worker_node: The worker node to remove.
+        """
+        self._worker_nodes = [w for w in self._worker_nodes if w.id != worker_node.id]
         if len(self._worker_nodes) == 0:
             # TODO: Test this
             return
         self._prepare_rebalance()

     def _prepare_rebalance(self) -> None:
+        """
+        When a rebalance is required because of added and/or removed workers, we compute the desired state as if
+        we started from 0 users. So, if we were currently running 500 users, then `_distribute_users` will
+        perform a fake ramp-up without any waiting and return the final distribution.
+        """
         users_on_workers, user_gen, worker_gen, active_users = self._distribute_users(self._current_user_count)

         self._users_on_workers = users_on_workers
+        self._active_users = active_users
+
+        # It's important to reset the generators by using the ones from `_distribute_users`
+        # so that the next iterations are smooth and continuous.
         self._user_generator = user_gen
         self._worker_node_generator = worker_gen
-        self._active_users = active_users

         self._rebalance = True

diff --git a/locust/runners.py b/locust/runners.py
index 161d8f82a8..7e260ee5cb 100644
--- a/locust/runners.py
+++ b/locust/runners.py
@@ -818,7 +818,7 @@ def heartbeat_worker(self):
                     client.state = STATE_MISSING
                    client.user_classes_count = {}
                    if self._users_dispatcher is not None:
-                        self._users_dispatcher.remove_worker(client.id)
+                        self._users_dispatcher.remove_worker(client)
                        # TODO: If status is `STATE_RUNNING`, call self.start()
                    if self.worker_count <= 0:
                        logger.info("The last worker went missing, stopping test.")
@@ -873,9 +873,10 @@ def client_listener(self):
                # if abs(time() - msg.data["time"]) > 5.0:
                #     warnings.warn("The worker node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.")
            elif msg.type == "client_stopped":
+                client = self.clients[msg.node_id]
                del self.clients[msg.node_id]
                if self._users_dispatcher is not None:
-                    self._users_dispatcher.remove_worker(msg.node_id)
+                    self._users_dispatcher.remove_worker(client)
                    if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
                        # TODO: Test this situation
                        self.start(self.target_user_count, self.spawn_rate)
@@ -914,9 +915,10 @@ def client_listener(self):
                self.clients[msg.node_id].user_classes_count = msg.data["user_classes_count"]
            elif msg.type == "quit":
                if msg.node_id in self.clients:
+                    client = self.clients[msg.node_id]
                    del self.clients[msg.node_id]
                    if self._users_dispatcher is not None:
-                        self._users_dispatcher.remove_worker(msg.node_id)
+                        self._users_dispatcher.remove_worker(client)
                        if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
                            # TODO: Test this situation
                            self.start(self.target_user_count, self.spawn_rate)
diff --git a/locust/test/test_dispatch.py b/locust/test/test_dispatch.py
index e845067c20..e6709b52fc 100644
--- a/locust/test/test_dispatch.py
+++ b/locust/test/test_dispatch.py
@@ -2385,7 +2385,7 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 2)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 2)

-        users_dispatcher.remove_worker(worker_nodes[1].id)
+        users_dispatcher.remove_worker(worker_nodes[1])

         # Re-balance
         ts = time.perf_counter()
@@ -2447,8 +2447,8 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 2)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 2)

-        users_dispatcher.remove_worker(worker_nodes[1].id)
-        users_dispatcher.remove_worker(worker_nodes[2].id)
+        users_dispatcher.remove_worker(worker_nodes[1])
+        users_dispatcher.remove_worker(worker_nodes[2])

         # Re-balance
         ts = time.perf_counter()
@@ -2489,7 +2489,7 @@ class User3(User):

         list(users_dispatcher)

-        users_dispatcher.remove_worker(worker_nodes[1].id)
+        users_dispatcher.remove_worker(worker_nodes[1])

         users_dispatcher.new_dispatch(target_user_count=18, spawn_rate=3)

@@ -2554,8 +2554,8 @@ class User3(User):

         list(users_dispatcher)

-        users_dispatcher.remove_worker(worker_nodes[1].id)
-        users_dispatcher.remove_worker(worker_nodes[2].id)
+        users_dispatcher.remove_worker(worker_nodes[1])
+        users_dispatcher.remove_worker(worker_nodes[2])

         users_dispatcher.new_dispatch(target_user_count=18, spawn_rate=3)

@@ -2639,7 +2639,7 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 4)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4)

-        users_dispatcher.remove_worker(worker_nodes[1].id)
+        users_dispatcher.remove_worker(worker_nodes[1])

         # Re-balance
         ts = time.perf_counter()
@@ -2705,8 +2705,8 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 4)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4)

-        users_dispatcher.remove_worker(worker_nodes[1].id)
-        users_dispatcher.remove_worker(worker_nodes[2].id)
+        users_dispatcher.remove_worker(worker_nodes[1])
+        users_dispatcher.remove_worker(worker_nodes[2])

         # Re-balance
         ts = time.perf_counter()

From ff687c460a75278859ef43a055a336b85b8ca77a Mon Sep 17 00:00:00 2001
From: Maxence Boutet
Date: Wed, 28 Jul 2021 15:13:15 -0400
Subject: [PATCH 10/10] Add test for removing last workers in users dispatcher

Also add more assertions for `UsersDispatcher._rebalance` in dispatch tests.
---
 locust/test/test_dispatch.py | 115 +++++++++++++++++++++++++++++++++++
 1 file changed, 115 insertions(+)

diff --git a/locust/test/test_dispatch.py b/locust/test/test_dispatch.py
index e6709b52fc..ffa2b291cd 100644
--- a/locust/test/test_dispatch.py
+++ b/locust/test/test_dispatch.py
@@ -2385,8 +2385,12 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 2)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 2)

+        self.assertFalse(users_dispatcher._rebalance)
+
         users_dispatcher.remove_worker(worker_nodes[1])

+        self.assertTrue(users_dispatcher._rebalance)
+
         # Re-balance
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
@@ -2398,6 +2402,8 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 3)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3)

+        self.assertFalse(users_dispatcher._rebalance)
+
         # Dispatch iteration 3
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
@@ -2447,9 +2453,13 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 2)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 2)

+        self.assertFalse(users_dispatcher._rebalance)
+
         users_dispatcher.remove_worker(worker_nodes[1])
         users_dispatcher.remove_worker(worker_nodes[2])

+        self.assertTrue(users_dispatcher._rebalance)
+
         # Re-balance
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
@@ -2460,6 +2470,8 @@ class User3(User):
         self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 2, "User3": 2})
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 6)

+        self.assertFalse(users_dispatcher._rebalance)
+
         # Dispatch iteration 3
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
@@ -2489,10 +2501,16 @@ class User3(User):

         list(users_dispatcher)

+        self.assertFalse(users_dispatcher._rebalance)
+
         users_dispatcher.remove_worker(worker_nodes[1])

+        self.assertTrue(users_dispatcher._rebalance)
+
         users_dispatcher.new_dispatch(target_user_count=18, spawn_rate=3)

+        self.assertTrue(users_dispatcher._rebalance)
+
         sleep_time = 1

         # Re-balance
@@ -2506,6 +2524,8 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 5)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4)

+        self.assertFalse(users_dispatcher._rebalance)
+
         # Dispatch iteration 1
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
@@ -2554,11 +2574,17 @@ class User3(User):

         list(users_dispatcher)

+        self.assertFalse(users_dispatcher._rebalance)
+
         users_dispatcher.remove_worker(worker_nodes[1])
         users_dispatcher.remove_worker(worker_nodes[2])

+        self.assertTrue(users_dispatcher._rebalance)
+
         users_dispatcher.new_dispatch(target_user_count=18, spawn_rate=3)

+        self.assertTrue(users_dispatcher._rebalance)
+
         sleep_time = 1

         # Re-balance
@@ -2571,6 +2597,8 @@ class User3(User):
         self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 3, "User2": 3, "User3": 3})
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 9)

+        self.assertFalse(users_dispatcher._rebalance)
+
         # Dispatch iteration 1
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
@@ -2639,8 +2667,12 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 4)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4)

+        self.assertFalse(users_dispatcher._rebalance)
+
         users_dispatcher.remove_worker(worker_nodes[1])

+        self.assertTrue(users_dispatcher._rebalance)
+
         # Re-balance
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
@@ -2652,6 +2684,8 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 6)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 6)

+        self.assertFalse(users_dispatcher._rebalance)
+
         # Dispatch iteration 3
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
@@ -2705,9 +2739,13 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 4)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4)

+        self.assertFalse(users_dispatcher._rebalance)
+
         users_dispatcher.remove_worker(worker_nodes[1])
         users_dispatcher.remove_worker(worker_nodes[2])

+        self.assertTrue(users_dispatcher._rebalance)
+
         # Re-balance
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
@@ -2718,6 +2756,8 @@ class User3(User):
         self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 4, "User2": 4, "User3": 4})
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 12)

+        self.assertFalse(users_dispatcher._rebalance)
+
         # Dispatch iteration 3
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
@@ -2726,6 +2766,41 @@ class User3(User):
         self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 3, "User2": 3, "User3": 3})
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 9)

+    def test_remove_last_worker(self):
+        class User1(User):
+            weight = 1
+
+        class User2(User):
+            weight = 1
+
+        class User3(User):
+            weight = 1
+
+        user_classes = [User1, User2, User3]
+
+        worker_nodes = [WorkerNode(str(i + 1)) for i in range(1)]
+
+        users_dispatcher = UsersDispatcher(worker_nodes=worker_nodes, user_classes=user_classes)
+
+        users_dispatcher.new_dispatch(target_user_count=9, spawn_rate=3)
+        users_dispatcher._wait_between_dispatch = 0
+
+        # Dispatch iteration 1
+        dispatched_users = next(users_dispatcher)
+        self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 1, "User2": 1, "User3": 1})
+        self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 3)
+
+        # Dispatch iteration 2
+        dispatched_users = next(users_dispatcher)
+        self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 2, "User3": 2})
+        self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 6)
+
+        self.assertFalse(users_dispatcher._rebalance)
+
+        users_dispatcher.remove_worker(worker_nodes[0])
+
+        self.assertFalse(users_dispatcher._rebalance)
+

 class TestAddWorker(unittest.TestCase):
     def test_add_worker_during_ramp_up(self):
@@ -2766,8 +2841,12 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 3)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3)

+        self.assertFalse(users_dispatcher._rebalance)
+
         users_dispatcher.add_worker(worker_nodes[1])

+        self.assertTrue(users_dispatcher._rebalance)
+
         # Re-balance
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
@@ -2780,6 +2859,8 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 2)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 2)

+        self.assertFalse(users_dispatcher._rebalance)
+
         # Dispatch iteration 3
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
@@ -2826,9 +2907,13 @@ class User3(User):
         self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 2, "User3": 2})
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 6)

+        self.assertFalse(users_dispatcher._rebalance)
+
         users_dispatcher.add_worker(worker_nodes[1])
         users_dispatcher.add_worker(worker_nodes[2])

+        self.assertTrue(users_dispatcher._rebalance)
+
         # Re-balance
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
@@ -2841,6 +2926,8 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 2)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 2)

+        self.assertFalse(users_dispatcher._rebalance)
+
         # Dispatch iteration 3
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
@@ -2872,10 +2959,16 @@ class User3(User):

         list(users_dispatcher)

+        self.assertFalse(users_dispatcher._rebalance)
+
         users_dispatcher.add_worker(worker_nodes[1])

+        self.assertTrue(users_dispatcher._rebalance)
+
         users_dispatcher.new_dispatch(target_user_count=18, spawn_rate=3)

+        self.assertTrue(users_dispatcher._rebalance)
+
         sleep_time = 1

         # Re-balance
@@ -2890,6 +2983,8 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 3)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3)

+        self.assertFalse(users_dispatcher._rebalance)
+
         # Dispatch iteration 1
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
@@ -2941,11 +3036,17 @@ class User3(User):

         list(users_dispatcher)

+        self.assertFalse(users_dispatcher._rebalance)
+
         users_dispatcher.add_worker(worker_nodes[1])
         users_dispatcher.add_worker(worker_nodes[2])

+        self.assertTrue(users_dispatcher._rebalance)
+
         users_dispatcher.new_dispatch(target_user_count=18, spawn_rate=3)

+        self.assertTrue(users_dispatcher._rebalance)
+
         sleep_time = 1

         # Re-balance
@@ -2960,6 +3061,8 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 3)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3)

+        self.assertFalse(users_dispatcher._rebalance)
+
         # Dispatch iteration 1
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
@@ -3032,8 +3135,12 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 6)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 6)

+        self.assertFalse(users_dispatcher._rebalance)
+
         users_dispatcher.add_worker(worker_nodes[1])

+        self.assertTrue(users_dispatcher._rebalance)
+
         # Re-balance
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
@@ -3046,6 +3153,8 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 4)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4)

+        self.assertFalse(users_dispatcher._rebalance)
+
         # Dispatch iteration 3
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
@@ -3096,9 +3205,13 @@ class User3(User):
         self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 4, "User2": 4, "User3": 4})
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 12)

+        self.assertFalse(users_dispatcher._rebalance)
+
         users_dispatcher.add_worker(worker_nodes[1])
         users_dispatcher.add_worker(worker_nodes[2])

+        self.assertTrue(users_dispatcher._rebalance)
+
         # Re-balance
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
@@ -3111,6 +3224,8 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 4)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4)

+        self.assertFalse(users_dispatcher._rebalance)
+
         # Dispatch iteration 3
         ts = time.perf_counter()
         dispatched_users = next(users_dispatcher)
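
Illustrative aside, not part of the patch series: the smooth weighted round-robin behaviour that the `_user_gen` docstring added in PATCH 07 describes can be checked interactively. The sketch below assumes the `roundrobin` package from https://github.com/linnik/roundrobin is installed (it is the library the dispatcher itself imports `smooth` from); the user-class names "A"/"B" and the weights 5/1 are taken from the docstring's example, everything else is made up for illustration.

# Minimal sketch of the smooth weighted round-robin interleaving (assumes `pip install roundrobin`).
from roundrobin import smooth

# Weighted items as (name, weight) pairs, mirroring users A (weight 5) and B (weight 1).
get_next = smooth([("A", 5), ("B", 1)])

# Twelve draws yield an interleaved sequence such as "AAABAAAAABAA" (ten A's, two B's),
# rather than the clustered "AAAAABAAAAAB" of a plain weighted round-robin. This
# interleaving is the property that keeps the user distribution accurate mid-ramp.
print("".join(get_next() for _ in range(12)))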